diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 93b19b6..3e4364f 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -615,6 +615,10 @@ namespace CryptoExchange.Net.Sockets /// public async Task CloseAsync(Subscription subscription) { + // If we are resubscribing this subscription at this moment we'll want to wait for a bit until it is finished to avoid concurrency issues + while (subscription.IsResubscribing) + await Task.Delay(50).ConfigureAwait(false); + subscription.Closed = true; if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) @@ -898,7 +902,7 @@ namespace CryptoExchange.Net.Sockets List subList; lock (_listenersLock) - subList = _listeners.OfType().Skip(batch * batchSize).Take(batchSize).ToList(); + subList = _listeners.OfType().Where(x => !x.Closed).Skip(batch * batchSize).Take(batchSize).ToList(); if (subList.Count == 0) break; @@ -907,20 +911,30 @@ namespace CryptoExchange.Net.Sockets foreach (var subscription in subList) { subscription.ConnectionInvocations = 0; + if (subscription.Closed) + // Can be closed during resubscribing + continue; + + subscription.IsResubscribing = true; var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false); if (!result) { _logger.FailedRequestRevitalization(SocketId, result.Error?.ToString()); + subscription.IsResubscribing = false; return result; } var subQuery = subscription.GetSubQuery(this); if (subQuery == null) + { + subscription.IsResubscribing = false; continue; + } var waitEvent = new AsyncResetEvent(false); taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) => { + subscription.IsResubscribing = false; subscription.HandleSubQueryResponse(subQuery.Response!); waitEvent.Set(); if (r.Result.Success) diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index ef3dee3..4e35683 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -44,6 +44,11 @@ namespace CryptoExchange.Net.Sockets /// public bool Closed { get; set; } + /// + /// Is the subscription currently resubscribing + /// + public bool IsResubscribing { get; set; } + /// /// Logger ///