1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Fixed exception when disposing client in reconnecting state

This commit is contained in:
Jan Korf 2022-05-22 11:35:39 +02:00
parent 4b44861e43
commit fb90d1e015

View File

@ -141,6 +141,8 @@ namespace CryptoExchange.Net.Sockets
private readonly List<PendingRequest> pendingRequests;
private Task? _socketProcessReconnectTask;
private SocketStatus _status;
/// <summary>
/// The underlying websocket
/// </summary>
@ -217,10 +219,24 @@ namespace CryptoExchange.Net.Sockets
subscription.CancellationTokenRegistration.Value.Dispose();
}
}
if (_status == SocketStatus.Reconnecting)
{
// Wait for reconnect task to finish
log.Write(LogLevel.Trace, "In reconnecting state, waiting for reconnecting to end");
if (_socketProcessReconnectTask != null)
await _socketProcessReconnectTask.ConfigureAwait(false);
await _socket.CloseAsync().ConfigureAwait(false);
}
else
{
// Close before waiting for process task to finish
await _socket.CloseAsync().ConfigureAwait(false);
if (_socketProcessReconnectTask != null)
await _socketProcessReconnectTask.ConfigureAwait(false);
}
_socket.Dispose();
}
@ -232,7 +248,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
public async Task CloseAsync(SocketSubscription subscription)
{
if (!_socket.IsOpen)
if (!_socket.IsOpen || _status == SocketStatus.Disposed)
return;
if (subscription.CancellationTokenRegistration.HasValue)
@ -255,9 +271,11 @@ namespace CryptoExchange.Net.Sockets
private void StartProcessingTask()
{
log.Write(LogLevel.Trace, $"Starting {SocketId} process/reconnect task");
_status = SocketStatus.Processing;
_socketProcessReconnectTask = Task.Run(async () =>
{
await _socket.ProcessAsync().ConfigureAwait(false);
_status = SocketStatus.Reconnecting;
await ReconnectAsync().ConfigureAwait(false);
log.Write(LogLevel.Trace, $"Process/reconnect {SocketId} task finished");
});
@ -396,6 +414,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public void Dispose()
{
_status = SocketStatus.Disposed;
_socket.Dispose();
}
@ -458,7 +477,7 @@ namespace CryptoExchange.Net.Sockets
var total = DateTime.UtcNow - timestamp;
if (userProcessTime.TotalMilliseconds > 500)
log.Write(LogLevel.Debug, $"Socket {SocketId} message processing slow ({(int)total.TotalMilliseconds}ms), consider offloading data handling to another thread. " +
log.Write(LogLevel.Debug, $"Socket {SocketId} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " +
"Data from this socket may arrive late or not at all if message processing is continuously slow.");
log.Write(LogLevel.Trace, $"Socket {SocketId} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)");
@ -671,5 +690,13 @@ namespace CryptoExchange.Net.Sockets
return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
}
private enum SocketStatus
{
None,
Processing,
Reconnecting,
Disposed
}
}
}