1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Added CancellationToken support for websocket queries

This commit is contained in:
JKorf 2024-06-13 11:58:52 +02:00
parent 7229438a0b
commit 287aadc720
4 changed files with 50 additions and 26 deletions

View File

@ -282,10 +282,11 @@ namespace CryptoExchange.Net.Clients
/// <typeparam name="THandlerResponse">Expected result type</typeparam> /// <typeparam name="THandlerResponse">Expected result type</typeparam>
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam> /// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
/// <param name="query">The query</param> /// <param name="query">The query</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns> /// <returns></returns>
protected virtual Task<CallResult<THandlerResponse>> QueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query) protected virtual Task<CallResult<THandlerResponse>> QueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, CancellationToken ct = default)
{ {
return QueryAsync(BaseAddress, query); return QueryAsync(BaseAddress, query, ct);
} }
/// <summary> /// <summary>
@ -295,12 +296,16 @@ namespace CryptoExchange.Net.Clients
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam> /// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
/// <param name="url">The url for the request</param> /// <param name="url">The url for the request</param>
/// <param name="query">The query</param> /// <param name="query">The query</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns> /// <returns></returns>
protected virtual async Task<CallResult<THandlerResponse>> QueryAsync<TServerResponse, THandlerResponse>(string url, Query<TServerResponse, THandlerResponse> query) protected virtual async Task<CallResult<THandlerResponse>> QueryAsync<TServerResponse, THandlerResponse>(string url, Query<TServerResponse, THandlerResponse> query, CancellationToken ct = default)
{ {
if (_disposing) if (_disposing)
return new CallResult<THandlerResponse>(new InvalidOperationError("Client disposed, can't query")); return new CallResult<THandlerResponse>(new InvalidOperationError("Client disposed, can't query"));
if (ct.IsCancellationRequested)
return new CallResult<THandlerResponse>(new CancellationRequestedError());
SocketConnection socketConnection; SocketConnection socketConnection;
var released = false; var released = false;
await semaphoreSlim.WaitAsync().ConfigureAwait(false); await semaphoreSlim.WaitAsync().ConfigureAwait(false);
@ -335,7 +340,10 @@ namespace CryptoExchange.Net.Clients
return new CallResult<THandlerResponse>(new ServerError("Socket is paused")); return new CallResult<THandlerResponse>(new ServerError("Socket is paused"));
} }
return await socketConnection.SendAndWaitQueryAsync<TServerResponse, THandlerResponse>(query).ConfigureAwait(false); if (ct.IsCancellationRequested)
return new CallResult<THandlerResponse>(new CancellationRequestedError());
return await socketConnection.SendAndWaitQueryAsync(query, null, ct).ConfigureAwait(false);
} }
/// <summary> /// <summary>

View File

@ -32,7 +32,7 @@ namespace CryptoExchange.Net.Objects
/// Wait for the AutoResetEvent to be set /// Wait for the AutoResetEvent to be set
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public Task<bool> WaitAsync(TimeSpan? timeout = null) public Task<bool> WaitAsync(TimeSpan? timeout = null, CancellationToken ct = default)
{ {
lock (_waits) lock (_waits)
{ {
@ -44,22 +44,29 @@ namespace CryptoExchange.Net.Objects
} }
else else
{ {
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); if (ct.IsCancellationRequested)
if(timeout != null) return _completed;
{
var cancellationSource = new CancellationTokenSource(timeout.Value);
var registration = cancellationSource.Token.Register(() =>
{
lock (_waits)
{
tcs.TrySetResult(false);
// Not the cleanest but it works var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_waits = new Queue<TaskCompletionSource<bool>>(_waits.Where(i => i != tcs)); if (timeout.HasValue)
} {
}, useSynchronizationContext: false); var timeoutSource = new CancellationTokenSource(timeout.Value);
var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutSource.Token, ct);
ct = cancellationSource.Token;
} }
var registration = ct.Register(() =>
{
lock (_waits)
{
tcs.TrySetResult(false);
// Not the cleanest but it works
_waits = new Queue<TaskCompletionSource<bool>>(_waits.Where(i => i != tcs));
}
}, useSynchronizationContext: false);
_waits.Enqueue(tcs); _waits.Enqueue(tcs);
return tcs.Task; return tcs.Task;
} }

View File

@ -111,8 +111,9 @@ namespace CryptoExchange.Net.Sockets
/// Wait untill timeout or the request is competed /// Wait untill timeout or the request is competed
/// </summary> /// </summary>
/// <param name="timeout"></param> /// <param name="timeout"></param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns> /// <returns></returns>
public async Task WaitAsync(TimeSpan timeout) => await _event.WaitAsync(timeout).ConfigureAwait(false); public async Task WaitAsync(TimeSpan timeout, CancellationToken ct) => await _event.WaitAsync(timeout, ct).ConfigureAwait(false);
/// <inheritdoc /> /// <inheritdoc />
public virtual CallResult<object> Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type); public virtual CallResult<object> Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type);

View File

@ -690,10 +690,11 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
/// <param name="query">Query to send</param> /// <param name="query">Query to send</param>
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param> /// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query, ManualResetEvent? continueEvent = null) public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query, ManualResetEvent? continueEvent = null, CancellationToken ct = default)
{ {
await SendAndWaitIntAsync(query, continueEvent).ConfigureAwait(false); await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false);
return query.Result ?? new CallResult(new ServerError("Timeout")); return query.Result ?? new CallResult(new ServerError("Timeout"));
} }
@ -704,14 +705,15 @@ namespace CryptoExchange.Net.Sockets
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam> /// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
/// <param name="query">Query to send</param> /// <param name="query">Query to send</param>
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param> /// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, ManualResetEvent? continueEvent = null) public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, ManualResetEvent? continueEvent = null, CancellationToken ct = default)
{ {
await SendAndWaitIntAsync(query, continueEvent).ConfigureAwait(false); await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false);
return query.TypedResult ?? new CallResult<THandlerResponse>(new ServerError("Timeout")); return query.TypedResult ?? new CallResult<THandlerResponse>(new ServerError("Timeout"));
} }
private async Task SendAndWaitIntAsync(Query query, ManualResetEvent? continueEvent) private async Task SendAndWaitIntAsync(Query query, ManualResetEvent? continueEvent, CancellationToken ct = default)
{ {
lock(_listenersLock) lock(_listenersLock)
_listeners.Add(query); _listeners.Add(query);
@ -728,7 +730,7 @@ namespace CryptoExchange.Net.Sockets
try try
{ {
while (true) while (!ct.IsCancellationRequested)
{ {
if (!_socket.IsOpen) if (!_socket.IsOpen)
{ {
@ -739,11 +741,17 @@ namespace CryptoExchange.Net.Sockets
if (query.Completed) if (query.Completed)
return; return;
await query.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); await query.WaitAsync(TimeSpan.FromMilliseconds(500), ct).ConfigureAwait(false);
if (query.Completed) if (query.Completed)
return; return;
} }
if (ct.IsCancellationRequested)
{
query.Fail(new CancellationRequestedError());
return;
}
} }
finally finally
{ {