1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-10-27 00:17:31 +00:00

Updated socket Subscription status handling, fixing timing issue for connection events and adding SubscriptionStatusChanged event

This commit is contained in:
Jkorf 2025-10-06 13:22:40 +02:00
parent da70ba6ec7
commit 1ece13f5bc
7 changed files with 134 additions and 49 deletions

View File

@ -202,7 +202,7 @@ namespace CryptoExchange.Net.UnitTests
await sub; await sub;
// assert // assert
ClassicAssert.IsFalse(client.SubClient.TestSubscription.Confirmed); ClassicAssert.IsTrue(client.SubClient.TestSubscription.Status != SubscriptionStatus.Subscribed);
} }
[TestCase()] [TestCase()]
@ -225,7 +225,7 @@ namespace CryptoExchange.Net.UnitTests
await sub; await sub;
// assert // assert
Assert.That(client.SubClient.TestSubscription.Confirmed); Assert.That(client.SubClient.TestSubscription.Status == SubscriptionStatus.Subscribed);
} }
} }
} }

View File

@ -269,6 +269,7 @@ namespace CryptoExchange.Net.Clients
return new CallResult<UpdateSubscription>(new ServerError(new ErrorInfo(ErrorType.WebsocketPaused, "Socket is paused"))); return new CallResult<UpdateSubscription>(new ServerError(new ErrorInfo(ErrorType.WebsocketPaused, "Socket is paused")));
} }
subscription.Status = SubscriptionStatus.Subscribing;
var waitEvent = new AsyncResetEvent(false); var waitEvent = new AsyncResetEvent(false);
var subQuery = subscription.CreateSubscriptionQuery(socketConnection); var subQuery = subscription.CreateSubscriptionQuery(socketConnection);
if (subQuery != null) if (subQuery != null)
@ -279,7 +280,7 @@ namespace CryptoExchange.Net.Clients
{ {
waitEvent?.Set(); waitEvent?.Set();
var isTimeout = subResult.Error is CancellationRequestedError; var isTimeout = subResult.Error is CancellationRequestedError;
if (isTimeout && subscription.Confirmed) if (isTimeout && subscription.Status == SubscriptionStatus.Subscribed)
{ {
// No response received, but the subscription did receive updates. We'll assume success // No response received, but the subscription did receive updates. We'll assume success
} }
@ -287,6 +288,7 @@ namespace CryptoExchange.Net.Clients
{ {
_logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString()); _logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString());
// If this was a timeout we still need to send an unsubscribe to prevent messages coming in later // If this was a timeout we still need to send an unsubscribe to prevent messages coming in later
subscription.Status = SubscriptionStatus.Pending;
await socketConnection.CloseAsync(subscription).ConfigureAwait(false); await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
return new CallResult<UpdateSubscription>(subResult.Error!); return new CallResult<UpdateSubscription>(subResult.Error!);
} }
@ -295,7 +297,7 @@ namespace CryptoExchange.Net.Clients
subscription.HandleSubQueryResponse(subQuery.Response!); subscription.HandleSubQueryResponse(subQuery.Response!);
} }
subscription.Confirmed = true; subscription.Status = SubscriptionStatus.Subscribed;
if (ct != default) if (ct != default)
{ {
subscription.CancellationTokenRegistration = ct.Register(async () => subscription.CancellationTokenRegistration = ct.Register(async () =>
@ -847,7 +849,7 @@ namespace CryptoExchange.Net.Clients
cs.SubscriptionStates.ForEach(subState => cs.SubscriptionStates.ForEach(subState =>
{ {
sb.AppendLine($"\t\t\tId: {subState.Id}"); sb.AppendLine($"\t\t\tId: {subState.Id}");
sb.AppendLine($"\t\t\tConfirmed: {subState.Confirmed}"); sb.AppendLine($"\t\t\tStatus: {subState.Status}");
sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}"); sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}");
sb.AppendLine($"\t\t\tIdentifiers: [{subState.ListenMatcher.ToString()}]"); sb.AppendLine($"\t\t\tIdentifiers: [{subState.ListenMatcher.ToString()}]");
}); });

View File

@ -267,4 +267,31 @@ namespace CryptoExchange.Net.Objects
Succeed Succeed
} }
/// <summary>
/// Subscription status
/// </summary>
public enum SubscriptionStatus
{
/// <summary>
/// Pending, waiting before (re)subscription can be started
/// </summary>
Pending,
/// <summary>
/// Currently (re)subscribing, will start producing updates soon if subscription is successful
/// </summary>
Subscribing,
/// <summary>
/// Subscribed and listening to updates
/// </summary>
Subscribed,
/// <summary>
/// Subscription is being closed and will stop producing updates
/// </summary>
Closing,
/// <summary>
/// Subscription is closed and will no long produce updates
/// </summary>
Closed
}
} }

View File

@ -15,6 +15,7 @@ namespace CryptoExchange.Net.Objects.Sockets
private readonly Subscription _listener; private readonly Subscription _listener;
private object _eventLock = new object(); private object _eventLock = new object();
private bool _connectionEventsSubscribed = true;
private List<Action> _connectionClosedEventHandlers = new List<Action>(); private List<Action> _connectionClosedEventHandlers = new List<Action>();
private List<Action> _connectionLostEventHandlers = new List<Action>(); private List<Action> _connectionLostEventHandlers = new List<Action>();
private List<Action<Error>> _resubscribeFailedEventHandlers = new List<Action<Error>>(); private List<Action<Error>> _resubscribeFailedEventHandlers = new List<Action<Error>>();
@ -22,6 +23,11 @@ namespace CryptoExchange.Net.Objects.Sockets
private List<Action> _activityPausedEventHandlers = new List<Action>(); private List<Action> _activityPausedEventHandlers = new List<Action>();
private List<Action> _activityUnpausedEventHandlers = new List<Action>(); private List<Action> _activityUnpausedEventHandlers = new List<Action>();
/// <summary>
/// Event when the status of the subscription changes
/// </summary>
public event Action<SubscriptionStatus>? SubscriptionStatusChanged;
/// <summary> /// <summary>
/// Event when the connection is lost. The socket will automatically reconnect when possible. /// Event when the connection is lost. The socket will automatically reconnect when possible.
/// </summary> /// </summary>
@ -113,21 +119,34 @@ namespace CryptoExchange.Net.Objects.Sockets
_connection.ActivityUnpaused += HandleUnpausedEvent; _connection.ActivityUnpaused += HandleUnpausedEvent;
_listener = subscription; _listener = subscription;
_listener.Unsubscribed += HandleUnsubscribed; _listener.StatusChanged += (x) => SubscriptionStatusChanged?.Invoke(x);
} }
private void HandleUnsubscribed() private void UnsubscribeConnectionEvents()
{ {
lock (_eventLock)
{
if (!_connectionEventsSubscribed)
return;
_connection.ConnectionClosed -= HandleConnectionClosedEvent; _connection.ConnectionClosed -= HandleConnectionClosedEvent;
_connection.ConnectionLost -= HandleConnectionLostEvent; _connection.ConnectionLost -= HandleConnectionLostEvent;
_connection.ConnectionRestored -= HandleConnectionRestoredEvent; _connection.ConnectionRestored -= HandleConnectionRestoredEvent;
_connection.ResubscribingFailed -= HandleResubscribeFailedEvent; _connection.ResubscribingFailed -= HandleResubscribeFailedEvent;
_connection.ActivityPaused -= HandlePausedEvent; _connection.ActivityPaused -= HandlePausedEvent;
_connection.ActivityUnpaused -= HandleUnpausedEvent; _connection.ActivityUnpaused -= HandleUnpausedEvent;
_connectionEventsSubscribed = false;
}
} }
private void HandleConnectionClosedEvent() private void HandleConnectionClosedEvent()
{ {
UnsubscribeConnectionEvents();
// If we're not the subscription closing this connection don't bother emitting
if (!_listener.IsClosingConnection)
return;
List<Action> handlers; List<Action> handlers;
lock (_eventLock) lock (_eventLock)
handlers = _connectionClosedEventHandlers.ToList(); handlers = _connectionClosedEventHandlers.ToList();
@ -138,6 +157,12 @@ namespace CryptoExchange.Net.Objects.Sockets
private void HandleConnectionLostEvent() private void HandleConnectionLostEvent()
{ {
if (!_listener.Active)
{
UnsubscribeConnectionEvents();
return;
}
List<Action> handlers; List<Action> handlers;
lock (_eventLock) lock (_eventLock)
handlers = _connectionLostEventHandlers.ToList(); handlers = _connectionLostEventHandlers.ToList();
@ -148,6 +173,12 @@ namespace CryptoExchange.Net.Objects.Sockets
private void HandleConnectionRestoredEvent(TimeSpan period) private void HandleConnectionRestoredEvent(TimeSpan period)
{ {
if (!_listener.Active)
{
UnsubscribeConnectionEvents();
return;
}
List<Action<TimeSpan>> handlers; List<Action<TimeSpan>> handlers;
lock (_eventLock) lock (_eventLock)
handlers = _connectionRestoredEventHandlers.ToList(); handlers = _connectionRestoredEventHandlers.ToList();
@ -158,6 +189,12 @@ namespace CryptoExchange.Net.Objects.Sockets
private void HandleResubscribeFailedEvent(Error error) private void HandleResubscribeFailedEvent(Error error)
{ {
if (!_listener.Active)
{
UnsubscribeConnectionEvents();
return;
}
List<Action<Error>> handlers; List<Action<Error>> handlers;
lock (_eventLock) lock (_eventLock)
handlers = _resubscribeFailedEventHandlers.ToList(); handlers = _resubscribeFailedEventHandlers.ToList();
@ -168,6 +205,12 @@ namespace CryptoExchange.Net.Objects.Sockets
private void HandlePausedEvent() private void HandlePausedEvent()
{ {
if (!_listener.Active)
{
UnsubscribeConnectionEvents();
return;
}
List<Action> handlers; List<Action> handlers;
lock (_eventLock) lock (_eventLock)
handlers = _activityPausedEventHandlers.ToList(); handlers = _activityPausedEventHandlers.ToList();
@ -178,6 +221,12 @@ namespace CryptoExchange.Net.Objects.Sockets
private void HandleUnpausedEvent() private void HandleUnpausedEvent()
{ {
if (!_listener.Active)
{
UnsubscribeConnectionEvents();
return;
}
List<Action> handlers; List<Action> handlers;
lock (_eventLock) lock (_eventLock)
handlers = _activityUnpausedEventHandlers.ToList(); handlers = _activityUnpausedEventHandlers.ToList();

View File

@ -296,7 +296,7 @@ namespace CryptoExchange.Net.Sockets
lock (_listenersLock) lock (_listenersLock)
{ {
foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription)) foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription && !l.IsClosingConnection))
subscription.Reset(); subscription.Reset();
foreach (var query in _listeners.OfType<Query>().ToList()) foreach (var query in _listeners.OfType<Query>().ToList())
@ -527,10 +527,10 @@ namespace CryptoExchange.Net.Sockets
continue; continue;
} }
if (processor is Subscription subscriptionProcessor && !subscriptionProcessor.Confirmed) if (processor is Subscription subscriptionProcessor && subscriptionProcessor.Status == SubscriptionStatus.Subscribing)
{ {
// If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed // If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed
subscriptionProcessor.Confirmed = true; subscriptionProcessor.Status = SubscriptionStatus.Subscribed;
if (subscriptionProcessor.SubscriptionQuery?.TimeoutBehavior == TimeoutBehavior.Succeed) if (subscriptionProcessor.SubscriptionQuery?.TimeoutBehavior == TimeoutBehavior.Succeed)
// If this subscription has a query waiting for a timeout (success if there is no error response) // If this subscription has a query waiting for a timeout (success if there is no error response)
// then time it out now as the data is being received, so we assume it's successful // then time it out now as the data is being received, so we assume it's successful
@ -657,13 +657,16 @@ namespace CryptoExchange.Net.Sockets
public async Task CloseAsync(Subscription subscription) public async Task CloseAsync(Subscription subscription)
{ {
// If we are resubscribing this subscription at this moment we'll want to wait for a bit until it is finished to avoid concurrency issues // If we are resubscribing this subscription at this moment we'll want to wait for a bit until it is finished to avoid concurrency issues
while (subscription.IsResubscribing) while (subscription.Status == SubscriptionStatus.Subscribing)
await Task.Delay(50).ConfigureAwait(false); await Task.Delay(50).ConfigureAwait(false);
subscription.Closed = true; subscription.Status = SubscriptionStatus.Closing;
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
{
subscription.Status = SubscriptionStatus.Closed;
return; return;
}
_logger.ClosingSubscription(SocketId, subscription.Id); _logger.ClosingSubscription(SocketId, subscription.Id);
if (subscription.CancellationTokenRegistration.HasValue) if (subscription.CancellationTokenRegistration.HasValue)
@ -675,7 +678,7 @@ namespace CryptoExchange.Net.Sockets
bool shouldCloseConnection; bool shouldCloseConnection;
lock (_listenersLock) lock (_listenersLock)
shouldCloseConnection = _listeners.OfType<Subscription>().All(r => !r.UserSubscription || r.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection; shouldCloseConnection = _listeners.OfType<Subscription>().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection;
if (!anyDuplicateSubscription) if (!anyDuplicateSubscription)
{ {
@ -693,6 +696,7 @@ namespace CryptoExchange.Net.Sockets
if (Status == SocketStatus.Closing) if (Status == SocketStatus.Closing)
{ {
subscription.Status = SubscriptionStatus.Closed;
_logger.AlreadyClosing(SocketId); _logger.AlreadyClosing(SocketId);
return; return;
} }
@ -700,6 +704,7 @@ namespace CryptoExchange.Net.Sockets
if (shouldCloseConnection) if (shouldCloseConnection)
{ {
Status = SocketStatus.Closing; Status = SocketStatus.Closing;
subscription.IsClosingConnection = true;
_logger.ClosingNoMoreSubscriptions(SocketId); _logger.ClosingNoMoreSubscriptions(SocketId);
await CloseAsync().ConfigureAwait(false); await CloseAsync().ConfigureAwait(false);
} }
@ -707,7 +712,7 @@ namespace CryptoExchange.Net.Sockets
lock (_listenersLock) lock (_listenersLock)
_listeners.Remove(subscription); _listeners.Remove(subscription);
subscription.InvokeUnsubscribedHandler(); subscription.Status = SubscriptionStatus.Closed;
} }
/// <summary> /// <summary>
@ -991,7 +996,7 @@ namespace CryptoExchange.Net.Sockets
List<Subscription> subList; List<Subscription> subList;
lock (_listenersLock) lock (_listenersLock)
subList = _listeners.OfType<Subscription>().Where(x => !x.Closed).Skip(batch * batchSize).Take(batchSize).ToList(); subList = _listeners.OfType<Subscription>().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList();
if (subList.Count == 0) if (subList.Count == 0)
break; break;
@ -1000,34 +1005,32 @@ namespace CryptoExchange.Net.Sockets
foreach (var subscription in subList) foreach (var subscription in subList)
{ {
subscription.ConnectionInvocations = 0; subscription.ConnectionInvocations = 0;
if (subscription.Closed) if (!subscription.Active)
// Can be closed during resubscribing // Can be closed during resubscribing
continue; continue;
subscription.IsResubscribing = true; subscription.Status = SubscriptionStatus.Subscribing;
var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false); var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
if (!result) if (!result)
{ {
_logger.FailedRequestRevitalization(SocketId, result.Error?.ToString()); _logger.FailedRequestRevitalization(SocketId, result.Error?.ToString());
subscription.IsResubscribing = false; subscription.Status = SubscriptionStatus.Pending;
return result; return result;
} }
var subQuery = subscription.CreateSubscriptionQuery(this); var subQuery = subscription.CreateSubscriptionQuery(this);
if (subQuery == null) if (subQuery == null)
{ {
subscription.IsResubscribing = false; subscription.Status = SubscriptionStatus.Subscribed;
continue; continue;
} }
var waitEvent = new AsyncResetEvent(false); var waitEvent = new AsyncResetEvent(false);
taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) => taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) =>
{ {
subscription.IsResubscribing = false; subscription.Status = r.Result.Success ? SubscriptionStatus.Subscribed: SubscriptionStatus.Pending;
subscription.HandleSubQueryResponse(subQuery.Response!); subscription.HandleSubQueryResponse(subQuery.Response!);
waitEvent.Set(); waitEvent.Set();
if (r.Result.Success)
subscription.Confirmed = true;
return r.Result; return r.Result;
})); }));
} }

View File

@ -35,20 +35,32 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public bool UserSubscription { get; set; } public bool UserSubscription { get; set; }
private SubscriptionStatus _status;
/// <summary> /// <summary>
/// Has the subscription been confirmed /// Current subscription status
/// </summary> /// </summary>
public bool Confirmed { get; set; } public SubscriptionStatus Status
{
get => _status;
set
{
if (_status == value)
return;
_status = value;
Task.Run(() => StatusChanged?.Invoke(value));
}
}
/// <summary> /// <summary>
/// Is the subscription closed /// Whether the subscription is active
/// </summary> /// </summary>
public bool Closed { get; set; } public bool Active => Status != SubscriptionStatus.Closing && Status != SubscriptionStatus.Closed;
/// <summary> /// <summary>
/// Is the subscription currently resubscribing /// Whether the unsubscribing of this subscription lead to the closing of the connection
/// </summary> /// </summary>
public bool IsResubscribing { get; set; } public bool IsClosingConnection { get; set; }
/// <summary> /// <summary>
/// Logger /// Logger
@ -77,7 +89,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Listener unsubscribed event /// Listener unsubscribed event
/// </summary> /// </summary>
public event Action? Unsubscribed; public event Action<SubscriptionStatus>? StatusChanged;
/// <summary> /// <summary>
/// Subscription topic /// Subscription topic
@ -167,7 +179,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public void Reset() public void Reset()
{ {
Confirmed = false; Status = SubscriptionStatus.Pending;
DoHandleReset(); DoHandleReset();
} }
@ -185,24 +197,16 @@ namespace CryptoExchange.Net.Sockets
Exception?.Invoke(e); Exception?.Invoke(e);
} }
/// <summary>
/// Invoke the unsubscribed event
/// </summary>
public void InvokeUnsubscribedHandler()
{
Unsubscribed?.Invoke();
}
/// <summary> /// <summary>
/// State of this subscription /// State of this subscription
/// </summary> /// </summary>
/// <param name="Id">The id of the subscription</param> /// <param name="Id">The id of the subscription</param>
/// <param name="Confirmed">True when the subscription query is handled (either accepted or rejected)</param> /// <param name="Status">Subscription status</param>
/// <param name="Invocations">Number of times this subscription got a message</param> /// <param name="Invocations">Number of times this subscription got a message</param>
/// <param name="ListenMatcher">Matcher for this subscription</param> /// <param name="ListenMatcher">Matcher for this subscription</param>
public record SubscriptionState( public record SubscriptionState(
int Id, int Id,
bool Confirmed, SubscriptionStatus Status,
int Invocations, int Invocations,
MessageMatcher ListenMatcher MessageMatcher ListenMatcher
); );
@ -213,7 +217,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
public SubscriptionState GetState() public SubscriptionState GetState()
{ {
return new SubscriptionState(Id, Confirmed, TotalInvocations, MessageMatcher); return new SubscriptionState(Id, Status, TotalInvocations, MessageMatcher);
} }
} }

View File

@ -18,7 +18,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="authenticated"></param> /// <param name="authenticated"></param>
public SystemSubscription(ILogger logger, bool authenticated = false) : base(logger, authenticated, false) public SystemSubscription(ILogger logger, bool authenticated = false) : base(logger, authenticated, false)
{ {
Confirmed = true; Status = SubscriptionStatus.Subscribed;
} }
/// <inheritdoc /> /// <inheritdoc />