1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-09-02 21:51:27 +00:00

Added better support for subscriptions without subscribe confirmation

This commit is contained in:
Jkorf 2025-08-22 10:17:16 +02:00
parent 3e1b5ada69
commit d92f3b7904
6 changed files with 83 additions and 15 deletions

View File

@ -270,7 +270,7 @@ namespace CryptoExchange.Net.Clients
} }
var waitEvent = new AsyncResetEvent(false); var waitEvent = new AsyncResetEvent(false);
var subQuery = subscription.GetSubQuery(socketConnection); var subQuery = subscription.CreateSubscriptionQuery(socketConnection);
if (subQuery != null) if (subQuery != null)
{ {
// Send the request and wait for answer // Send the request and wait for answer

View File

@ -251,4 +251,20 @@ namespace CryptoExchange.Net.Objects
/// </summary> /// </summary>
DEX DEX
} }
/// <summary>
/// Timeout behavior for queries
/// </summary>
public enum TimeoutBehavior
{
/// <summary>
/// Fail the request
/// </summary>
Fail,
/// <summary>
/// Mark the query as successful
/// </summary>
Succeed
}
} }

View File

@ -29,6 +29,11 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public TimeSpan? RequestTimeout { get; set; } public TimeSpan? RequestTimeout { get; set; }
/// <summary>
/// What should happen if the query times out
/// </summary>
public TimeoutBehavior TimeoutBehavior { get; set; } = TimeoutBehavior.Fail;
/// <summary> /// <summary>
/// The number of required responses. Can be more than 1 when for example subscribing multiple symbols streams in a single request, /// The number of required responses. Can be more than 1 when for example subscribing multiple symbols streams in a single request,
/// and each symbol receives it's own confirmation response /// and each symbol receives it's own confirmation response
@ -183,7 +188,7 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc /> /// <inheritdoc />
public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check) public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check)
{ {
if (!PreCheckMessage(message)) if (!PreCheckMessage(connection, message))
return CallResult.SuccessResult; return CallResult.SuccessResult;
CurrentResponses++; CurrentResponses++;
@ -208,18 +213,20 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Validate if a message is actually processable by this query /// Validate if a message is actually processable by this query
/// </summary> /// </summary>
/// <param name="message"></param> public virtual bool PreCheckMessage(SocketConnection connection, DataEvent<object> message) => true;
/// <returns></returns>
public virtual bool PreCheckMessage(DataEvent<object> message) => true;
/// <inheritdoc /> /// <inheritdoc />
public override void Timeout() public override void Timeout()
{ {
if (Completed) if (Completed)
return; return;
Completed = true; Completed = true;
Result = new CallResult<THandlerResponse>(new TimeoutError()); if (TimeoutBehavior == TimeoutBehavior.Fail)
Result = new CallResult<THandlerResponse>(new TimeoutError());
else
Result = new CallResult<THandlerResponse>(default, null, default);
ContinueAwaiter?.Set(); ContinueAwaiter?.Set();
_event.Set(); _event.Set();
} }

View File

@ -202,6 +202,18 @@ namespace CryptoExchange.Net.Sockets
} }
} }
/// <summary>
/// The number of current pending requests
/// </summary>
public int PendingRequests
{
get
{
lock (_listenersLock)
return _listeners.OfType<Query>().Where(x => !x.Completed).Count();
}
}
private bool _pausedActivity; private bool _pausedActivity;
private readonly object _listenersLock; private readonly object _listenersLock;
private readonly List<IMessageProcessor> _listeners; private readonly List<IMessageProcessor> _listeners;
@ -519,7 +531,10 @@ namespace CryptoExchange.Net.Sockets
{ {
// If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed // If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed
subscriptionProcessor.Confirmed = true; subscriptionProcessor.Confirmed = true;
// This doesn't trigger a waiting subscribe query, should probably also somehow set the wait event for that if (subscriptionProcessor.SubscriptionQuery?.TimeoutBehavior == TimeoutBehavior.Succeed)
// If this subscription has a query waiting for a timeout (success if there is no error response)
// then time it out now as the data is being received, so we assume it's successful
subscriptionProcessor.SubscriptionQuery.Timeout();
} }
// 5. Deserialize the message // 5. Deserialize the message
@ -996,7 +1011,7 @@ namespace CryptoExchange.Net.Sockets
return result; return result;
} }
var subQuery = subscription.GetSubQuery(this); var subQuery = subscription.CreateSubscriptionQuery(this);
if (subQuery == null) if (subQuery == null)
{ {
subscription.IsResubscribing = false; subscription.IsResubscribing = false;
@ -1031,7 +1046,7 @@ namespace CryptoExchange.Net.Sockets
internal async Task UnsubscribeAsync(Subscription subscription) internal async Task UnsubscribeAsync(Subscription subscription)
{ {
var unsubscribeRequest = subscription.GetUnsubQuery(); var unsubscribeRequest = subscription.CreateUnsubscriptionQuery(this);
if (unsubscribeRequest == null) if (unsubscribeRequest == null)
return; return;
@ -1044,7 +1059,7 @@ namespace CryptoExchange.Net.Sockets
if (!_socket.IsOpen) if (!_socket.IsOpen)
return new CallResult(new WebError("Socket is not connected")); return new CallResult(new WebError("Socket is not connected"));
var subQuery = subscription.GetSubQuery(this); var subQuery = subscription.CreateSubscriptionQuery(this);
if (subQuery == null) if (subQuery == null)
return CallResult.SuccessResult; return CallResult.SuccessResult;

View File

@ -80,6 +80,16 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public string? Topic { get; set; } public string? Topic { get; set; }
/// <summary>
/// The subscribe query for this subscription
/// </summary>
public Query? SubscriptionQuery { get; private set; }
/// <summary>
/// The unsubscribe query for this subscription
/// </summary>
public Query? UnsubscriptionQuery { get; private set; }
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
@ -91,11 +101,21 @@ namespace CryptoExchange.Net.Sockets
Id = ExchangeHelpers.NextId(); Id = ExchangeHelpers.NextId();
} }
/// <summary>
/// Create a new subscription query
/// </summary>
public Query? CreateSubscriptionQuery(SocketConnection connection)
{
var query = GetSubQuery(connection);
SubscriptionQuery = query;
return query;
}
/// <summary> /// <summary>
/// Get the subscribe query to send when subscribing /// Get the subscribe query to send when subscribing
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public abstract Query? GetSubQuery(SocketConnection connection); protected abstract Query? GetSubQuery(SocketConnection connection);
/// <summary> /// <summary>
/// Handle a subscription query response /// Handle a subscription query response
@ -109,11 +129,21 @@ namespace CryptoExchange.Net.Sockets
/// <param name="message"></param> /// <param name="message"></param>
public virtual void HandleUnsubQueryResponse(object message) { } public virtual void HandleUnsubQueryResponse(object message) { }
/// <summary>
/// Create a new unsubscription query
/// </summary>
public Query? CreateUnsubscriptionQuery(SocketConnection connection)
{
var query = GetUnsubQuery(connection);
UnsubscriptionQuery = query;
return query;
}
/// <summary> /// <summary>
/// Get the unsubscribe query to send when unsubscribing /// Get the unsubscribe query to send when unsubscribing
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public abstract Query? GetUnsubQuery(); protected abstract Query? GetUnsubQuery(SocketConnection connection);
/// <inheritdoc /> /// <inheritdoc />
public virtual CallResult<object> Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type); public virtual CallResult<object> Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type);

View File

@ -22,9 +22,9 @@ namespace CryptoExchange.Net.Sockets
} }
/// <inheritdoc /> /// <inheritdoc />
public override Query? GetSubQuery(SocketConnection connection) => null; protected override Query? GetSubQuery(SocketConnection connection) => null;
/// <inheritdoc /> /// <inheritdoc />
public override Query? GetUnsubQuery() => null; protected override Query? GetUnsubQuery(SocketConnection connection) => null;
} }
} }