From a222bb3f02424b4f7827da13a0b08b491178e154 Mon Sep 17 00:00:00 2001 From: JKorf Date: Sun, 12 Feb 2023 14:05:00 +0100 Subject: [PATCH] Fixed socket client options setting, added automatic unsubscribe if the subscription confirmation comes in after request timeout --- CryptoExchange.Net/Clients/SocketApiClient.cs | 4 ++-- CryptoExchange.Net/Objects/Options.cs | 16 ++++++------- CryptoExchange.Net/Sockets/PendingRequest.cs | 22 ++++++++++------- .../Sockets/SocketConnection.cs | 24 ++++++++++++++++--- 4 files changed, 44 insertions(+), 22 deletions(-) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 68da362..1032277 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -265,7 +265,7 @@ namespace CryptoExchange.Net protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription) { CallResult? callResult = null; - await socketConnection.SendAndWaitAsync(request, Options.SocketResponseTimeout, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false); + await socketConnection.SendAndWaitAsync(request, Options.SocketResponseTimeout, subscription, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false); if (callResult?.Success == true) { @@ -351,7 +351,7 @@ namespace CryptoExchange.Net protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, object request) { var dataResult = new CallResult(new ServerError("No response on query received")); - await socket.SendAndWaitAsync(request, Options.SocketResponseTimeout, data => + await socket.SendAndWaitAsync(request, Options.SocketResponseTimeout, null, data => { if (!HandleQueryResponse(socket, request, data, out var callResult)) return false; diff --git a/CryptoExchange.Net/Objects/Options.cs b/CryptoExchange.Net/Objects/Options.cs index 2d19326..b56928a 100644 --- a/CryptoExchange.Net/Objects/Options.cs +++ b/CryptoExchange.Net/Objects/Options.cs @@ -298,14 +298,14 @@ namespace CryptoExchange.Net.Objects if (baseOptions == null) return; - AutoReconnect = baseOptions.AutoReconnect; - ReconnectInterval = baseOptions.ReconnectInterval; - MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket; - SocketResponseTimeout = baseOptions.SocketResponseTimeout; - SocketNoDataTimeout = baseOptions.SocketNoDataTimeout; - SocketSubscriptionsCombineTarget = baseOptions.SocketSubscriptionsCombineTarget; - MaxSocketConnections = baseOptions.MaxSocketConnections; - DelayAfterConnect = baseOptions.DelayAfterConnect; + AutoReconnect = newValues?.AutoReconnect ?? baseOptions.AutoReconnect; + ReconnectInterval = newValues?.ReconnectInterval ?? baseOptions.ReconnectInterval; + MaxConcurrentResubscriptionsPerSocket = newValues?.MaxConcurrentResubscriptionsPerSocket ?? baseOptions.MaxConcurrentResubscriptionsPerSocket; + SocketResponseTimeout = newValues?.SocketResponseTimeout ?? baseOptions.SocketResponseTimeout; + SocketNoDataTimeout = newValues?.SocketNoDataTimeout ?? baseOptions.SocketNoDataTimeout; + SocketSubscriptionsCombineTarget = newValues?.SocketSubscriptionsCombineTarget ?? baseOptions.SocketSubscriptionsCombineTarget; + MaxSocketConnections = newValues?.MaxSocketConnections ?? baseOptions.MaxSocketConnections; + DelayAfterConnect = newValues?.DelayAfterConnect ?? baseOptions.DelayAfterConnect; } /// diff --git a/CryptoExchange.Net/Sockets/PendingRequest.cs b/CryptoExchange.Net/Sockets/PendingRequest.cs index 63ed6d9..ecff7c8 100644 --- a/CryptoExchange.Net/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Sockets/PendingRequest.cs @@ -11,15 +11,19 @@ namespace CryptoExchange.Net.Sockets public JToken? Result { get; private set; } public bool Completed { get; private set; } public AsyncResetEvent Event { get; } + public DateTime RequestTimestamp { get; set; } public TimeSpan Timeout { get; } + public SocketSubscription? Subscription { get; } private CancellationTokenSource cts; - public PendingRequest(Func handler, TimeSpan timeout) + public PendingRequest(Func handler, TimeSpan timeout, SocketSubscription? subscription) { Handler = handler; Event = new AsyncResetEvent(false, false); Timeout = timeout; + RequestTimestamp = DateTime.UtcNow; + Subscription = subscription; cts = new CancellationTokenSource(timeout); cts.Token.Register(Fail, false); @@ -27,15 +31,15 @@ namespace CryptoExchange.Net.Sockets public bool CheckData(JToken data) { - if (Handler(data)) - { - Result = data; - Completed = true; - Event.Set(); - return true; - } + return Handler(data); + } - return false; + public bool Succeed(JToken data) + { + Result = data; + Completed = true; + Event.Set(); + return true; } public void Fail() diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 44eb0a5..346daec 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -304,18 +304,35 @@ namespace CryptoExchange.Net.Sockets PendingRequest[] requests; lock (_pendingRequests) { - _pendingRequests.RemoveAll(r => r.Completed); + // Remove only timed out requests after 5 minutes have passed so we can still process any + // message coming in after the request timeout + _pendingRequests.RemoveAll(r => r.Completed && DateTime.UtcNow - r.RequestTimestamp > TimeSpan.FromMinutes(5)); requests = _pendingRequests.ToArray(); } // Check if this message is an answer on any pending requests foreach (var pendingRequest in requests) { + if (pendingRequest.CheckData(tokenData)) { lock (_pendingRequests) _pendingRequests.Remove(pendingRequest); + if (pendingRequest.Completed) + { + // Answer to a timed out request, unsub if it is a subscription request + if (pendingRequest.Subscription != null) + { + _log.Write(LogLevel.Warning, "Received subscription info after request timed out; unsubscribing. Consider increasing the SocketResponseTimout"); + _ = ApiClient.UnsubscribeAsync(this, pendingRequest.Subscription).ConfigureAwait(false); + } + } + else + { + pendingRequest.Succeed(tokenData); + } + if (!ApiClient.ContinueOnQueryResponse) return; @@ -546,11 +563,12 @@ namespace CryptoExchange.Net.Sockets /// The data type expected in response /// The object to send /// The timeout for response + /// Subscription if this is a subscribe request /// The response handler, should return true if the received JToken was the response to the request /// - public virtual Task SendAndWaitAsync(T obj, TimeSpan timeout, Func handler) + public virtual Task SendAndWaitAsync(T obj, TimeSpan timeout, SocketSubscription? subscription, Func handler) { - var pending = new PendingRequest(handler, timeout); + var pending = new PendingRequest(handler, timeout, subscription); lock (_pendingRequests) { _pendingRequests.Add(pending);