1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 07:56:12 +00:00

Removed HandleUpdatesBeforeConfirmation flag, allow messages to trigger listeners even if not confirmed and mark as confirmed then. Updated websocket reconnection delay handling

This commit is contained in:
JKorf 2024-06-12 16:56:06 +02:00
parent c8c98e13d0
commit d27f394b46
9 changed files with 86 additions and 58 deletions

View File

@ -52,11 +52,6 @@ namespace CryptoExchange.Net.Clients
/// </summary>
protected internal bool UnhandledMessageExpected { get; set; }
/// <summary>
/// If true a subscription will accept message before the confirmation of a subscription has been received
/// </summary>
protected bool HandleMessageBeforeConfirmation { get; set; }
/// <summary>
/// The rate limiters
/// </summary>
@ -203,7 +198,6 @@ namespace CryptoExchange.Net.Clients
return socketResult.As<UpdateSubscription>(null);
socketConnection = socketResult.Data;
subscription.HandleUpdatesBeforeConfirmation = subscription.HandleUpdatesBeforeConfirmation || HandleMessageBeforeConfirmation;
// Add a subscription on the socket connection
var success = socketConnection.AddSubscription(subscription);
@ -250,11 +244,18 @@ namespace CryptoExchange.Net.Clients
if (!subResult)
{
waitEvent?.Set();
_logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString());
// If this was a timeout we still need to send an unsubscribe to prevent messages coming in later
var unsubscribe = subResult.Error is CancellationRequestedError;
await socketConnection.CloseAsync(subscription, unsubscribe).ConfigureAwait(false);
return new CallResult<UpdateSubscription>(subResult.Error!);
var isTimeout = subResult.Error is CancellationRequestedError;
if (isTimeout && subscription.Confirmed)
{
// No response received, but the subscription did receive updates. We'll assume success
}
else
{
_logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString());
// If this was a timeout we still need to send an unsubscribe to prevent messages coming in later
await socketConnection.CloseAsync(subscription, isTimeout).ConfigureAwait(false);
return new CallResult<UpdateSubscription>(subResult.Error!);
}
}
subscription.HandleSubQueryResponse(subQuery.Response!);
@ -517,7 +518,7 @@ namespace CryptoExchange.Net.Clients
/// <param name="address">The address to connect to</param>
/// <returns></returns>
protected virtual WebSocketParameters GetWebSocketParameters(string address)
=> new(new Uri(address), ClientOptions.AutoReconnect)
=> new(new Uri(address), ClientOptions.ReconnectPolicy)
{
KeepAliveInterval = KeepAliveInterval,
ReconnectInterval = ClientOptions.ReconnectInterval,

View File

@ -16,10 +16,6 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
public int Id { get; }
/// <summary>
/// Whether this listener can handle data
/// </summary>
public bool CanHandleData { get; }
/// <summary>
/// The identifiers for this processor
/// </summary>
public HashSet<string> ListenerIdentifiers { get; }

View File

@ -169,4 +169,23 @@
/// </summary>
Snapshot
}
/// <summary>
/// Reconnect policy
/// </summary>
public enum ReconnectPolicy
{
/// <summary>
/// Reconnect is disabled
/// </summary>
Disabled,
/// <summary>
/// Fixed delay of `ReconnectInterval` between retries
/// </summary>
FixedDelay,
/// <summary>
/// Backof policy of 2^`reconnectAttempt`, where `reconnectAttempt` has a max value of 5
/// </summary>
ExponentialBackoff
}
}

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Objects.Sockets;
using System;
namespace CryptoExchange.Net.Objects.Options
@ -9,15 +10,15 @@ namespace CryptoExchange.Net.Objects.Options
public class SocketExchangeOptions : ExchangeOptions
{
/// <summary>
/// Whether or not the socket should automatically reconnect when losing connection
/// </summary>
public bool AutoReconnect { get; set; } = true;
/// <summary>
/// Time to wait between reconnect attempts
/// The fixed time to wait between reconnect attempts, only used when `ReconnectPolicy` is set to `ReconnectPolicy.ExponentialBackoff`
/// </summary>
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Reconnect policy
/// </summary>
public ReconnectPolicy ReconnectPolicy { get; set; } = ReconnectPolicy.FixedDelay;
/// <summary>
/// Max number of concurrent resubscription tasks per socket after reconnecting a socket
/// </summary>
@ -57,7 +58,7 @@ namespace CryptoExchange.Net.Objects.Options
{
ApiCredentials = ApiCredentials?.Copy(),
OutputOriginalData = OutputOriginalData,
AutoReconnect = AutoReconnect,
ReconnectPolicy = ReconnectPolicy,
DelayAfterConnect = DelayAfterConnect,
MaxConcurrentResubscriptionsPerSocket = MaxConcurrentResubscriptionsPerSocket,
ReconnectInterval = ReconnectInterval,

View File

@ -26,20 +26,20 @@ namespace CryptoExchange.Net.Objects.Sockets
public IDictionary<string, string> Cookies { get; set; } = new Dictionary<string, string>();
/// <summary>
/// The time to wait between reconnect attempts
/// The fixed time to wait between reconnect attempts, only used when `ReconnectPolicy` is set to `ReconnectPolicy.ExponentialBackoff`
/// </summary>
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Reconnect policy
/// </summary>
public ReconnectPolicy ReconnectPolicy { get; set; } = ReconnectPolicy.FixedDelay;
/// <summary>
/// Proxy for the connection
/// </summary>
public ApiProxy? Proxy { get; set; }
/// <summary>
/// Whether the socket should automatically reconnect when connection is lost
/// </summary>
public bool AutoReconnect { get; set; }
/// <summary>
/// The maximum time of no data received before considering the connection lost and closting/reconnecting the socket
/// </summary>
@ -68,11 +68,11 @@ namespace CryptoExchange.Net.Objects.Sockets
/// ctor
/// </summary>
/// <param name="uri">Uri</param>
/// <param name="autoReconnect">Auto reconnect</param>
public WebSocketParameters(Uri uri, bool autoReconnect)
/// <param name="policy">Reconnect policy</param>
public WebSocketParameters(Uri uri, ReconnectPolicy policy)
{
Uri = uri;
AutoReconnect = autoReconnect;
ReconnectPolicy = policy;
}
}
}

View File

@ -47,6 +47,7 @@ namespace CryptoExchange.Net.Sockets
private ProcessState _processState;
private DateTime _lastReconnectTime;
private string _baseAddress;
private int _reconnectAttempt;
private const int _receiveBufferSize = 1048576;
private const int _sendBufferSize = 4096;
@ -246,12 +247,12 @@ namespace CryptoExchange.Net.Sockets
await _closeTask.ConfigureAwait(false);
_closeTask = null;
if (!Parameters.AutoReconnect)
if (Parameters.ReconnectPolicy == ReconnectPolicy.Disabled)
{
_processState = ProcessState.Idle;
await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
return;
}
}
if (!_stopRequested)
{
@ -259,9 +260,9 @@ namespace CryptoExchange.Net.Sockets
await (OnReconnecting?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
}
var sinceLastReconnect = DateTime.UtcNow - _lastReconnectTime;
if (sinceLastReconnect < Parameters.ReconnectInterval)
await Task.Delay(Parameters.ReconnectInterval - sinceLastReconnect).ConfigureAwait(false);
// Delay here to prevent very repid looping when a connection to the server is accepted and immediately disconnected
var initialDelay = GetReconnectDelay();
await Task.Delay(initialDelay).ConfigureAwait(false);
while (!_stopRequested)
{
@ -282,13 +283,17 @@ namespace CryptoExchange.Net.Sockets
_ctsSource = new CancellationTokenSource();
while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer
_reconnectAttempt++;
var connected = await ConnectInternalAsync().ConfigureAwait(false);
if (!connected)
{
await Task.Delay(Parameters.ReconnectInterval).ConfigureAwait(false);
// Delay between reconnect attempts
var delay = GetReconnectDelay();
await Task.Delay(delay).ConfigureAwait(false);
continue;
}
_reconnectAttempt = 0;
_lastReconnectTime = DateTime.UtcNow;
await (OnReconnected?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
break;
@ -298,6 +303,24 @@ namespace CryptoExchange.Net.Sockets
_processState = ProcessState.Idle;
}
private TimeSpan GetReconnectDelay()
{
if (_reconnectAttempt == 0)
{
// Means this is directly after disconnecting. Only delay if the last reconnect time is very recent
var sinceLastReconnect = DateTime.UtcNow - _lastReconnectTime;
if (sinceLastReconnect < TimeSpan.FromSeconds(5))
return TimeSpan.FromSeconds(5) - sinceLastReconnect;
return TimeSpan.FromMilliseconds(1);
}
var delay = Parameters.ReconnectPolicy == ReconnectPolicy.FixedDelay ? Parameters.ReconnectInterval : TimeSpan.FromSeconds(Math.Pow(2, Math.Min(5, _reconnectAttempt)));
if (delay > TimeSpan.Zero)
return delay;
return TimeSpan.FromMilliseconds(1);
}
/// <inheritdoc />
public virtual void Send(int id, string data, int weight)
{

View File

@ -19,11 +19,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public int Id { get; } = ExchangeHelpers.NextId();
/// <summary>
/// Can handle data
/// </summary>
public bool CanHandleData => true;
/// <summary>
/// Has this query been completed
/// </summary>

View File

@ -443,7 +443,7 @@ namespace CryptoExchange.Net.Sockets
// 4. Get the listeners interested in this message
List<IMessageProcessor> processors;
lock (_listenersLock)
processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId) && s.CanHandleData).ToList();
processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList();
if (processors.Count == 0)
{
@ -451,7 +451,7 @@ namespace CryptoExchange.Net.Sockets
{
List<string> listenerIds;
lock (_listenersLock)
listenerIds = _listeners.Where(l => l.CanHandleData).SelectMany(l => l.ListenerIdentifiers).ToList();
listenerIds = _listeners.SelectMany(l => l.ListenerIdentifiers).ToList();
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
UnhandledMessage?.Invoke(_accessor);
}
@ -478,6 +478,10 @@ namespace CryptoExchange.Net.Sockets
continue;
}
if (processor is Subscription subscriptionProcessor && !subscriptionProcessor.Confirmed)
// If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed
subscriptionProcessor.Confirmed = true;
// 6. Deserialize the message
object? deserialized = null;
desCache?.TryGetValue(messageType, out deserialized);
@ -861,6 +865,8 @@ namespace CryptoExchange.Net.Sockets
{
subscription.HandleSubQueryResponse(subQuery.Response!);
waitEvent.Set();
if (r.Result.Success)
subscription.Confirmed = true;
return r.Result;
}));
}
@ -870,9 +876,6 @@ namespace CryptoExchange.Net.Sockets
return taskList.First(t => !t.Result.Success).Result;
}
foreach (var subscription in subList)
subscription.Confirmed = true;
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket not connected"));

View File

@ -18,11 +18,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public int Id { get; set; }
/// <summary>
/// Can handle data
/// </summary>
public bool CanHandleData => Confirmed || HandleUpdatesBeforeConfirmation;
/// <summary>
/// Total amount of invocations
/// </summary>
@ -42,11 +37,6 @@ namespace CryptoExchange.Net.Sockets
/// Has the subscription been confirmed
/// </summary>
public bool Confirmed { get; set; }
/// <summary>
/// Whether this subscription should handle update messages before confirmation
/// </summary>
public bool HandleUpdatesBeforeConfirmation { get; set; }
/// <summary>
/// Is the subscription closed