From df25221960ac602e421678541a78c7cbdd094cfc Mon Sep 17 00:00:00 2001 From: Jkorf Date: Tue, 10 Feb 2026 15:40:13 +0100 Subject: [PATCH] Combined subscribe and re-subscribe logic --- CryptoExchange.Net/Clients/SocketApiClient.cs | 52 +---------- .../Sockets/Default/SocketConnection.cs | 91 ++++++++++++------- 2 files changed, 60 insertions(+), 83 deletions(-) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 1c4dbc2..b22d2cf 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -304,55 +304,9 @@ namespace CryptoExchange.Net.Clients return new CallResult(new ServerError(new ErrorInfo(ErrorType.WebsocketPaused, "Socket is paused"))); } - void HandleSubscriptionComplete(bool success, object? response) - { - if (!success) - return; - - subscription.HandleSubQueryResponse(socketConnection, response); - subscription.Status = SubscriptionStatus.Subscribed; - if (ct != default) - { - subscription.CancellationTokenRegistration = ct.Register(async () => - { - _logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id); - await socketConnection.CloseAsync(subscription).ConfigureAwait(false); - }, false); - } - } - - subscription.Status = SubscriptionStatus.Subscribing; - var subQuery = subscription.CreateSubscriptionQuery(socketConnection); - if (subQuery != null) - { - subQuery.OnComplete = () => HandleSubscriptionComplete(subQuery.Result?.Success ?? false, subQuery.Response); - - // Send the request and wait for answer - var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery, ct).ConfigureAwait(false); - if (!subResult) - { - var isTimeout = subResult.Error is CancellationRequestedError; - if (isTimeout && subscription.Status == SubscriptionStatus.Subscribed) - { - // No response received, but the subscription did receive updates. We'll assume success - } - else - { - _logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString()); - // If this was a server process error we still might need to send an unsubscribe to prevent messages coming in later - subscription.Status = SubscriptionStatus.Pending; - await socketConnection.CloseAsync(subscription).ConfigureAwait(false); - return new CallResult(subResult.Error!); - } - } - - if (!subQuery.ExpectsResponse) - HandleSubscriptionComplete(true, null); - } - else - { - HandleSubscriptionComplete(true, null); - } + var subscribeResult = await socketConnection.TrySubscribeAsync(subscription, true, ct).ConfigureAwait(false); + if (!subscribeResult) + return new CallResult(subscribeResult.Error!); _logger.SubscriptionCompletedSuccessfully(socketConnection.SocketId, subscription.Id); return new CallResult(new UpdateSubscription(socketConnection, subscription)); diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index f4c46df..2fbb129 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -1082,40 +1082,8 @@ namespace CryptoExchange.Net.Sockets.Default var taskList = new List>(); foreach (var subscription in subList) { - subscription.ConnectionInvocations = 0; - if (!subscription.Active) - // Can be closed during resubscribing - continue; - - subscription.Status = SubscriptionStatus.Subscribing; - var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false); - if (!result) - { - _logger.FailedRequestRevitalization(SocketId, result.Error?.ToString()); - subscription.Status = SubscriptionStatus.Pending; - return result; - } - - var subQuery = subscription.CreateSubscriptionQuery(this); - if (subQuery == null) - { - subscription.Status = SubscriptionStatus.Subscribed; - continue; - } - subQuery.OnComplete = () => - { - subscription.Status = subQuery.Result!.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending; - subscription.HandleSubQueryResponse(this, subQuery.Response); - }; - - taskList.Add(SendAndWaitQueryAsync(subQuery)); - - if (!subQuery.ExpectsResponse) - { - // If there won't be an answer we can immediately set this - subscription.Status = SubscriptionStatus.Subscribed; - subscription.HandleSubQueryResponse(this, null); - } + var subscribeTask = TrySubscribeAsync(subscription, false, default); + taskList.Add(subscribeTask); } await Task.WhenAll(taskList).ConfigureAwait(false); @@ -1132,6 +1100,61 @@ namespace CryptoExchange.Net.Sockets.Default return CallResult.SuccessResult; } + protected internal async Task TrySubscribeAsync(Subscription subscription, bool newSubscription, CancellationToken subCancelToken) + { + subscription.ConnectionInvocations = 0; + + if (!newSubscription) + { + if (!subscription.Active) + // Can be closed during resubscribing + return CallResult.SuccessResult; + + var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false); + if (!result) + { + _logger.FailedRequestRevitalization(SocketId, result.Error?.ToString()); + subscription.Status = SubscriptionStatus.Pending; + return result; + } + } + + subscription.Status = SubscriptionStatus.Subscribing; + var subQuery = subscription.CreateSubscriptionQuery(this); + if (subQuery == null) + { + // No sub query, so successful + subscription.Status = SubscriptionStatus.Subscribed; + return CallResult.SuccessResult; + } + + subQuery.OnComplete = () => + { + subscription.Status = subQuery.Result!.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending; + subscription.HandleSubQueryResponse(this, subQuery.Response); + if (newSubscription && subQuery.Result.Success && subCancelToken != default) + { + subscription.CancellationTokenRegistration = subCancelToken.Register(async () => + { + _logger.CancellationTokenSetClosingSubscription(SocketId, subscription.Id); + await CloseAsync(subscription).ConfigureAwait(false); + }, false); + } + }; + + var subQueryResult = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); + if (!subQueryResult) + { + _logger.FailedToSubscribe(SocketId, subQueryResult.Error?.ToString()); + // If this was a server process error or timeout we still send an unsubscribe to prevent messages coming in later + if (newSubscription) + await CloseAsync(subscription).ConfigureAwait(false); + return new CallResult(subQueryResult.Error!); + } + + return subQueryResult; + } + internal async Task UnsubscribeAsync(Subscription subscription) { var unsubscribeRequest = subscription.CreateUnsubscriptionQuery(this);