diff --git a/CryptoExchange.Net.UnitTests/SocketClientTests.cs b/CryptoExchange.Net.UnitTests/SocketClientTests.cs index 7f85384..1273221 100644 --- a/CryptoExchange.Net.UnitTests/SocketClientTests.cs +++ b/CryptoExchange.Net.UnitTests/SocketClientTests.cs @@ -202,7 +202,7 @@ namespace CryptoExchange.Net.UnitTests await sub; // assert - ClassicAssert.IsFalse(client.SubClient.TestSubscription.Confirmed); + ClassicAssert.IsTrue(client.SubClient.TestSubscription.Status != SubscriptionStatus.Subscribed); } [TestCase()] @@ -225,7 +225,7 @@ namespace CryptoExchange.Net.UnitTests await sub; // assert - Assert.That(client.SubClient.TestSubscription.Confirmed); + Assert.That(client.SubClient.TestSubscription.Status == SubscriptionStatus.Subscribed); } } } diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index bc1ddd7..1c0c3f3 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -269,6 +269,7 @@ namespace CryptoExchange.Net.Clients return new CallResult(new ServerError(new ErrorInfo(ErrorType.WebsocketPaused, "Socket is paused"))); } + subscription.Status = SubscriptionStatus.Subscribing; var waitEvent = new AsyncResetEvent(false); var subQuery = subscription.CreateSubscriptionQuery(socketConnection); if (subQuery != null) @@ -279,7 +280,7 @@ namespace CryptoExchange.Net.Clients { waitEvent?.Set(); var isTimeout = subResult.Error is CancellationRequestedError; - if (isTimeout && subscription.Confirmed) + if (isTimeout && subscription.Status == SubscriptionStatus.Subscribed) { // No response received, but the subscription did receive updates. We'll assume success } @@ -287,6 +288,7 @@ namespace CryptoExchange.Net.Clients { _logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString()); // If this was a timeout we still need to send an unsubscribe to prevent messages coming in later + subscription.Status = SubscriptionStatus.Pending; await socketConnection.CloseAsync(subscription).ConfigureAwait(false); return new CallResult(subResult.Error!); } @@ -295,7 +297,7 @@ namespace CryptoExchange.Net.Clients subscription.HandleSubQueryResponse(subQuery.Response!); } - subscription.Confirmed = true; + subscription.Status = SubscriptionStatus.Subscribed; if (ct != default) { subscription.CancellationTokenRegistration = ct.Register(async () => @@ -847,7 +849,7 @@ namespace CryptoExchange.Net.Clients cs.SubscriptionStates.ForEach(subState => { sb.AppendLine($"\t\t\tId: {subState.Id}"); - sb.AppendLine($"\t\t\tConfirmed: {subState.Confirmed}"); + sb.AppendLine($"\t\t\tStatus: {subState.Status}"); sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}"); sb.AppendLine($"\t\t\tIdentifiers: [{subState.ListenMatcher.ToString()}]"); }); diff --git a/CryptoExchange.Net/Objects/Enums.cs b/CryptoExchange.Net/Objects/Enums.cs index 3efc973..87331a9 100644 --- a/CryptoExchange.Net/Objects/Enums.cs +++ b/CryptoExchange.Net/Objects/Enums.cs @@ -267,4 +267,31 @@ namespace CryptoExchange.Net.Objects Succeed } + /// + /// Subscription status + /// + public enum SubscriptionStatus + { + /// + /// Pending, waiting before (re)subscription can be started + /// + Pending, + /// + /// Currently (re)subscribing, will start producing updates soon if subscription is successful + /// + Subscribing, + /// + /// Subscribed and listening to updates + /// + Subscribed, + /// + /// Subscription is being closed and will stop producing updates + /// + Closing, + /// + /// Subscription is closed and will no long produce updates + /// + Closed + } + } diff --git a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs index bfca2dc..2f89c11 100644 --- a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs @@ -15,6 +15,7 @@ namespace CryptoExchange.Net.Objects.Sockets private readonly Subscription _listener; private object _eventLock = new object(); + private bool _connectionEventsSubscribed = true; private List _connectionClosedEventHandlers = new List(); private List _connectionLostEventHandlers = new List(); private List> _resubscribeFailedEventHandlers = new List>(); @@ -22,6 +23,11 @@ namespace CryptoExchange.Net.Objects.Sockets private List _activityPausedEventHandlers = new List(); private List _activityUnpausedEventHandlers = new List(); + /// + /// Event when the status of the subscription changes + /// + public event Action? SubscriptionStatusChanged; + /// /// Event when the connection is lost. The socket will automatically reconnect when possible. /// @@ -113,21 +119,34 @@ namespace CryptoExchange.Net.Objects.Sockets _connection.ActivityUnpaused += HandleUnpausedEvent; _listener = subscription; - _listener.Unsubscribed += HandleUnsubscribed; + _listener.StatusChanged += (x) => SubscriptionStatusChanged?.Invoke(x); } - private void HandleUnsubscribed() + private void UnsubscribeConnectionEvents() { - _connection.ConnectionClosed -= HandleConnectionClosedEvent; - _connection.ConnectionLost -= HandleConnectionLostEvent; - _connection.ConnectionRestored -= HandleConnectionRestoredEvent; - _connection.ResubscribingFailed -= HandleResubscribeFailedEvent; - _connection.ActivityPaused -= HandlePausedEvent; - _connection.ActivityUnpaused -= HandleUnpausedEvent; + lock (_eventLock) + { + if (!_connectionEventsSubscribed) + return; + + _connection.ConnectionClosed -= HandleConnectionClosedEvent; + _connection.ConnectionLost -= HandleConnectionLostEvent; + _connection.ConnectionRestored -= HandleConnectionRestoredEvent; + _connection.ResubscribingFailed -= HandleResubscribeFailedEvent; + _connection.ActivityPaused -= HandlePausedEvent; + _connection.ActivityUnpaused -= HandleUnpausedEvent; + _connectionEventsSubscribed = false; + } } private void HandleConnectionClosedEvent() { + UnsubscribeConnectionEvents(); + + // If we're not the subscription closing this connection don't bother emitting + if (!_listener.IsClosingConnection) + return; + List handlers; lock (_eventLock) handlers = _connectionClosedEventHandlers.ToList(); @@ -138,6 +157,12 @@ namespace CryptoExchange.Net.Objects.Sockets private void HandleConnectionLostEvent() { + if (!_listener.Active) + { + UnsubscribeConnectionEvents(); + return; + } + List handlers; lock (_eventLock) handlers = _connectionLostEventHandlers.ToList(); @@ -148,6 +173,12 @@ namespace CryptoExchange.Net.Objects.Sockets private void HandleConnectionRestoredEvent(TimeSpan period) { + if (!_listener.Active) + { + UnsubscribeConnectionEvents(); + return; + } + List> handlers; lock (_eventLock) handlers = _connectionRestoredEventHandlers.ToList(); @@ -158,6 +189,12 @@ namespace CryptoExchange.Net.Objects.Sockets private void HandleResubscribeFailedEvent(Error error) { + if (!_listener.Active) + { + UnsubscribeConnectionEvents(); + return; + } + List> handlers; lock (_eventLock) handlers = _resubscribeFailedEventHandlers.ToList(); @@ -168,6 +205,12 @@ namespace CryptoExchange.Net.Objects.Sockets private void HandlePausedEvent() { + if (!_listener.Active) + { + UnsubscribeConnectionEvents(); + return; + } + List handlers; lock (_eventLock) handlers = _activityPausedEventHandlers.ToList(); @@ -178,6 +221,12 @@ namespace CryptoExchange.Net.Objects.Sockets private void HandleUnpausedEvent() { + if (!_listener.Active) + { + UnsubscribeConnectionEvents(); + return; + } + List handlers; lock (_eventLock) handlers = _activityUnpausedEventHandlers.ToList(); diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 32f90d8..cc26940 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -296,7 +296,7 @@ namespace CryptoExchange.Net.Sockets lock (_listenersLock) { - foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription)) + foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription && !l.IsClosingConnection)) subscription.Reset(); foreach (var query in _listeners.OfType().ToList()) @@ -527,10 +527,10 @@ namespace CryptoExchange.Net.Sockets continue; } - if (processor is Subscription subscriptionProcessor && !subscriptionProcessor.Confirmed) + if (processor is Subscription subscriptionProcessor && subscriptionProcessor.Status == SubscriptionStatus.Subscribing) { // If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed - subscriptionProcessor.Confirmed = true; + subscriptionProcessor.Status = SubscriptionStatus.Subscribed; if (subscriptionProcessor.SubscriptionQuery?.TimeoutBehavior == TimeoutBehavior.Succeed) // If this subscription has a query waiting for a timeout (success if there is no error response) // then time it out now as the data is being received, so we assume it's successful @@ -657,13 +657,16 @@ namespace CryptoExchange.Net.Sockets public async Task CloseAsync(Subscription subscription) { // If we are resubscribing this subscription at this moment we'll want to wait for a bit until it is finished to avoid concurrency issues - while (subscription.IsResubscribing) + while (subscription.Status == SubscriptionStatus.Subscribing) await Task.Delay(50).ConfigureAwait(false); - subscription.Closed = true; + subscription.Status = SubscriptionStatus.Closing; if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) + { + subscription.Status = SubscriptionStatus.Closed; return; + } _logger.ClosingSubscription(SocketId, subscription.Id); if (subscription.CancellationTokenRegistration.HasValue) @@ -675,7 +678,7 @@ namespace CryptoExchange.Net.Sockets bool shouldCloseConnection; lock (_listenersLock) - shouldCloseConnection = _listeners.OfType().All(r => !r.UserSubscription || r.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection; + shouldCloseConnection = _listeners.OfType().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection; if (!anyDuplicateSubscription) { @@ -693,6 +696,7 @@ namespace CryptoExchange.Net.Sockets if (Status == SocketStatus.Closing) { + subscription.Status = SubscriptionStatus.Closed; _logger.AlreadyClosing(SocketId); return; } @@ -700,6 +704,7 @@ namespace CryptoExchange.Net.Sockets if (shouldCloseConnection) { Status = SocketStatus.Closing; + subscription.IsClosingConnection = true; _logger.ClosingNoMoreSubscriptions(SocketId); await CloseAsync().ConfigureAwait(false); } @@ -707,7 +712,7 @@ namespace CryptoExchange.Net.Sockets lock (_listenersLock) _listeners.Remove(subscription); - subscription.InvokeUnsubscribedHandler(); + subscription.Status = SubscriptionStatus.Closed; } /// @@ -991,7 +996,7 @@ namespace CryptoExchange.Net.Sockets List subList; lock (_listenersLock) - subList = _listeners.OfType().Where(x => !x.Closed).Skip(batch * batchSize).Take(batchSize).ToList(); + subList = _listeners.OfType().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList(); if (subList.Count == 0) break; @@ -1000,34 +1005,32 @@ namespace CryptoExchange.Net.Sockets foreach (var subscription in subList) { subscription.ConnectionInvocations = 0; - if (subscription.Closed) + if (!subscription.Active) // Can be closed during resubscribing continue; - subscription.IsResubscribing = true; + subscription.Status = SubscriptionStatus.Subscribing; var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false); if (!result) { _logger.FailedRequestRevitalization(SocketId, result.Error?.ToString()); - subscription.IsResubscribing = false; + subscription.Status = SubscriptionStatus.Pending; return result; } var subQuery = subscription.CreateSubscriptionQuery(this); if (subQuery == null) { - subscription.IsResubscribing = false; + subscription.Status = SubscriptionStatus.Subscribed; continue; } var waitEvent = new AsyncResetEvent(false); taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) => { - subscription.IsResubscribing = false; + subscription.Status = r.Result.Success ? SubscriptionStatus.Subscribed: SubscriptionStatus.Pending; subscription.HandleSubQueryResponse(subQuery.Response!); waitEvent.Set(); - if (r.Result.Success) - subscription.Confirmed = true; return r.Result; })); } diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 612fc4c..90318f1 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -34,21 +34,33 @@ namespace CryptoExchange.Net.Sockets /// Is it a user subscription /// public bool UserSubscription { get; set; } - + + private SubscriptionStatus _status; /// - /// Has the subscription been confirmed + /// Current subscription status /// - public bool Confirmed { get; set; } + public SubscriptionStatus Status + { + get => _status; + set + { + if (_status == value) + return; + + _status = value; + Task.Run(() => StatusChanged?.Invoke(value)); + } + } /// - /// Is the subscription closed + /// Whether the subscription is active /// - public bool Closed { get; set; } + public bool Active => Status != SubscriptionStatus.Closing && Status != SubscriptionStatus.Closed; /// - /// Is the subscription currently resubscribing + /// Whether the unsubscribing of this subscription lead to the closing of the connection /// - public bool IsResubscribing { get; set; } + public bool IsClosingConnection { get; set; } /// /// Logger @@ -77,7 +89,7 @@ namespace CryptoExchange.Net.Sockets /// /// Listener unsubscribed event /// - public event Action? Unsubscribed; + public event Action? StatusChanged; /// /// Subscription topic @@ -167,7 +179,7 @@ namespace CryptoExchange.Net.Sockets /// public void Reset() { - Confirmed = false; + Status = SubscriptionStatus.Pending; DoHandleReset(); } @@ -185,24 +197,16 @@ namespace CryptoExchange.Net.Sockets Exception?.Invoke(e); } - /// - /// Invoke the unsubscribed event - /// - public void InvokeUnsubscribedHandler() - { - Unsubscribed?.Invoke(); - } - /// /// State of this subscription /// /// The id of the subscription - /// True when the subscription query is handled (either accepted or rejected) + /// Subscription status /// Number of times this subscription got a message /// Matcher for this subscription public record SubscriptionState( int Id, - bool Confirmed, + SubscriptionStatus Status, int Invocations, MessageMatcher ListenMatcher ); @@ -213,7 +217,7 @@ namespace CryptoExchange.Net.Sockets /// public SubscriptionState GetState() { - return new SubscriptionState(Id, Confirmed, TotalInvocations, MessageMatcher); + return new SubscriptionState(Id, Status, TotalInvocations, MessageMatcher); } } diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index 97fe449..bef4ec0 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -18,7 +18,7 @@ namespace CryptoExchange.Net.Sockets /// public SystemSubscription(ILogger logger, bool authenticated = false) : base(logger, authenticated, false) { - Confirmed = true; + Status = SubscriptionStatus.Subscribed; } ///