From d92f3b7904014d3260079afe86bdf8b359558061 Mon Sep 17 00:00:00 2001 From: Jkorf Date: Fri, 22 Aug 2025 10:17:16 +0200 Subject: [PATCH] Added better support for subscriptions without subscribe confirmation --- CryptoExchange.Net/Clients/SocketApiClient.cs | 2 +- CryptoExchange.Net/Objects/Enums.cs | 16 +++++++++ CryptoExchange.Net/Sockets/Query.cs | 19 +++++++---- .../Sockets/SocketConnection.cs | 23 ++++++++++--- CryptoExchange.Net/Sockets/Subscription.cs | 34 +++++++++++++++++-- .../Sockets/SystemSubscription.cs | 4 +-- 6 files changed, 83 insertions(+), 15 deletions(-) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index faac4b1..0626864 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -270,7 +270,7 @@ namespace CryptoExchange.Net.Clients } var waitEvent = new AsyncResetEvent(false); - var subQuery = subscription.GetSubQuery(socketConnection); + var subQuery = subscription.CreateSubscriptionQuery(socketConnection); if (subQuery != null) { // Send the request and wait for answer diff --git a/CryptoExchange.Net/Objects/Enums.cs b/CryptoExchange.Net/Objects/Enums.cs index 179b3a0..3efc973 100644 --- a/CryptoExchange.Net/Objects/Enums.cs +++ b/CryptoExchange.Net/Objects/Enums.cs @@ -251,4 +251,20 @@ namespace CryptoExchange.Net.Objects /// DEX } + + /// + /// Timeout behavior for queries + /// + public enum TimeoutBehavior + { + /// + /// Fail the request + /// + Fail, + /// + /// Mark the query as successful + /// + Succeed + } + } diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 56ab226..9d38368 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -29,6 +29,11 @@ namespace CryptoExchange.Net.Sockets /// public TimeSpan? RequestTimeout { get; set; } + /// + /// What should happen if the query times out + /// + public TimeoutBehavior TimeoutBehavior { get; set; } = TimeoutBehavior.Fail; + /// /// The number of required responses. Can be more than 1 when for example subscribing multiple symbols streams in a single request, /// and each symbol receives it's own confirmation response @@ -183,7 +188,7 @@ namespace CryptoExchange.Net.Sockets /// public override async Task Handle(SocketConnection connection, DataEvent message, MessageHandlerLink check) { - if (!PreCheckMessage(message)) + if (!PreCheckMessage(connection, message)) return CallResult.SuccessResult; CurrentResponses++; @@ -208,18 +213,20 @@ namespace CryptoExchange.Net.Sockets /// /// Validate if a message is actually processable by this query /// - /// - /// - public virtual bool PreCheckMessage(DataEvent message) => true; + public virtual bool PreCheckMessage(SocketConnection connection, DataEvent message) => true; /// public override void Timeout() { if (Completed) return; - + Completed = true; - Result = new CallResult(new TimeoutError()); + if (TimeoutBehavior == TimeoutBehavior.Fail) + Result = new CallResult(new TimeoutError()); + else + Result = new CallResult(default, null, default); + ContinueAwaiter?.Set(); _event.Set(); } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 64e0f81..9c8d84e 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -202,6 +202,18 @@ namespace CryptoExchange.Net.Sockets } } + /// + /// The number of current pending requests + /// + public int PendingRequests + { + get + { + lock (_listenersLock) + return _listeners.OfType().Where(x => !x.Completed).Count(); + } + } + private bool _pausedActivity; private readonly object _listenersLock; private readonly List _listeners; @@ -519,7 +531,10 @@ namespace CryptoExchange.Net.Sockets { // If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed subscriptionProcessor.Confirmed = true; - // This doesn't trigger a waiting subscribe query, should probably also somehow set the wait event for that + if (subscriptionProcessor.SubscriptionQuery?.TimeoutBehavior == TimeoutBehavior.Succeed) + // If this subscription has a query waiting for a timeout (success if there is no error response) + // then time it out now as the data is being received, so we assume it's successful + subscriptionProcessor.SubscriptionQuery.Timeout(); } // 5. Deserialize the message @@ -996,7 +1011,7 @@ namespace CryptoExchange.Net.Sockets return result; } - var subQuery = subscription.GetSubQuery(this); + var subQuery = subscription.CreateSubscriptionQuery(this); if (subQuery == null) { subscription.IsResubscribing = false; @@ -1031,7 +1046,7 @@ namespace CryptoExchange.Net.Sockets internal async Task UnsubscribeAsync(Subscription subscription) { - var unsubscribeRequest = subscription.GetUnsubQuery(); + var unsubscribeRequest = subscription.CreateUnsubscriptionQuery(this); if (unsubscribeRequest == null) return; @@ -1044,7 +1059,7 @@ namespace CryptoExchange.Net.Sockets if (!_socket.IsOpen) return new CallResult(new WebError("Socket is not connected")); - var subQuery = subscription.GetSubQuery(this); + var subQuery = subscription.CreateSubscriptionQuery(this); if (subQuery == null) return CallResult.SuccessResult; diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 081977e..df0b66b 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -80,6 +80,16 @@ namespace CryptoExchange.Net.Sockets /// public string? Topic { get; set; } + /// + /// The subscribe query for this subscription + /// + public Query? SubscriptionQuery { get; private set; } + + /// + /// The unsubscribe query for this subscription + /// + public Query? UnsubscriptionQuery { get; private set; } + /// /// ctor /// @@ -91,11 +101,21 @@ namespace CryptoExchange.Net.Sockets Id = ExchangeHelpers.NextId(); } + /// + /// Create a new subscription query + /// + public Query? CreateSubscriptionQuery(SocketConnection connection) + { + var query = GetSubQuery(connection); + SubscriptionQuery = query; + return query; + } + /// /// Get the subscribe query to send when subscribing /// /// - public abstract Query? GetSubQuery(SocketConnection connection); + protected abstract Query? GetSubQuery(SocketConnection connection); /// /// Handle a subscription query response @@ -109,11 +129,21 @@ namespace CryptoExchange.Net.Sockets /// public virtual void HandleUnsubQueryResponse(object message) { } + /// + /// Create a new unsubscription query + /// + public Query? CreateUnsubscriptionQuery(SocketConnection connection) + { + var query = GetUnsubQuery(connection); + UnsubscriptionQuery = query; + return query; + } + /// /// Get the unsubscribe query to send when unsubscribing /// /// - public abstract Query? GetUnsubQuery(); + protected abstract Query? GetUnsubQuery(SocketConnection connection); /// public virtual CallResult Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type); diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index 1d228ba..97fe449 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -22,9 +22,9 @@ namespace CryptoExchange.Net.Sockets } /// - public override Query? GetSubQuery(SocketConnection connection) => null; + protected override Query? GetSubQuery(SocketConnection connection) => null; /// - public override Query? GetUnsubQuery() => null; + protected override Query? GetUnsubQuery(SocketConnection connection) => null; } }