From 1ba66be29fa6a8021ef00a079f8fe9bea2873559 Mon Sep 17 00:00:00 2001 From: JKorf Date: Tue, 14 Nov 2023 20:29:42 +0100 Subject: [PATCH] wip --- .../Clients/BaseSocketClient.cs | 6 +- CryptoExchange.Net/Clients/SocketApiClient.cs | 22 ++- .../Converters/SocketConverter.cs | 19 +- .../Objects/Sockets/ParsedMessage.cs | 10 +- .../Objects/Sockets/PendingRequest.cs | 165 ------------------ .../Sockets/StreamMessageParseCallback.cs | 3 +- CryptoExchange.Net/Sockets/Query.cs | 74 ++++++-- .../Sockets/SocketConnection.cs | 126 ++++++------- .../Sockets/SocketListenerManager.cs | 73 +++++--- CryptoExchange.Net/Sockets/Subscription.cs | 2 +- .../Sockets/SystemSubscription.cs | 2 +- 11 files changed, 214 insertions(+), 288 deletions(-) delete mode 100644 CryptoExchange.Net/Objects/Sockets/PendingRequest.cs diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs index f74aa35..61c877e 100644 --- a/CryptoExchange.Net/Clients/BaseSocketClient.cs +++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs @@ -101,8 +101,10 @@ namespace CryptoExchange.Net public string GetSubscriptionsState() { var result = new StringBuilder(); - foreach(var client in ApiClients.OfType()) - result.AppendLine(client.GetSubscriptionsState()); + foreach (var client in ApiClients.OfType()) + { + result.AppendLine(client.GetSubscriptionsState()); + } return result.ToString(); } } diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 38c0b40..5bd0305 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -229,7 +229,7 @@ namespace CryptoExchange.Net return new CallResult(new ServerError("Socket is paused")); } - var subQuery = subscription.GetSubQuery(); + var subQuery = subscription.GetSubQuery(socketConnection); if (subQuery != null) { // Send the request and wait for answer @@ -664,12 +664,26 @@ namespace CryptoExchange.Net public string GetSubscriptionsState() { var sb = new StringBuilder(); - sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}"); + sb.AppendLine($"{GetType().Name}"); + sb.AppendLine($" Connections: {socketConnections.Count}"); + sb.AppendLine($" Subscriptions: {CurrentSubscriptions}"); + sb.AppendLine($" Download speed: {IncomingKbps} kbps"); foreach (var connection in socketConnections) { - sb.AppendLine($" Connection {connection.Key}: {connection.Value.UserSubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); + sb.AppendLine($" Id: {connection.Key}"); + sb.AppendLine($" Address: {connection.Value.ConnectionUri}"); + sb.AppendLine($" Subscriptions: {connection.Value.UserSubscriptionCount}"); + sb.AppendLine($" Status: {connection.Value.Status}"); + sb.AppendLine($" Authenticated: {connection.Value.Authenticated}"); + sb.AppendLine($" Download speed: {connection.Value.IncomingKbps} kbps"); + sb.AppendLine($" Subscriptions:"); foreach (var subscription in connection.Value.Subscriptions) - sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}"); + { + sb.AppendLine($" Id: {subscription.Id}"); + sb.AppendLine($" Confirmed: {subscription.Confirmed}"); + sb.AppendLine($" Invocations: {subscription.TotalInvocations}"); + sb.AppendLine($" Identifiers: [{string.Join(", ", subscription.Identifiers)}]"); + } } return sb.ToString(); } diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index 7fd87bf..cf7972b 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -63,7 +63,7 @@ namespace CryptoExchange.Net.Converters } PostInspectResult? inspectResult = null; - Dictionary typeIdDict = new Dictionary(); + Dictionary typeIdDict = new Dictionary(); object? usedParser = null; if (token.Type == JTokenType.Object) { @@ -75,8 +75,11 @@ namespace CryptoExchange.Net.Converters var value = typeIdDict.TryGetValue(field, out var cachedValue) ? cachedValue : GetValueForKey(token, field); if (value == null) { - allFieldsPresent = false; - break; + if (callback.AllFieldPresentNeeded) + { + allFieldsPresent = false; + break; + } } typeIdDict[field] = value; @@ -86,7 +89,8 @@ namespace CryptoExchange.Net.Converters { inspectResult = callback.Callback(typeIdDict, processors); usedParser = callback; - break; + if (inspectResult.Type != null) + break; } } } @@ -124,7 +128,12 @@ namespace CryptoExchange.Net.Converters if (usedParser == null) throw new Exception("No parser found for message"); - var instance = InterpreterPipeline.ObjectInitializer(token, inspectResult.Type); + BaseParsedMessage instance; + if (inspectResult.Type != null) + instance = InterpreterPipeline.ObjectInitializer(token, inspectResult.Type); + else + instance = new ParsedMessage(null); + if (outputOriginalData) { stream.Position = 0; diff --git a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs index 9968f48..ec74ae6 100644 --- a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs +++ b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs @@ -17,6 +17,12 @@ /// If parsed /// public bool Parsed { get; set; } + + /// + /// Get the data object + /// + /// + public abstract object Data { get; } } /// @@ -28,7 +34,9 @@ /// /// Parsed data object /// - public T? Data { get; set; } + public override object? Data { get; } + + public T? TypedData => (T)Data; /// /// ctor diff --git a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs deleted file mode 100644 index b0eef4a..0000000 --- a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs +++ /dev/null @@ -1,165 +0,0 @@ -using CryptoExchange.Net.Sockets; -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace CryptoExchange.Net.Objects.Sockets -{ - /// - /// Pending socket request - /// - public abstract class BasePendingRequest - { - /// - /// Request id - /// - public int Id { get; set; } - /// - /// If the request is completed - /// - public bool Completed { get; protected set; } - /// - /// The response object type - /// - public abstract Type? ResponseType { get; } - /// - /// Timer event - /// - public DateTime RequestTimestamp { get; set; } - /// - /// The request object - /// - public object Request { get; set; } - /// - /// The result - /// - public abstract CallResult? Result { get; set; } - - protected AsyncResetEvent _event; - protected TimeSpan _timeout; - protected CancellationTokenSource? _cts; - - /// - /// ctor - /// - /// - /// - /// - protected BasePendingRequest(int id, object request, TimeSpan timeout) - { - Id = id; - _event = new AsyncResetEvent(false, false); - _timeout = timeout; - Request = request; - RequestTimestamp = DateTime.UtcNow; - } - - /// - /// Signal that the request has been send and the timeout timer should start - /// - public void IsSend() - { - // Start timeout countdown - _cts = new CancellationTokenSource(_timeout); - _cts.Token.Register(Timeout, false); - } - - /// - /// Wait untill timeout or the request is competed - /// - /// - /// - public async Task WaitAsync(TimeSpan timeout) => await _event.WaitAsync(timeout).ConfigureAwait(false); - - /// - /// Mark request as timeout - /// - public abstract void Timeout(); - - /// - /// Mark request as failed - /// - /// - public abstract void Fail(string error); - - /// - /// Process a response - /// - /// - /// - public abstract Task ProcessAsync(DataEvent message); - } - - /// - /// Pending socket request - /// - /// The response data type - public class PendingRequest : BasePendingRequest - { - /// - public override CallResult? Result { get; set; } - /// - /// The typed call result - /// - public CallResult? TypedResult => (CallResult?)Result; - /// - /// Data handler - /// - public Func>, Task>> Handler { get; } - /// - /// The response object type - /// - public override Type? ResponseType => typeof(T); - - /// - /// ctor - /// - /// - /// - /// - /// - private PendingRequest(int id, object request, Func>, Task>> messageHandler, TimeSpan timeout) - : base(id, request, timeout) - { - Handler = messageHandler; - } - - /// - /// Create a new pending request for provided query - /// - /// - /// - /// - public static PendingRequest CreateForQuery(Query query, int id) - { - return new PendingRequest(id, query.Request, async x => - { - var response = await query.HandleMessageAsync(x).ConfigureAwait(false); - return response.As(response.Data); - }, TimeSpan.FromSeconds(5)); - } - - /// - public override void Timeout() - { - Completed = true; - Result = new CallResult(new CancellationRequestedError()); - } - - /// - public override void Fail(string error) - { - Result = new CallResult(new ServerError(error)); - Completed = true; - _event.Set(); - } - - /// - public override async Task ProcessAsync(DataEvent message) - { - Completed = true; - Result = await Handler(message.As((ParsedMessage)message.Data)).ConfigureAwait(false); - _event.Set(); - } - } -} diff --git a/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs b/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs index 484441a..0037715 100644 --- a/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs +++ b/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs @@ -23,8 +23,9 @@ namespace CryptoExchange.Net.Objects.Sockets public class PostInspectCallback { + public bool AllFieldPresentNeeded { get; set; } = true; public List TypeFields { get; set; } = new List(); - public Func, Dictionary, PostInspectResult> Callback { get; set; } + public Func, Dictionary, PostInspectResult> Callback { get; set; } } public class PostInspectArrayCallback diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 5ee2909..9983e0f 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -3,6 +3,7 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets @@ -16,6 +17,14 @@ namespace CryptoExchange.Net.Sockets /// Unique identifier /// public int Id { get; } = ExchangeHelpers.NextId(); + + public bool Completed { get; set; } + public DateTime RequestTimestamp { get; set; } + public CallResult? Result { get; set; } + + protected AsyncResetEvent _event; + protected CancellationTokenSource? _cts; + /// /// Strings to identify this subscription with /// @@ -36,11 +45,6 @@ namespace CryptoExchange.Net.Sockets /// public int Weight { get; } - /// - /// The pending request for this query - /// - public BasePendingRequest? PendingRequest { get; private set; } - public abstract Type ExpectedMessageType { get; } /// @@ -51,26 +55,41 @@ namespace CryptoExchange.Net.Sockets /// public BaseQuery(object request, bool authenticated, int weight = 1) { + _event = new AsyncResetEvent(false, false); + Authenticated = authenticated; Request = request; Weight = weight; } /// - /// Create a pending request for this query + /// Signal that the request has been send and the timeout timer should start /// - public BasePendingRequest CreatePendingRequest() + public void IsSend(TimeSpan timeout) { - PendingRequest = GetPendingRequest(Id); - return PendingRequest; + // Start timeout countdown + RequestTimestamp = DateTime.UtcNow; + _cts = new CancellationTokenSource(timeout); + _cts.Token.Register(Timeout, false); } /// - /// Create a pending request for this query + /// Wait untill timeout or the request is competed /// - /// + /// /// - public abstract BasePendingRequest GetPendingRequest(int id); + public async Task WaitAsync(TimeSpan timeout) => await _event.WaitAsync(timeout).ConfigureAwait(false); + + /// + /// Mark request as timeout + /// + public abstract void Timeout(); + + /// + /// Mark request as failed + /// + /// + public abstract void Fail(string error); /// /// Handle a response message @@ -89,6 +108,11 @@ namespace CryptoExchange.Net.Sockets { public override Type ExpectedMessageType => typeof(TResponse); + /// + /// The typed call result + /// + public CallResult? TypedResult => (CallResult?)Result; + /// /// ctor /// @@ -102,8 +126,10 @@ namespace CryptoExchange.Net.Sockets /// public override async Task HandleMessageAsync(DataEvent message) { - await PendingRequest!.ProcessAsync(message).ConfigureAwait(false); - return await HandleMessageAsync(message.As((ParsedMessage)message.Data)).ConfigureAwait(false); + Completed = true; + Result = await HandleMessageAsync(message.As((ParsedMessage)message.Data)).ConfigureAwait(false); + _event.Set(); + return Result; } /// @@ -111,9 +137,25 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public virtual Task> HandleMessageAsync(DataEvent> message) => Task.FromResult(new CallResult(message.Data.Data!)); + public virtual Task> HandleMessageAsync(DataEvent> message) => Task.FromResult(new CallResult(message.Data.TypedData!)); /// - public override BasePendingRequest GetPendingRequest(int id) => PendingRequest.CreateForQuery(this, id); + public override void Timeout() + { + if (Completed) + return; + + Completed = true; + Result = new CallResult(new CancellationRequestedError()); + _event.Set(); + } + + /// + public override void Fail(string error) + { + Result = new CallResult(new ServerError(error)); + Completed = true; + _event.Set(); + } } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 00b718e..7e69979 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -1,7 +1,6 @@ using CryptoExchange.Net.Interfaces; using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Newtonsoft.Json; @@ -11,7 +10,6 @@ using System.Net.WebSockets; using System.IO; using CryptoExchange.Net.Objects.Sockets; using System.Text; -using System.Collections.Concurrent; namespace CryptoExchange.Net.Sockets { @@ -58,7 +56,7 @@ namespace CryptoExchange.Net.Sockets /// /// The amount of subscriptions on this connection /// - public int UserSubscriptionCount => _subscriptions.Count(h => h.UserSubscription); + public int UserSubscriptionCount => _listenerManager.GetSubscriptions().Count(h => h.UserSubscription); /// /// Get a copy of the current message subscriptions @@ -67,14 +65,14 @@ namespace CryptoExchange.Net.Sockets { get { - return _subscriptions.ToArray(h => h.UserSubscription); + return _listenerManager.GetSubscriptions().Where(h => h.UserSubscription).ToArray(); } } /// /// If the connection has been authenticated /// - public bool Authenticated { get; internal set; } + public bool Authenticated { get; set; } /// /// If connection is made @@ -152,9 +150,7 @@ namespace CryptoExchange.Net.Sockets } private bool _pausedActivity; - //private readonly ConcurrentList _pendingRequests; - //private readonly ConcurrentList _subscriptions; - private readonly SocketListenerManager _messageIdMap; + private readonly SocketListenerManager _listenerManager; private readonly ILogger _logger; private SocketStatus _status; @@ -177,10 +173,6 @@ namespace CryptoExchange.Net.Sockets Tag = tag; Properties = new Dictionary(); - //_pendingRequests = new ConcurrentList(); - //_subscriptions = new ConcurrentList(); - _messageIdMap = new SocketListenerManager(_logger); - _socket = socket; _socket.OnStreamMessage += HandleStreamMessage; _socket.OnRequestSent += HandleRequestSent; @@ -190,6 +182,8 @@ namespace CryptoExchange.Net.Sockets _socket.OnReconnected += HandleReconnected; _socket.OnError += HandleError; _socket.GetReconnectionUrl = GetReconnectionUrlAsync; + + _listenerManager = new SocketListenerManager(_logger, SocketId); } /// @@ -209,9 +203,15 @@ namespace CryptoExchange.Net.Sockets Status = SocketStatus.Closed; Authenticated = false; - foreach (var subscription in _messageIdMap.GetSubscriptions()) + foreach (var subscription in _listenerManager.GetSubscriptions()) subscription.Confirmed = false; + foreach (var query in _listenerManager.GetQueries()) + { + query.Fail("Connection interupted"); + _listenerManager.Remove(query); + } + Task.Run(() => ConnectionClosed?.Invoke()); } @@ -224,9 +224,15 @@ namespace CryptoExchange.Net.Sockets DisconnectTime = DateTime.UtcNow; Authenticated = false; - foreach (var subscription in _messageIdMap.GetSubscriptions()) + foreach (var subscription in _listenerManager.GetSubscriptions()) subscription.Confirmed = false; + foreach (var query in _listenerManager.GetQueries()) + { + query.Fail("Connection interupted"); + _listenerManager.Remove(query); + } + _ = Task.Run(() => ConnectionLost?.Invoke()); } @@ -246,10 +252,10 @@ namespace CryptoExchange.Net.Sockets { Status = SocketStatus.Resubscribing; - foreach (var pendingRequest in _pendingRequests.ToList()) + foreach (var query in _listenerManager.GetQueries()) { - pendingRequest.Fail("Connection interupted"); - // Remove? + query.Fail("Connection interupted"); + _listenerManager.Remove(query); } var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false); @@ -287,14 +293,14 @@ namespace CryptoExchange.Net.Sockets /// Id of the request sent protected virtual void HandleRequestSent(int requestId) { - var pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId); - if (pendingRequest == null) + var query = _listenerManager.GetById(requestId); + if (query == null) { _logger.Log(LogLevel.Debug, $"Socket {SocketId} - msg {requestId} - message sent, but not pending"); return; } - pendingRequest.IsSend(); + query.IsSend(ApiClient.ClientOptions.RequestTimeout); } /// @@ -304,14 +310,10 @@ namespace CryptoExchange.Net.Sockets /// protected virtual async Task HandleStreamMessage(Stream stream) { - var timestamp = DateTime.UtcNow; - TimeSpan userCodeDuration = TimeSpan.Zero; - - // TODO This shouldn't be done for every request, just when something changes. Might want to make it a seperate type or something with functions 'Add', 'Remove' and 'GetMapping' or something - // This could then cache the internal dictionary mapping of `GetMapping` until something changes, and also make sure there aren't duplicate ids with different message types - var result = ApiClient.StreamConverter.ReadJson(stream, _messageIdMap.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); + var result = ApiClient.StreamConverter.ReadJson(stream, _listenerManager.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); if(result == null) { + // Not able to parse at all stream.Position = 0; var buffer = new byte[stream.Length]; stream.Read(buffer, 0, buffer.Length); @@ -324,18 +326,18 @@ namespace CryptoExchange.Net.Sockets if (!result.Parsed) { + // Not able to determine the message type for the message _logger.LogWarning("Message not matched to type"); return; } - if (!await _messageIdMap.InvokeListenersAsync(result.Identifier, result).ConfigureAwait(false)) + if (!await _listenerManager.InvokeListenersAsync(result.Identifier, result).ConfigureAwait(false)) { + // Not able to find a listener for this message stream.Position = 0; var unhandledBuffer = new byte[stream.Length]; stream.Read(unhandledBuffer, 0, unhandledBuffer.Length); - - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message unidentified. Id: {result.Identifier.ToLowerInvariant()}, Message: {Encoding.UTF8.GetString(unhandledBuffer)} "); - + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message unidentified. Id: {result.Identifier.ToLowerInvariant()}, listening ids: [{string.Join(", ", _listenerManager.GetListenIds())}], Message: {Encoding.UTF8.GetString(unhandledBuffer)} "); UnhandledMessage?.Invoke(result); return; } @@ -373,7 +375,7 @@ namespace CryptoExchange.Net.Sockets if (ApiClient.socketConnections.ContainsKey(SocketId)) ApiClient.socketConnections.TryRemove(SocketId, out _); - foreach (var subscription in _messageIdMap.GetSubscriptions()) + foreach (var subscription in _listenerManager.GetSubscriptions()) { if (subscription.CancellationTokenRegistration.HasValue) subscription.CancellationTokenRegistration.Value.Dispose(); @@ -391,7 +393,7 @@ namespace CryptoExchange.Net.Sockets /// public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false) { - if (!_messageIdMap.Contains(subscription)) + if (!_listenerManager.Contains(subscription)) return; subscription.Closed = true; @@ -412,7 +414,7 @@ namespace CryptoExchange.Net.Sockets return; } - var shouldCloseConnection = _messageIdMap.GetSubscriptions().All(r => !r.UserSubscription || r.Closed); + var shouldCloseConnection = _listenerManager.GetSubscriptions().All(r => !r.UserSubscription || r.Closed); if (shouldCloseConnection) Status = SocketStatus.Closing; @@ -422,7 +424,7 @@ namespace CryptoExchange.Net.Sockets await CloseAsync().ConfigureAwait(false); } - _messageIdMap.Remove(subscription); + _listenerManager.Remove(subscription); } /// @@ -443,12 +445,10 @@ namespace CryptoExchange.Net.Sockets if (Status != SocketStatus.None && Status != SocketStatus.Connected) return false; - _messageIdMap.Add(subscription); - if (subscription.Identifiers != null) - _messageIdMap.Add(subscription); + _listenerManager.Add(subscription); - //if (subscription.UserSubscription) - // _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}"); + if (subscription.UserSubscription) + _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {UserSubscriptionCount}"); return true; } @@ -456,14 +456,14 @@ namespace CryptoExchange.Net.Sockets /// Get a subscription on this connection by id /// /// - public Subscription? GetSubscription(int id) => _messageIdMap.GetSubscriptions().SingleOrDefault(s => s.Id == id); + public Subscription? GetSubscription(int id) => _listenerManager.GetSubscriptions().SingleOrDefault(s => s.Id == id); /// /// Get a subscription on this connection by its subscribe request /// /// Filter for a request /// - public Subscription? GetSubscriptionByRequest(Func predicate) => _messageIdMap.GetSubscriptions().SingleOrDefault(s => predicate(s)); + public Subscription? GetSubscriptionByRequest(Func predicate) => _listenerManager.GetSubscriptions().SingleOrDefault(s => predicate(s)); /// /// Send a query request and wait for an answer @@ -472,12 +472,8 @@ namespace CryptoExchange.Net.Sockets /// public virtual async Task SendAndWaitQueryAsync(BaseQuery query) { - var pendingRequest = query.CreatePendingRequest(); - if (query.Identifiers != null) - _messageIdMap.Add(query); - - await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false); - return pendingRequest.Result ?? new CallResult(new ServerError("Timeout")); + await SendAndWaitIntAsync(query).ConfigureAwait(false); + return query.Result ?? new CallResult(new ServerError("Timeout")); } /// @@ -488,23 +484,17 @@ namespace CryptoExchange.Net.Sockets /// public virtual async Task> SendAndWaitQueryAsync(Query query) { - var pendingRequest = (PendingRequest)query.CreatePendingRequest(); - if (query.Identifiers != null) - _messageIdMap.Add(query); - - await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false); - return pendingRequest.TypedResult ?? new CallResult(new ServerError("Timeout")); + await SendAndWaitIntAsync(query).ConfigureAwait(false); + return query.TypedResult ?? new CallResult(new ServerError("Timeout")); } - private async Task SendAndWaitAsync(BasePendingRequest pending, int weight) + private async Task SendAndWaitIntAsync(BaseQuery query) { - lock (_subscriptions) - _pendingRequests.Add(pending); - - var sendOk = Send(pending.Id, pending.Request, weight); + _listenerManager.Add(query); + var sendOk = Send(query.Id, query.Request, query.Weight); if (!sendOk) { - pending.Fail("Failed to send"); + query.Fail("Failed to send"); return; } @@ -512,16 +502,16 @@ namespace CryptoExchange.Net.Sockets { if (!_socket.IsOpen) { - pending.Fail("Socket not open"); + query.Fail("Socket not open"); return; } - if (pending.Completed) + if (query.Completed) return; - await pending.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); + await query.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); - if (pending.Completed) + if (query.Completed) return; } } @@ -567,7 +557,7 @@ namespace CryptoExchange.Net.Sockets if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); - var anySubscriptions = _messageIdMap.GetSubscriptions().Any(s => s.UserSubscription); + var anySubscriptions = _listenerManager.GetSubscriptions().Any(s => s.UserSubscription); if (!anySubscriptions) { // No need to resubscribe anything @@ -576,7 +566,7 @@ namespace CryptoExchange.Net.Sockets return new CallResult(true); } - var anyAuthenticated = _messageIdMap.GetSubscriptions().Any(s => s.Authenticated); + var anyAuthenticated = _listenerManager.GetSubscriptions().Any(s => s.Authenticated); if (anyAuthenticated) { // If we reconnected a authenticated connection we need to re-authenticate @@ -592,7 +582,7 @@ namespace CryptoExchange.Net.Sockets } // Get a list of all subscriptions on the socket - var subList = _messageIdMap.GetSubscriptions(); + var subList = _listenerManager.GetSubscriptions(); foreach(var subscription in subList) { @@ -614,7 +604,7 @@ namespace CryptoExchange.Net.Sockets var taskList = new List>(); foreach (var subscription in subList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) { - var subQuery = subscription.GetSubQuery(); + var subQuery = subscription.GetSubQuery(this); if (subQuery == null) continue; @@ -651,7 +641,7 @@ namespace CryptoExchange.Net.Sockets if (!_socket.IsOpen) return new CallResult(new UnknownError("Socket is not connected")); - var subQuery = subscription.GetSubQuery(); + var subQuery = subscription.GetSubQuery(this); if (subQuery == null) return new CallResult(null); diff --git a/CryptoExchange.Net/Sockets/SocketListenerManager.cs b/CryptoExchange.Net/Sockets/SocketListenerManager.cs index 307334a..1b5d7e5 100644 --- a/CryptoExchange.Net/Sockets/SocketListenerManager.cs +++ b/CryptoExchange.Net/Sockets/SocketListenerManager.cs @@ -14,14 +14,19 @@ namespace CryptoExchange.Net.Sockets internal class SocketListenerManager { private ILogger _logger; + private int _socketId; private object _lock = new object(); + private Dictionary _idMap; private Dictionary _typeMap; private Dictionary> _listeners; - public SocketListenerManager(ILogger logger) + public SocketListenerManager(ILogger logger, int socketId) { + _idMap = new Dictionary(); + _listeners = new Dictionary>(); _typeMap = new Dictionary(); _logger = logger; + _socketId = socketId; } public Dictionary GetMapping() @@ -30,19 +35,29 @@ namespace CryptoExchange.Net.Sockets return _typeMap; } + public List GetListenIds() + { + lock(_lock) + return _listeners.Keys.ToList(); + } + public void Add(IMessageProcessor processor) { lock (_lock) { - foreach (var identifier in processor.Identifiers) + _idMap.Add(processor.Id, processor); + if (processor.Identifiers?.Any() == true) { - if (!_listeners.TryGetValue(identifier, out var list)) + foreach (var identifier in processor.Identifiers) { - list = new List(); - _listeners.Add(identifier, list); - } + if (!_listeners.TryGetValue(identifier, out var list)) + { + list = new List(); + _listeners.Add(identifier, list); + } - list.Add(processor); + list.Add(processor); + } } UpdateMap(); @@ -62,18 +77,14 @@ namespace CryptoExchange.Net.Sockets foreach (var listener in listeners) { - //_logger.Log(LogLevel.Trace, $"Socket {SocketId} Message mapped to processor {messageProcessor.Id} with identifier {result.Identifier}"); + _logger.Log(LogLevel.Trace, $"Socket {_socketId} Message mapped to processor {listener.Id} with identifier {data.Identifier}"); if (listener is BaseQuery query) { Remove(listener); - - if (query.PendingRequest != null) - _pendingRequests.Remove(query.PendingRequest); - - if (query.PendingRequest?.Completed == true) + if (query?.Completed == true) { // Answer to a timed out request - //_logger.Log(LogLevel.Warning, $"Socket {SocketId} Received after request timeout. Consider increasing the RequestTimeout"); + _logger.Log(LogLevel.Warning, $"Socket {_socketId} Received after request timeout. Consider increasing the RequestTimeout"); } } @@ -82,15 +93,29 @@ namespace CryptoExchange.Net.Sockets var dataEvent = new DataEvent(data, null, data.OriginalData, DateTime.UtcNow, null); await listener.HandleMessageAsync(dataEvent).ConfigureAwait(false); userSw.Stop(); + if (userSw.ElapsedMilliseconds > 500) + { + _logger.Log(LogLevel.Debug, $"Socket {_socketId} {(listener is Subscription ? "subscription " : "query " + listener!.Id)} message processing slow ({(int)userSw.ElapsedMilliseconds}ms), consider offloading data handling to another thread. " + + "Data from this socket may arrive late or not at all if message processing is continuously slow."); + } } return true; } + public T? GetById(int id) where T : BaseQuery + { + lock (_lock) + { + _idMap.TryGetValue(id, out var val); + return (T)val; + } + } + public List GetSubscriptions() { lock (_lock) - return _listeners.Values.SelectMany(v => v.OfType()).ToList(); + return _listeners.Values.SelectMany(v => v.OfType()).Distinct().ToList(); } public List GetQueries() @@ -105,22 +130,22 @@ namespace CryptoExchange.Net.Sockets return _listeners.Any(l => l.Value.Contains(processor)); } - public bool Remove(IMessageProcessor processor) + public void Remove(IMessageProcessor processor) { lock (_lock) { - var removed = false; - foreach (var identifier in processor.Identifiers) + _idMap.Remove(processor.Id); + if (processor.Identifiers?.Any() == true) { - if (_listeners[identifier].Remove(processor)) - removed = true; - - if (!_listeners[identifier].Any()) - _listeners.Remove(identifier); + foreach (var identifier in processor.Identifiers) + { + _listeners[identifier].Remove(processor); + if (!_listeners[identifier].Any()) + _listeners.Remove(identifier); + } } UpdateMap(); - return removed; } } diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 50c99d2..adcb653 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -89,7 +89,7 @@ namespace CryptoExchange.Net.Sockets /// Get the subscribe object to send when subscribing /// /// - public abstract BaseQuery? GetSubQuery(); + public abstract BaseQuery? GetSubQuery(SocketConnection connection); /// /// Get the unsubscribe object to send when unsubscribing diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index c600b05..ea11614 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -21,7 +21,7 @@ namespace CryptoExchange.Net.Sockets } /// - public override BaseQuery? GetSubQuery() => null; + public override BaseQuery? GetSubQuery(SocketConnection connection) => null; /// public override BaseQuery? GetUnsubQuery() => null;