1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 14:13:46 +00:00

Combined subscribe and re-subscribe logic

This commit is contained in:
Jkorf 2026-02-10 15:40:13 +01:00
parent 7c67a014f5
commit df25221960
2 changed files with 60 additions and 83 deletions

View File

@ -304,55 +304,9 @@ namespace CryptoExchange.Net.Clients
return new CallResult<UpdateSubscription>(new ServerError(new ErrorInfo(ErrorType.WebsocketPaused, "Socket is paused"))); return new CallResult<UpdateSubscription>(new ServerError(new ErrorInfo(ErrorType.WebsocketPaused, "Socket is paused")));
} }
void HandleSubscriptionComplete(bool success, object? response) var subscribeResult = await socketConnection.TrySubscribeAsync(subscription, true, ct).ConfigureAwait(false);
{ if (!subscribeResult)
if (!success) return new CallResult<UpdateSubscription>(subscribeResult.Error!);
return;
subscription.HandleSubQueryResponse(socketConnection, response);
subscription.Status = SubscriptionStatus.Subscribed;
if (ct != default)
{
subscription.CancellationTokenRegistration = ct.Register(async () =>
{
_logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id);
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
}, false);
}
}
subscription.Status = SubscriptionStatus.Subscribing;
var subQuery = subscription.CreateSubscriptionQuery(socketConnection);
if (subQuery != null)
{
subQuery.OnComplete = () => HandleSubscriptionComplete(subQuery.Result?.Success ?? false, subQuery.Response);
// Send the request and wait for answer
var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery, ct).ConfigureAwait(false);
if (!subResult)
{
var isTimeout = subResult.Error is CancellationRequestedError;
if (isTimeout && subscription.Status == SubscriptionStatus.Subscribed)
{
// No response received, but the subscription did receive updates. We'll assume success
}
else
{
_logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString());
// If this was a server process error we still might need to send an unsubscribe to prevent messages coming in later
subscription.Status = SubscriptionStatus.Pending;
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
return new CallResult<UpdateSubscription>(subResult.Error!);
}
}
if (!subQuery.ExpectsResponse)
HandleSubscriptionComplete(true, null);
}
else
{
HandleSubscriptionComplete(true, null);
}
_logger.SubscriptionCompletedSuccessfully(socketConnection.SocketId, subscription.Id); _logger.SubscriptionCompletedSuccessfully(socketConnection.SocketId, subscription.Id);
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription)); return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));

View File

@ -1082,40 +1082,8 @@ namespace CryptoExchange.Net.Sockets.Default
var taskList = new List<Task<CallResult>>(); var taskList = new List<Task<CallResult>>();
foreach (var subscription in subList) foreach (var subscription in subList)
{ {
subscription.ConnectionInvocations = 0; var subscribeTask = TrySubscribeAsync(subscription, false, default);
if (!subscription.Active) taskList.Add(subscribeTask);
// Can be closed during resubscribing
continue;
subscription.Status = SubscriptionStatus.Subscribing;
var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
if (!result)
{
_logger.FailedRequestRevitalization(SocketId, result.Error?.ToString());
subscription.Status = SubscriptionStatus.Pending;
return result;
}
var subQuery = subscription.CreateSubscriptionQuery(this);
if (subQuery == null)
{
subscription.Status = SubscriptionStatus.Subscribed;
continue;
}
subQuery.OnComplete = () =>
{
subscription.Status = subQuery.Result!.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending;
subscription.HandleSubQueryResponse(this, subQuery.Response);
};
taskList.Add(SendAndWaitQueryAsync(subQuery));
if (!subQuery.ExpectsResponse)
{
// If there won't be an answer we can immediately set this
subscription.Status = SubscriptionStatus.Subscribed;
subscription.HandleSubQueryResponse(this, null);
}
} }
await Task.WhenAll(taskList).ConfigureAwait(false); await Task.WhenAll(taskList).ConfigureAwait(false);
@ -1132,6 +1100,61 @@ namespace CryptoExchange.Net.Sockets.Default
return CallResult.SuccessResult; return CallResult.SuccessResult;
} }
protected internal async Task<CallResult> TrySubscribeAsync(Subscription subscription, bool newSubscription, CancellationToken subCancelToken)
{
subscription.ConnectionInvocations = 0;
if (!newSubscription)
{
if (!subscription.Active)
// Can be closed during resubscribing
return CallResult.SuccessResult;
var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
if (!result)
{
_logger.FailedRequestRevitalization(SocketId, result.Error?.ToString());
subscription.Status = SubscriptionStatus.Pending;
return result;
}
}
subscription.Status = SubscriptionStatus.Subscribing;
var subQuery = subscription.CreateSubscriptionQuery(this);
if (subQuery == null)
{
// No sub query, so successful
subscription.Status = SubscriptionStatus.Subscribed;
return CallResult.SuccessResult;
}
subQuery.OnComplete = () =>
{
subscription.Status = subQuery.Result!.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending;
subscription.HandleSubQueryResponse(this, subQuery.Response);
if (newSubscription && subQuery.Result.Success && subCancelToken != default)
{
subscription.CancellationTokenRegistration = subCancelToken.Register(async () =>
{
_logger.CancellationTokenSetClosingSubscription(SocketId, subscription.Id);
await CloseAsync(subscription).ConfigureAwait(false);
}, false);
}
};
var subQueryResult = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
if (!subQueryResult)
{
_logger.FailedToSubscribe(SocketId, subQueryResult.Error?.ToString());
// If this was a server process error or timeout we still send an unsubscribe to prevent messages coming in later
if (newSubscription)
await CloseAsync(subscription).ConfigureAwait(false);
return new CallResult<UpdateSubscription>(subQueryResult.Error!);
}
return subQueryResult;
}
internal async Task UnsubscribeAsync(Subscription subscription) internal async Task UnsubscribeAsync(Subscription subscription)
{ {
var unsubscribeRequest = subscription.CreateUnsubscriptionQuery(this); var unsubscribeRequest = subscription.CreateUnsubscriptionQuery(this);