1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 00:16:27 +00:00

Fixed concurrency issue when unsubscribing websocket subscription during reconnection

This commit is contained in:
Jkorf 2024-11-12 16:21:15 +01:00
parent ab0243445d
commit 8414e9d94f
2 changed files with 20 additions and 1 deletions

View File

@ -615,6 +615,10 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
public async Task CloseAsync(Subscription subscription) public async Task CloseAsync(Subscription subscription)
{ {
// If we are resubscribing this subscription at this moment we'll want to wait for a bit until it is finished to avoid concurrency issues
while (subscription.IsResubscribing)
await Task.Delay(50).ConfigureAwait(false);
subscription.Closed = true; subscription.Closed = true;
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
@ -898,7 +902,7 @@ namespace CryptoExchange.Net.Sockets
List<Subscription> subList; List<Subscription> subList;
lock (_listenersLock) lock (_listenersLock)
subList = _listeners.OfType<Subscription>().Skip(batch * batchSize).Take(batchSize).ToList(); subList = _listeners.OfType<Subscription>().Where(x => !x.Closed).Skip(batch * batchSize).Take(batchSize).ToList();
if (subList.Count == 0) if (subList.Count == 0)
break; break;
@ -907,20 +911,30 @@ namespace CryptoExchange.Net.Sockets
foreach (var subscription in subList) foreach (var subscription in subList)
{ {
subscription.ConnectionInvocations = 0; subscription.ConnectionInvocations = 0;
if (subscription.Closed)
// Can be closed during resubscribing
continue;
subscription.IsResubscribing = true;
var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false); var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
if (!result) if (!result)
{ {
_logger.FailedRequestRevitalization(SocketId, result.Error?.ToString()); _logger.FailedRequestRevitalization(SocketId, result.Error?.ToString());
subscription.IsResubscribing = false;
return result; return result;
} }
var subQuery = subscription.GetSubQuery(this); var subQuery = subscription.GetSubQuery(this);
if (subQuery == null) if (subQuery == null)
{
subscription.IsResubscribing = false;
continue; continue;
}
var waitEvent = new AsyncResetEvent(false); var waitEvent = new AsyncResetEvent(false);
taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) => taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) =>
{ {
subscription.IsResubscribing = false;
subscription.HandleSubQueryResponse(subQuery.Response!); subscription.HandleSubQueryResponse(subQuery.Response!);
waitEvent.Set(); waitEvent.Set();
if (r.Result.Success) if (r.Result.Success)

View File

@ -44,6 +44,11 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public bool Closed { get; set; } public bool Closed { get; set; }
/// <summary>
/// Is the subscription currently resubscribing
/// </summary>
public bool IsResubscribing { get; set; }
/// <summary> /// <summary>
/// Logger /// Logger
/// </summary> /// </summary>