diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index fbac81e..3c21cbc 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -454,7 +454,7 @@ namespace CryptoExchange.Net protected virtual MessageListener? AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection) { var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription); - if (!connection.AddListener(messageListener, subscription.Identifiers)) + if (!connection.AddListener(messageListener)) return null; return messageListener; @@ -469,7 +469,7 @@ namespace CryptoExchange.Net systemSubscriptions.Add(systemSubscription); var subscription = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false); foreach (var connection in socketConnections.Values) - connection.AddListener(subscription, null); + connection.AddListener(subscription); } /// @@ -539,11 +539,12 @@ namespace CryptoExchange.Net var socket = CreateSocket(connectionAddress.Data!); var socketConnection = new SocketConnection(_logger, this, socket, address); socketConnection.UnhandledMessage += HandleUnhandledMessage; + socketConnection.UnparsedMessage += HandleUnparsedMessage; foreach (var systemSubscription in systemSubscriptions) { var handler = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false); - socketConnection.AddListener(handler, null); + socketConnection.AddListener(handler); } return new CallResult(socketConnection); @@ -557,6 +558,14 @@ namespace CryptoExchange.Net { } + /// + /// Process an unparsed message + /// + /// The message that wasn't parsed + protected virtual void HandleUnparsedMessage(byte[] message) + { + } + /// /// Connect a socket /// diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index c6ae41f..be27734 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -13,7 +13,6 @@ namespace CryptoExchange.Net.Converters public abstract string[] IdFields { get; } public abstract Type? GetDeserializationType(Dictionary idValues, List listeners); - public abstract List MatchToListener(ParsedMessage message, List listeners); /// public object? ReadJson(Stream stream, List listeners) @@ -24,8 +23,16 @@ namespace CryptoExchange.Net.Converters // Deserialize to the correct type using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); using var jsonTextReader = new JsonTextReader(sr); + JToken token; + try + { + token = JToken.Load(jsonTextReader); + } + catch(Exception ex) + { + return null; + } - var token = JToken.Load(jsonTextReader); var dict = new Dictionary(); foreach(var idField in IdFields) { diff --git a/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs b/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs index fc3eb29..9d5bce2 100644 --- a/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs +++ b/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs @@ -1,13 +1,13 @@ -using CryptoExchange.Net.Converters; -using CryptoExchange.Net.Objects.Sockets; -using System.Threading.Tasks; +//using CryptoExchange.Net.Converters; +//using CryptoExchange.Net.Objects.Sockets; +//using System.Threading.Tasks; -namespace CryptoExchange.Net.Interfaces -{ - internal interface IStreamMessageListener - { - int Priority { get; } - bool MessageMatches(ParsedMessage message); - Task ProcessAsync(ParsedMessage message); - } -} +//namespace CryptoExchange.Net.Interfaces +//{ +// internal interface IStreamMessageListener +// { +// int Priority { get; } +// bool MessageMatches(ParsedMessage message); +// Task ProcessAsync(ParsedMessage message); +// } +//} diff --git a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs index 94e8d4b..80c50ed 100644 --- a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs @@ -6,7 +6,7 @@ using System.Threading.Tasks; namespace CryptoExchange.Net.Objects.Sockets { - internal class PendingRequest : IStreamMessageListener + internal class PendingRequest { public int Id { get; set; } public Func MessageMatchesHandler { get; } diff --git a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs index b72a837..0037ef2 100644 --- a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs @@ -10,7 +10,7 @@ namespace CryptoExchange.Net.Objects.Sockets /// /// Socket listener /// - public class MessageListener : IStreamMessageListener + public class MessageListener { /// /// Unique listener id @@ -80,13 +80,6 @@ namespace CryptoExchange.Net.Objects.Sockets /// public int Priority => Subscription is SystemSubscription ? 50 : 1; - /// - /// Check if message matches the subscription - /// - /// - /// - public bool MessageMatches(ParsedMessage message) => Subscription.MessageMatchesEvent(message); - /// /// Process the message /// diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index feacdc3..ac0de2d 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -51,13 +51,18 @@ namespace CryptoExchange.Net.Sockets /// public event Action? UnhandledMessage; + /// + /// Unparsed message event + /// + public event Action? UnparsedMessage; + /// /// The amount of listeners on this connection /// public int UserListenerCount { get { lock (_listenerLock) - return _messageIdentifierListeners.Count(h => h.UserListener); } + return _messageIdentifierListeners.Values.Count(h => h.UserListener); } } /// @@ -68,7 +73,7 @@ namespace CryptoExchange.Net.Sockets get { lock (_listenerLock) - return _listeners.Where(h => h.UserListener).ToArray(); + return _messageIdentifierListeners.Values.Where(h => h.UserListener).ToArray(); } } @@ -153,10 +158,8 @@ namespace CryptoExchange.Net.Sockets } private bool _pausedActivity; - //private readonly List _listeners; - //private readonly List _messageListeners; // ? - private readonly List _pendingRequests; + private readonly List _messageListeners; private readonly Dictionary _messageIdentifierListeners; private readonly object _listenerLock = new(); @@ -184,7 +187,8 @@ namespace CryptoExchange.Net.Sockets Properties = new Dictionary(); _pendingRequests = new List(); - _messageIdentifierListeners = new Dictionary(); + _messageListeners = new List(); + _messageIdentifierListeners = new Dictionary(); _socket = socket; _socket.OnStreamMessage += HandleStreamMessage; @@ -215,7 +219,7 @@ namespace CryptoExchange.Net.Sockets Authenticated = false; lock(_listenerLock) { - foreach (var listener in _messageIdentifierListeners.Values) + foreach (var listener in _messageListeners) listener.Confirmed = false; } Task.Run(() => ConnectionClosed?.Invoke()); @@ -231,7 +235,7 @@ namespace CryptoExchange.Net.Sockets Authenticated = false; lock (_listenerLock) { - foreach (var listener in _listeners) + foreach (var listener in _messageListeners) listener.Confirmed = false; } @@ -255,10 +259,10 @@ namespace CryptoExchange.Net.Sockets Status = SocketStatus.Resubscribing; lock (_messageListeners) { - foreach (var pendingRequest in _messageListeners.OfType().ToList()) + foreach (var pendingRequest in _pendingRequests.ToList()) { pendingRequest.Fail(); - _messageListeners.Remove(pendingRequest); + // Remove? } } @@ -298,8 +302,8 @@ namespace CryptoExchange.Net.Sockets protected virtual void HandleRequestSent(int requestId) { PendingRequest pendingRequest; - lock (_messageListeners) - pendingRequest = _messageListeners.OfType().SingleOrDefault(p => p.Id == requestId); + lock (_pendingRequests) + pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId); if (pendingRequest == null) { @@ -321,32 +325,42 @@ namespace CryptoExchange.Net.Sockets //var streamMessage = new StreamMessage(this, stream, timestamp); TimeSpan userCodeDuration = TimeSpan.Zero; - List listeners; + List listeners; lock (_listenerLock) listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList(); var result = (ParsedMessage)ApiClient.StreamConverter.ReadJson(stream, listeners.OfType().ToList()); // TODO - stream.Position = 0; + if(result == null) + { + stream.Position = 0; + var buffer = new byte[stream.Length]; + stream.Read(buffer, 0, buffer.Length); + UnparsedMessage?.Invoke(buffer); + return; + } - if (result == null) + if (result.Data == null) { _logger.LogWarning("Message not matched to type"); return; } + // TODO lock if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener)) { // Matched based on identifier + var userSw = Stopwatch.StartNew(); await idListener.ProcessAsync(result).ConfigureAwait(false); + userSw.Stop(); return; } - foreach (var pendingRequest in _messageListeners.OfType()) + foreach (var pendingRequest in _pendingRequests) { if (pendingRequest.MessageMatchesHandler(result)) { await pendingRequest.ProcessAsync(result).ConfigureAwait(false); - break; + return; } } @@ -447,7 +461,7 @@ namespace CryptoExchange.Net.Sockets lock (_listenerLock) { - foreach (var listener in _messageIdentifierListeners.Values) + foreach (var listener in _messageListeners) { if (listener.CancellationTokenRegistration.HasValue) listener.CancellationTokenRegistration.Value.Dispose(); @@ -467,7 +481,7 @@ namespace CryptoExchange.Net.Sockets { lock (_listenerLock) { - if (!_listeners.Contains(listener)) + if (!_messageListeners.Contains(listener)) return; listener.Closed = true; @@ -492,7 +506,7 @@ namespace CryptoExchange.Net.Sockets return; } - shouldCloseConnection = _listeners.All(r => !r.UserListener || r.Closed); + shouldCloseConnection = _messageIdentifierListeners.All(r => !r.Value.UserListener || r.Value.Closed); if (shouldCloseConnection) Status = SocketStatus.Closing; } @@ -506,7 +520,8 @@ namespace CryptoExchange.Net.Sockets lock (_listenerLock) { _messageListeners.Remove(listener); - _listeners.Remove(listener); + foreach (var id in listener.Subscription.Identifiers) + _messageIdentifierListeners.Remove(id); } } @@ -523,23 +538,22 @@ namespace CryptoExchange.Net.Sockets /// Add a listener to this connection /// /// - public bool AddListener(MessageListener listener, List? listenerIdentifiers) + public bool AddListener(MessageListener listener) { lock (_listenerLock) { if (Status != SocketStatus.None && Status != SocketStatus.Connected) return false; - _listeners.Add(listener); _messageListeners.Add(listener); - if (listenerIdentifiers != null) + if (listener.Subscription.Identifiers != null) { - foreach (var id in listenerIdentifiers) + foreach (var id in listener.Subscription.Identifiers) _messageIdentifierListeners.Add(id.ToLowerInvariant(), listener); } if (listener.UserListener) - _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {listener.Id}, total listeners on connection: {_listeners.Count(s => s.UserListener)}"); + _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {listener.Id}, total listeners on connection: {_messageListeners.Count(s => s.UserListener)}"); return true; } } @@ -551,7 +565,7 @@ namespace CryptoExchange.Net.Sockets public MessageListener? GetListener(int id) { lock (_listenerLock) - return _listeners.SingleOrDefault(s => s.Id == id); + return _messageListeners.SingleOrDefault(s => s.Id == id); } /// @@ -562,7 +576,7 @@ namespace CryptoExchange.Net.Sockets public MessageListener? GetListenerByRequest(Func predicate) { lock(_listenerLock) - return _listeners.SingleOrDefault(s => predicate(s.Subscription)); + return _messageListeners.SingleOrDefault(s => predicate(s.Subscription)); } /// @@ -580,7 +594,7 @@ namespace CryptoExchange.Net.Sockets var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener); lock (_messageListeners) { - _messageListeners.Add(pending); + _pendingRequests.Add(pending); } var sendOk = Send(pending.Id, obj, weight); @@ -651,7 +665,7 @@ namespace CryptoExchange.Net.Sockets bool anySubscriptions = false; lock (_listenerLock) - anySubscriptions = _listeners.Any(s => s.UserListener); + anySubscriptions = _messageListeners.Any(s => s.UserListener); if (!anySubscriptions) { @@ -663,7 +677,7 @@ namespace CryptoExchange.Net.Sockets bool anyAuthenticated = false; lock (_listenerLock) - anyAuthenticated = _listeners.Any(s => s.Authenticated); + anyAuthenticated = _messageListeners.Any(s => s.Authenticated); if (anyAuthenticated) { @@ -683,7 +697,7 @@ namespace CryptoExchange.Net.Sockets List listenerList = new List(); lock (_listenerLock) { - foreach (var listener in _listeners) + foreach (var listener in _messageListeners) { if (listener.Subscription != null) listenerList.Add(listener); diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index bcac606..704103c 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -72,7 +72,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract bool MessageMatchesEvent(ParsedMessage message); + //public abstract bool MessageMatchesEvent(ParsedMessage message); /// /// Handle the update message ///