diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index 176da59..e9cfc8c 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -8,12 +8,27 @@ using System.Text; namespace CryptoExchange.Net.Converters { + /// + /// Socket message converter + /// public abstract class SocketConverter { + /// + /// Fields to use for the message subscription identifier + /// public virtual string[]? SubscriptionIdFields => null; + /// + /// Fields to use for the message type identifier + /// public abstract string[] TypeIdFields { get; } - public abstract Type? GetDeserializationType(Dictionary idValues, List listeners); + /// + /// Return the type of object that the message should be parsed to based on the type id values dictionary + /// + /// + /// + /// + public abstract Type? GetDeserializationType(Dictionary idValues, List listeners); /// public ParsedMessage? ReadJson(Stream stream, List listeners, bool outputOriginalData) @@ -37,8 +52,9 @@ namespace CryptoExchange.Net.Converters { token = JToken.Load(jsonTextReader); } - catch(Exception ex) + catch(Exception) { + // Not a json message return null; } @@ -48,62 +64,47 @@ namespace CryptoExchange.Net.Converters token = token.First!; } - var typeIdDict = new Dictionary(); - foreach(var idField in TypeIdFields) + var typeIdDict = new Dictionary(); + string idString = ""; + foreach (var idField in TypeIdFields) { - var splitTokens = idField.Split(new char[] { ':' }); - var accessToken = token; - foreach (var splitToken in splitTokens) - { - accessToken = accessToken[splitToken]; - - if (accessToken == null) - break; - - if (accessToken.Type == JTokenType.Array) - { - // Received array, take first item as reference - accessToken = accessToken.First!; - } - } - - typeIdDict[idField] = accessToken?.ToString(); + var val = GetValueForKey(token, idField); + idString += val; + typeIdDict[idField] = val; } - string idString = ""; if (SubscriptionIdFields != null) { + idString = ""; foreach (var idField in SubscriptionIdFields) - { - var splitTokens = idField.Split(new char[] { ':' }); - var accessToken = token; - foreach (var splitToken in splitTokens) - { - accessToken = accessToken[splitToken]; - if (accessToken == null) - break; - } - idString += accessToken?.ToString(); - } - } - else - { - foreach (var item in typeIdDict) - idString += item.Value; + idString += GetValueForKey(token, idField); } result.Identifier = idString; var resultType = GetDeserializationType(typeIdDict, listeners); result.Data = resultType == null ? null : token.ToObject(resultType); - return result; } - } - public class ParsedMessage - { - public string Identifier { get; set; } = null!; - public string? OriginalData { get; set; } - public object? Data { get; set; } + private string? GetValueForKey(JToken token, string key) + { + var splitTokens = key.Split(new char[] { ':' }); + var accessToken = token; + foreach (var splitToken in splitTokens) + { + accessToken = accessToken[splitToken]; + + if (accessToken == null) + break; + + if (accessToken.Type == JTokenType.Array) + { + // Received array, take first item as reference + accessToken = accessToken.First!; + } + } + + return accessToken?.ToString(); + } } } diff --git a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs new file mode 100644 index 0000000..6a8a0e7 --- /dev/null +++ b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs @@ -0,0 +1,21 @@ +namespace CryptoExchange.Net.Objects.Sockets +{ + /// + /// Parsed message object + /// + public class ParsedMessage + { + /// + /// Identifier string + /// + public string Identifier { get; set; } = null!; + /// + /// Original data if the option is enabled + /// + public string? OriginalData { get; set; } + /// + /// Parsed data object + /// + public object? Data { get; set; } + } +} diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 99ac127..814967c 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -322,14 +322,13 @@ namespace CryptoExchange.Net.Sockets protected virtual async Task HandleStreamMessage(Stream stream) { var timestamp = DateTime.UtcNow; - //var streamMessage = new StreamMessage(this, stream, timestamp); TimeSpan userCodeDuration = TimeSpan.Zero; List listeners; lock (_listenerLock) listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList(); - var result = ApiClient.StreamConverter.ReadJson(stream, listeners.OfType().ToList(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); // TODO + var result = ApiClient.StreamConverter.ReadJson(stream, listeners, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); // TODO if(result == null) { stream.Position = 0; @@ -358,78 +357,42 @@ namespace CryptoExchange.Net.Sockets return; } - foreach (var pendingRequest in _pendingRequests) + List pendingRequests; + lock (_pendingRequests) + pendingRequests = _pendingRequests.ToList(); + + foreach (var pendingRequest in pendingRequests) { if (pendingRequest.MessageMatchesHandler(result)) { - await pendingRequest.ProcessAsync(result).ConfigureAwait(false); + lock (_pendingRequests) + _pendingRequests.Remove(pendingRequest); + + if (pendingRequest.Completed) + { + // Answer to a timed out request, unsub if it is a subscription request + if (pendingRequest.MessageListener != null) + { + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); + _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false); + } + } + else + { + _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); + await pendingRequest.ProcessAsync(result).ConfigureAwait(false); + } + return; } } - _logger.LogWarning("Message not matched"); // TODO - return; - - //if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener)) - //{ - // var userSw = Stopwatch.StartNew(); - // await idListener.ProcessAsync(streamMessage).ConfigureAwait(false); - // userSw.Stop(); - // userCodeDuration = userSw.Elapsed; - // handledResponse = true; - //} - //else - //{ - // foreach (var listener in listeners) - // { - // if (listener.MessageMatches(streamMessage)) - // { - // if (listener is PendingRequest pendingRequest) - // { - // lock (_messageListeners) - // _messageListeners.Remove(pendingRequest); - - // if (pendingRequest.Completed) - // { - // // Answer to a timed out request, unsub if it is a subscription request - // if (pendingRequest.MessageListener != null) - // { - // _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); - // _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false); - // } - // } - // else - // { - // _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); - // await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false); - // } - - // if (!ApiClient.ContinueOnQueryResponse) - // return; - - // handledResponse = true; - // break; - // } - // else if (listener is MessageListener subscription) - // { - // currentSubscription = subscription; - // handledResponse = true; - // var userSw = Stopwatch.StartNew(); - // await subscription.ProcessAsync(streamMessage).ConfigureAwait(false); - // userSw.Stop(); - // userCodeDuration = userSw.Elapsed; - // break; - // } - // } - // } - //} - - //if (!handledResponse) - //{ - // if (!ApiClient.UnhandledMessageExpected) - // _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + streamMessage.Get(ParsingUtils.GetString)); - // UnhandledMessage?.Invoke(streamMessage); - //} + stream.Position = 0; + var unhandledBuffer = new byte[stream.Length]; + stream.Read(unhandledBuffer, 0, unhandledBuffer.Length); + + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + Encoding.UTF8.GetString(unhandledBuffer)); + UnhandledMessage?.Invoke(result); } /// diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 704103c..ed98d79 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -1,12 +1,7 @@ -using CryptoExchange.Net.Converters; -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; -using Newtonsoft.Json; using System.Collections.Generic; -using System.IO; -using System.Text; using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets @@ -16,8 +11,6 @@ namespace CryptoExchange.Net.Sockets /// public abstract class Subscription { - private bool _outputOriginalData; - /// /// Logger /// @@ -28,18 +21,19 @@ namespace CryptoExchange.Net.Sockets /// public bool Authenticated { get; } + /// + /// Strings to identify this subscription with + /// public abstract List Identifiers { get; } /// /// ctor /// /// - /// /// - public Subscription(ILogger logger, ISocketApiClient apiClient, bool authenticated) + public Subscription(ILogger logger, bool authenticated) { _logger = logger; - _outputOriginalData = apiClient.ApiOptions.OutputOriginalData ?? apiClient.ClientOptions.OutputOriginalData; Authenticated = authenticated; } @@ -67,52 +61,11 @@ namespace CryptoExchange.Net.Sockets /// public abstract (bool, CallResult?) MessageMatchesUnsubRequest(ParsedMessage message); - /// - /// Check if the message is an update for this subscription - /// - /// - /// - //public abstract bool MessageMatchesEvent(ParsedMessage message); /// /// Handle the update message /// /// /// public abstract Task HandleEventAsync(DataEvent message); - - ///// - ///// Create a data event - ///// - ///// - ///// - ///// - ///// - ///// - ///// - //protected virtual DataEvent CreateDataEvent(T obj, ParsedMessage message, string? topic = null, SocketUpdateType? type = null) - //{ - // string? originalData = null; - // if (_outputOriginalData) - // originalData = message.Get(ParsingUtils.GetString); - - // return new DataEvent(obj, topic, originalData, message.Timestamp, type); - //} - - ///// - ///// Deserialize the message to an object using Json.Net - ///// - ///// - ///// - ///// - ///// - //protected virtual Task> DeserializeAsync(StreamMessage message, JsonSerializerSettings settings) - //{ - // var serializer = JsonSerializer.Create(settings); - // using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true); - // using var jsonTextReader = new JsonTextReader(sr); - // var result = serializer.Deserialize(jsonTextReader); - // message.Stream.Position = 0; - // return Task.FromResult(new CallResult(result!)); - //} } } diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index b3af09e..4781832 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -16,9 +16,8 @@ namespace CryptoExchange.Net.Sockets /// ctor /// /// - /// /// - public SystemSubscription(ILogger logger, ISocketApiClient socketApiClient, bool authenticated = false) : base(logger, socketApiClient, authenticated) + public SystemSubscription(ILogger logger, bool authenticated = false) : base(logger, authenticated) { }