1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Improved socket reconnect logic so unsubscribing while processing resubscribing won't result in a ghost subscription

This commit is contained in:
JKorf 2024-07-23 12:01:31 +02:00
parent 3cd505ac8b
commit 3b735d66fd

View File

@ -854,11 +854,22 @@ namespace CryptoExchange.Net.Sockets
_logger.AuthenticationSucceeded(SocketId); _logger.AuthenticationSucceeded(SocketId);
} }
// Get a list of all subscriptions on the socket // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
int batch = 0;
int batchSize = ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket;
while (true)
{
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket not connected"));
List<Subscription> subList; List<Subscription> subList;
lock (_listenersLock) lock (_listenersLock)
subList = _listeners.OfType<Subscription>().ToList(); subList = _listeners.OfType<Subscription>().Skip(batch * batchSize).Take(batchSize).ToList();
if (subList.Count == 0)
break;
var taskList = new List<Task<CallResult>>();
foreach (var subscription in subList) foreach (var subscription in subList)
{ {
subscription.ConnectionInvocations = 0; subscription.ConnectionInvocations = 0;
@ -868,17 +879,7 @@ namespace CryptoExchange.Net.Sockets
_logger.FailedRequestRevitalization(SocketId, result.Error?.ToString()); _logger.FailedRequestRevitalization(SocketId, result.Error?.ToString());
return result; return result;
} }
}
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
for (var i = 0; i < subList.Count; i += ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)
{
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket not connected"));
var taskList = new List<Task<CallResult>>();
foreach (var subscription in subList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
{
var subQuery = subscription.GetSubQuery(this); var subQuery = subscription.GetSubQuery(this);
if (subQuery == null) if (subQuery == null)
continue; continue;
@ -897,6 +898,8 @@ namespace CryptoExchange.Net.Sockets
await Task.WhenAll(taskList).ConfigureAwait(false); await Task.WhenAll(taskList).ConfigureAwait(false);
if (taskList.Any(t => !t.Result.Success)) if (taskList.Any(t => !t.Result.Success))
return taskList.First(t => !t.Result.Success).Result; return taskList.First(t => !t.Result.Success).Result;
batch++;
} }
if (!_socket.IsOpen) if (!_socket.IsOpen)