From 7da8cedf66fe3e1da7457957e571e72182b5ff66 Mon Sep 17 00:00:00 2001 From: Jkorf Date: Tue, 20 May 2025 14:49:43 +0200 Subject: [PATCH] Improved response time on CancellationToken cancel during subscribing --- .../TestImplementations/TestSocketClient.cs | 2 +- CryptoExchange.Net/Clients/SocketApiClient.cs | 18 ++++++++++-------- CryptoExchange.Net/Interfaces/IWebsocket.cs | 3 ++- ...oExchangeWebSocketClientLoggingExtension.cs | 13 +++++++++++++ .../Sockets/CryptoExchangeWebSocketClient.cs | 16 ++++++++++------ CryptoExchange.Net/Sockets/SocketConnection.cs | 2 +- .../Testing/Implementations/TestSocket.cs | 3 ++- .../Testing/SocketSubscriptionValidator.cs | 4 ++-- 8 files changed, 41 insertions(+), 20 deletions(-) diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs index 11080ec..3225fbb 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs @@ -114,7 +114,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations public CallResult ConnectSocketSub(SocketConnection sub) { - return ConnectSocketAsync(sub).Result; + return ConnectSocketAsync(sub, default).Result; } public override string GetListenerIdentifier(IMessageAccessor message) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index ac0cf25..d605818 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -244,7 +244,7 @@ namespace CryptoExchange.Net.Clients var needsConnecting = !socketConnection.Connected; - var connectResult = await ConnectIfNeededAsync(socketConnection, subscription.Authenticated).ConfigureAwait(false); + var connectResult = await ConnectIfNeededAsync(socketConnection, subscription.Authenticated, ct).ConfigureAwait(false); if (!connectResult) return new CallResult(connectResult.Error!); @@ -268,7 +268,7 @@ namespace CryptoExchange.Net.Clients if (subQuery != null) { // Send the request and wait for answer - var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery, waitEvent).ConfigureAwait(false); + var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery, waitEvent, ct).ConfigureAwait(false); if (!subResult) { waitEvent?.Set(); @@ -352,7 +352,7 @@ namespace CryptoExchange.Net.Clients released = true; } - var connectResult = await ConnectIfNeededAsync(socketConnection, query.Authenticated).ConfigureAwait(false); + var connectResult = await ConnectIfNeededAsync(socketConnection, query.Authenticated, ct).ConfigureAwait(false); if (!connectResult) return new CallResult(connectResult.Error!); } @@ -379,13 +379,14 @@ namespace CryptoExchange.Net.Clients /// /// The connection to check /// Whether the socket should authenticated + /// Cancellation token /// - protected virtual async Task ConnectIfNeededAsync(SocketConnection socket, bool authenticated) + protected virtual async Task ConnectIfNeededAsync(SocketConnection socket, bool authenticated, CancellationToken ct) { if (socket.Connected) return CallResult.SuccessResult; - var connectResult = await ConnectSocketAsync(socket).ConfigureAwait(false); + var connectResult = await ConnectSocketAsync(socket, ct).ConfigureAwait(false); if (!connectResult) return connectResult; @@ -579,10 +580,11 @@ namespace CryptoExchange.Net.Clients /// Connect a socket /// /// The socket to connect + /// Cancellation token /// - protected virtual async Task ConnectSocketAsync(SocketConnection socketConnection) + protected virtual async Task ConnectSocketAsync(SocketConnection socketConnection, CancellationToken ct) { - var connectResult = await socketConnection.ConnectAsync().ConfigureAwait(false); + var connectResult = await socketConnection.ConnectAsync(ct).ConfigureAwait(false); if (connectResult) { socketConnections.TryAdd(socketConnection.SocketId, socketConnection); @@ -714,7 +716,7 @@ namespace CryptoExchange.Net.Clients if (!socketResult) return socketResult.AsDataless(); - var connectResult = await ConnectIfNeededAsync(socketResult.Data, item.Authenticated).ConfigureAwait(false); + var connectResult = await ConnectIfNeededAsync(socketResult.Data, item.Authenticated, default).ConfigureAwait(false); if (!connectResult) return new CallResult(connectResult.Error!); } diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 31bae16..1d4c2b9 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Objects; using System; using System.Net.WebSockets; +using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Interfaces @@ -75,7 +76,7 @@ namespace CryptoExchange.Net.Interfaces /// Connect the socket /// /// - Task ConnectAsync(); + Task ConnectAsync(CancellationToken ct); /// /// Send data /// diff --git a/CryptoExchange.Net/Logging/Extensions/CryptoExchangeWebSocketClientLoggingExtension.cs b/CryptoExchange.Net/Logging/Extensions/CryptoExchangeWebSocketClientLoggingExtension.cs index 758cb5b..3a27529 100644 --- a/CryptoExchange.Net/Logging/Extensions/CryptoExchangeWebSocketClientLoggingExtension.cs +++ b/CryptoExchange.Net/Logging/Extensions/CryptoExchangeWebSocketClientLoggingExtension.cs @@ -8,6 +8,7 @@ namespace CryptoExchange.Net.Logging.Extensions { private static readonly Action _connecting; private static readonly Action _connectionFailed; + private static readonly Action _connectingCanceled; private static readonly Action _connected; private static readonly Action _startingProcessing; private static readonly Action _finishedProcessing; @@ -189,6 +190,12 @@ namespace CryptoExchange.Net.Logging.Extensions new EventId(1030, "SocketPingTimeout"), "[Sckt {Id}] ping frame timeout; reconnecting socket"); + _connectingCanceled = LoggerMessage.Define( + LogLevel.Debug, + new EventId(1031, "ConnectingCanceled"), + "[Sckt {SocketId}] connecting canceled"); + + } public static void SocketConnecting( @@ -370,5 +377,11 @@ namespace CryptoExchange.Net.Logging.Extensions { _socketPingTimeout(logger, socketId, null); } + + public static void SocketConnectingCanceled( + this ILogger logger, int socketId) + { + _connectingCanceled(logger, socketId, null); + } } } diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 8be0316..908a25a 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -166,9 +166,9 @@ namespace CryptoExchange.Net.Sockets } /// - public virtual async Task ConnectAsync() + public virtual async Task ConnectAsync(CancellationToken ct) { - var connectResult = await ConnectInternalAsync().ConfigureAwait(false); + var connectResult = await ConnectInternalAsync(ct).ConfigureAwait(false); if (!connectResult) return connectResult; @@ -215,7 +215,7 @@ namespace CryptoExchange.Net.Sockets return socket; } - private async Task ConnectInternalAsync() + private async Task ConnectInternalAsync(CancellationToken ct) { _logger.SocketConnecting(Id); try @@ -229,12 +229,16 @@ namespace CryptoExchange.Net.Sockets } using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10)); - using var linked = CancellationTokenSource.CreateLinkedTokenSource(tcs.Token, _ctsSource.Token); + using var linked = CancellationTokenSource.CreateLinkedTokenSource(tcs.Token, _ctsSource.Token, ct); await _socket.ConnectAsync(Uri, linked.Token).ConfigureAwait(false); } catch (Exception e) { - if (!_ctsSource.IsCancellationRequested) + if (ct.IsCancellationRequested) + { + _logger.SocketConnectingCanceled(Id); + } + else if (!_ctsSource.IsCancellationRequested) { // if _ctsSource was canceled this was already logged _logger.SocketConnectionFailed(Id, e.Message, e); @@ -325,7 +329,7 @@ namespace CryptoExchange.Net.Sockets while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer _reconnectAttempt++; - var connected = await ConnectInternalAsync().ConfigureAwait(false); + var connected = await ConnectInternalAsync(default).ConfigureAwait(false); if (!connected) { // Delay between reconnect attempts diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index d22928e..72680f6 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -571,7 +571,7 @@ namespace CryptoExchange.Net.Sockets /// Connect the websocket /// /// - public async Task ConnectAsync() => await _socket.ConnectAsync().ConfigureAwait(false); + public async Task ConnectAsync(CancellationToken ct) => await _socket.ConnectAsync(ct).ConfigureAwait(false); /// /// Retrieve the underlying socket diff --git a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs index b746ac7..b2d2f3b 100644 --- a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs +++ b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs @@ -2,6 +2,7 @@ using System.Net.WebSockets; using System.Text; using System.Text.Json; +using System.Threading; using System.Threading.Tasks; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; @@ -50,7 +51,7 @@ namespace CryptoExchange.Net.Testing.Implementations } } - public Task ConnectAsync() + public Task ConnectAsync(CancellationToken ct) { Connected = CanConnect; return Task.FromResult(CanConnect ? new CallResult(null) : new CallResult(new CantConnectError())); diff --git a/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs b/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs index 9e9ca41..f3bdee2 100644 --- a/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs +++ b/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs @@ -98,9 +98,9 @@ namespace CryptoExchange.Net.Testing { var task = methodInvoke(_client, x => { update = x.Data; }); } - catch(Exception ex) + catch(Exception) { - + throw; } var replaceValues = new Dictionary();