diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 406387d..a57bc59 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -52,11 +52,6 @@ namespace CryptoExchange.Net.Clients /// protected internal bool UnhandledMessageExpected { get; set; } - /// - /// If true a subscription will accept message before the confirmation of a subscription has been received - /// - protected bool HandleMessageBeforeConfirmation { get; set; } - /// /// The rate limiters /// @@ -203,7 +198,6 @@ namespace CryptoExchange.Net.Clients return socketResult.As(null); socketConnection = socketResult.Data; - subscription.HandleUpdatesBeforeConfirmation = subscription.HandleUpdatesBeforeConfirmation || HandleMessageBeforeConfirmation; // Add a subscription on the socket connection var success = socketConnection.AddSubscription(subscription); @@ -250,11 +244,18 @@ namespace CryptoExchange.Net.Clients if (!subResult) { waitEvent?.Set(); - _logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString()); - // If this was a timeout we still need to send an unsubscribe to prevent messages coming in later - var unsubscribe = subResult.Error is CancellationRequestedError; - await socketConnection.CloseAsync(subscription, unsubscribe).ConfigureAwait(false); - return new CallResult(subResult.Error!); + var isTimeout = subResult.Error is CancellationRequestedError; + if (isTimeout && subscription.Confirmed) + { + // No response received, but the subscription did receive updates. We'll assume success + } + else + { + _logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString()); + // If this was a timeout we still need to send an unsubscribe to prevent messages coming in later + await socketConnection.CloseAsync(subscription, isTimeout).ConfigureAwait(false); + return new CallResult(subResult.Error!); + } } subscription.HandleSubQueryResponse(subQuery.Response!); @@ -517,7 +518,7 @@ namespace CryptoExchange.Net.Clients /// The address to connect to /// protected virtual WebSocketParameters GetWebSocketParameters(string address) - => new(new Uri(address), ClientOptions.AutoReconnect) + => new(new Uri(address), ClientOptions.ReconnectPolicy) { KeepAliveInterval = KeepAliveInterval, ReconnectInterval = ClientOptions.ReconnectInterval, diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs index eaebc66..679959c 100644 --- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs @@ -16,10 +16,6 @@ namespace CryptoExchange.Net.Interfaces /// public int Id { get; } /// - /// Whether this listener can handle data - /// - public bool CanHandleData { get; } - /// /// The identifiers for this processor /// public HashSet ListenerIdentifiers { get; } diff --git a/CryptoExchange.Net/Objects/Enums.cs b/CryptoExchange.Net/Objects/Enums.cs index d25c4b1..bc27771 100644 --- a/CryptoExchange.Net/Objects/Enums.cs +++ b/CryptoExchange.Net/Objects/Enums.cs @@ -169,4 +169,23 @@ /// Snapshot } + + /// + /// Reconnect policy + /// + public enum ReconnectPolicy + { + /// + /// Reconnect is disabled + /// + Disabled, + /// + /// Fixed delay of `ReconnectInterval` between retries + /// + FixedDelay, + /// + /// Backof policy of 2^`reconnectAttempt`, where `reconnectAttempt` has a max value of 5 + /// + ExponentialBackoff + } } diff --git a/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs b/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs index 36e28f4..f67200f 100644 --- a/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs +++ b/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs @@ -1,4 +1,5 @@ using CryptoExchange.Net.Authentication; +using CryptoExchange.Net.Objects.Sockets; using System; namespace CryptoExchange.Net.Objects.Options @@ -9,15 +10,15 @@ namespace CryptoExchange.Net.Objects.Options public class SocketExchangeOptions : ExchangeOptions { /// - /// Whether or not the socket should automatically reconnect when losing connection - /// - public bool AutoReconnect { get; set; } = true; - - /// - /// Time to wait between reconnect attempts + /// The fixed time to wait between reconnect attempts, only used when `ReconnectPolicy` is set to `ReconnectPolicy.ExponentialBackoff` /// public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5); + /// + /// Reconnect policy + /// + public ReconnectPolicy ReconnectPolicy { get; set; } = ReconnectPolicy.FixedDelay; + /// /// Max number of concurrent resubscription tasks per socket after reconnecting a socket /// @@ -57,7 +58,7 @@ namespace CryptoExchange.Net.Objects.Options { ApiCredentials = ApiCredentials?.Copy(), OutputOriginalData = OutputOriginalData, - AutoReconnect = AutoReconnect, + ReconnectPolicy = ReconnectPolicy, DelayAfterConnect = DelayAfterConnect, MaxConcurrentResubscriptionsPerSocket = MaxConcurrentResubscriptionsPerSocket, ReconnectInterval = ReconnectInterval, diff --git a/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs index 0cc33ed..fa66787 100644 --- a/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs +++ b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs @@ -26,20 +26,20 @@ namespace CryptoExchange.Net.Objects.Sockets public IDictionary Cookies { get; set; } = new Dictionary(); /// - /// The time to wait between reconnect attempts + /// The fixed time to wait between reconnect attempts, only used when `ReconnectPolicy` is set to `ReconnectPolicy.ExponentialBackoff` /// public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5); + /// + /// Reconnect policy + /// + public ReconnectPolicy ReconnectPolicy { get; set; } = ReconnectPolicy.FixedDelay; + /// /// Proxy for the connection /// public ApiProxy? Proxy { get; set; } - /// - /// Whether the socket should automatically reconnect when connection is lost - /// - public bool AutoReconnect { get; set; } - /// /// The maximum time of no data received before considering the connection lost and closting/reconnecting the socket /// @@ -68,11 +68,11 @@ namespace CryptoExchange.Net.Objects.Sockets /// ctor /// /// Uri - /// Auto reconnect - public WebSocketParameters(Uri uri, bool autoReconnect) + /// Reconnect policy + public WebSocketParameters(Uri uri, ReconnectPolicy policy) { Uri = uri; - AutoReconnect = autoReconnect; + ReconnectPolicy = policy; } } } diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 6162317..60302f2 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -47,6 +47,7 @@ namespace CryptoExchange.Net.Sockets private ProcessState _processState; private DateTime _lastReconnectTime; private string _baseAddress; + private int _reconnectAttempt; private const int _receiveBufferSize = 1048576; private const int _sendBufferSize = 4096; @@ -246,12 +247,12 @@ namespace CryptoExchange.Net.Sockets await _closeTask.ConfigureAwait(false); _closeTask = null; - if (!Parameters.AutoReconnect) + if (Parameters.ReconnectPolicy == ReconnectPolicy.Disabled) { _processState = ProcessState.Idle; await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false); return; - } + } if (!_stopRequested) { @@ -259,9 +260,9 @@ namespace CryptoExchange.Net.Sockets await (OnReconnecting?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false); } - var sinceLastReconnect = DateTime.UtcNow - _lastReconnectTime; - if (sinceLastReconnect < Parameters.ReconnectInterval) - await Task.Delay(Parameters.ReconnectInterval - sinceLastReconnect).ConfigureAwait(false); + // Delay here to prevent very repid looping when a connection to the server is accepted and immediately disconnected + var initialDelay = GetReconnectDelay(); + await Task.Delay(initialDelay).ConfigureAwait(false); while (!_stopRequested) { @@ -282,13 +283,17 @@ namespace CryptoExchange.Net.Sockets _ctsSource = new CancellationTokenSource(); while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer + _reconnectAttempt++; var connected = await ConnectInternalAsync().ConfigureAwait(false); if (!connected) { - await Task.Delay(Parameters.ReconnectInterval).ConfigureAwait(false); + // Delay between reconnect attempts + var delay = GetReconnectDelay(); + await Task.Delay(delay).ConfigureAwait(false); continue; } + _reconnectAttempt = 0; _lastReconnectTime = DateTime.UtcNow; await (OnReconnected?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false); break; @@ -298,6 +303,24 @@ namespace CryptoExchange.Net.Sockets _processState = ProcessState.Idle; } + private TimeSpan GetReconnectDelay() + { + if (_reconnectAttempt == 0) + { + // Means this is directly after disconnecting. Only delay if the last reconnect time is very recent + var sinceLastReconnect = DateTime.UtcNow - _lastReconnectTime; + if (sinceLastReconnect < TimeSpan.FromSeconds(5)) + return TimeSpan.FromSeconds(5) - sinceLastReconnect; + + return TimeSpan.FromMilliseconds(1); + } + + var delay = Parameters.ReconnectPolicy == ReconnectPolicy.FixedDelay ? Parameters.ReconnectInterval : TimeSpan.FromSeconds(Math.Pow(2, Math.Min(5, _reconnectAttempt))); + if (delay > TimeSpan.Zero) + return delay; + return TimeSpan.FromMilliseconds(1); + } + /// public virtual void Send(int id, string data, int weight) { diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 149c17d..d660b2a 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -19,11 +19,6 @@ namespace CryptoExchange.Net.Sockets /// public int Id { get; } = ExchangeHelpers.NextId(); - /// - /// Can handle data - /// - public bool CanHandleData => true; - /// /// Has this query been completed /// diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 13deab3..8238964 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -443,7 +443,7 @@ namespace CryptoExchange.Net.Sockets // 4. Get the listeners interested in this message List processors; lock (_listenersLock) - processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId) && s.CanHandleData).ToList(); + processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList(); if (processors.Count == 0) { @@ -451,7 +451,7 @@ namespace CryptoExchange.Net.Sockets { List listenerIds; lock (_listenersLock) - listenerIds = _listeners.Where(l => l.CanHandleData).SelectMany(l => l.ListenerIdentifiers).ToList(); + listenerIds = _listeners.SelectMany(l => l.ListenerIdentifiers).ToList(); _logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds)); UnhandledMessage?.Invoke(_accessor); } @@ -478,6 +478,10 @@ namespace CryptoExchange.Net.Sockets continue; } + if (processor is Subscription subscriptionProcessor && !subscriptionProcessor.Confirmed) + // If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed + subscriptionProcessor.Confirmed = true; + // 6. Deserialize the message object? deserialized = null; desCache?.TryGetValue(messageType, out deserialized); @@ -861,6 +865,8 @@ namespace CryptoExchange.Net.Sockets { subscription.HandleSubQueryResponse(subQuery.Response!); waitEvent.Set(); + if (r.Result.Success) + subscription.Confirmed = true; return r.Result; })); } @@ -870,9 +876,6 @@ namespace CryptoExchange.Net.Sockets return taskList.First(t => !t.Result.Success).Result; } - foreach (var subscription in subList) - subscription.Confirmed = true; - if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 5ff44d5..cae4e0a 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -18,11 +18,6 @@ namespace CryptoExchange.Net.Sockets /// public int Id { get; set; } - /// - /// Can handle data - /// - public bool CanHandleData => Confirmed || HandleUpdatesBeforeConfirmation; - /// /// Total amount of invocations /// @@ -42,11 +37,6 @@ namespace CryptoExchange.Net.Sockets /// Has the subscription been confirmed /// public bool Confirmed { get; set; } - - /// - /// Whether this subscription should handle update messages before confirmation - /// - public bool HandleUpdatesBeforeConfirmation { get; set; } /// /// Is the subscription closed