From fb90d1e015bc56210d335e0b21eea99f044d39a2 Mon Sep 17 00:00:00 2001 From: Jan Korf Date: Sun, 22 May 2022 11:35:39 +0200 Subject: [PATCH] Fixed exception when disposing client in reconnecting state --- .../Sockets/SocketConnection.cs | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 6ea4346..bbca717 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -141,6 +141,8 @@ namespace CryptoExchange.Net.Sockets private readonly List pendingRequests; private Task? _socketProcessReconnectTask; + private SocketStatus _status; + /// /// The underlying websocket /// @@ -217,10 +219,24 @@ namespace CryptoExchange.Net.Sockets subscription.CancellationTokenRegistration.Value.Dispose(); } } - await _socket.CloseAsync().ConfigureAwait(false); - if (_socketProcessReconnectTask != null) - await _socketProcessReconnectTask.ConfigureAwait(false); + if (_status == SocketStatus.Reconnecting) + { + // Wait for reconnect task to finish + log.Write(LogLevel.Trace, "In reconnecting state, waiting for reconnecting to end"); + if (_socketProcessReconnectTask != null) + await _socketProcessReconnectTask.ConfigureAwait(false); + + await _socket.CloseAsync().ConfigureAwait(false); + } + else + { + // Close before waiting for process task to finish + await _socket.CloseAsync().ConfigureAwait(false); + + if (_socketProcessReconnectTask != null) + await _socketProcessReconnectTask.ConfigureAwait(false); + } _socket.Dispose(); } @@ -232,7 +248,7 @@ namespace CryptoExchange.Net.Sockets /// public async Task CloseAsync(SocketSubscription subscription) { - if (!_socket.IsOpen) + if (!_socket.IsOpen || _status == SocketStatus.Disposed) return; if (subscription.CancellationTokenRegistration.HasValue) @@ -255,9 +271,11 @@ namespace CryptoExchange.Net.Sockets private void StartProcessingTask() { log.Write(LogLevel.Trace, $"Starting {SocketId} process/reconnect task"); + _status = SocketStatus.Processing; _socketProcessReconnectTask = Task.Run(async () => { await _socket.ProcessAsync().ConfigureAwait(false); + _status = SocketStatus.Reconnecting; await ReconnectAsync().ConfigureAwait(false); log.Write(LogLevel.Trace, $"Process/reconnect {SocketId} task finished"); }); @@ -396,6 +414,7 @@ namespace CryptoExchange.Net.Sockets /// public void Dispose() { + _status = SocketStatus.Disposed; _socket.Dispose(); } @@ -458,7 +477,7 @@ namespace CryptoExchange.Net.Sockets var total = DateTime.UtcNow - timestamp; if (userProcessTime.TotalMilliseconds > 500) - log.Write(LogLevel.Debug, $"Socket {SocketId} message processing slow ({(int)total.TotalMilliseconds}ms), consider offloading data handling to another thread. " + + log.Write(LogLevel.Debug, $"Socket {SocketId} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " + "Data from this socket may arrive late or not at all if message processing is continuously slow."); log.Write(LogLevel.Trace, $"Socket {SocketId} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)"); @@ -671,5 +690,13 @@ namespace CryptoExchange.Net.Sockets return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); } + + private enum SocketStatus + { + None, + Processing, + Reconnecting, + Disposed + } } }