1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Improved closing logic websockets

This commit is contained in:
JKorf 2024-08-25 18:38:37 +02:00
parent 93e4722a81
commit 3e6bdaafc6
2 changed files with 76 additions and 30 deletions

View File

@ -25,6 +25,7 @@ namespace CryptoExchange.Net.Logging.Extensions
private static readonly Action<ILogger, int, string, Exception?> _sendLoopStoppedWithException;
private static readonly Action<ILogger, int, Exception?> _sendLoopFinished;
private static readonly Action<ILogger, int, string, string ,Exception?> _receivedCloseMessage;
private static readonly Action<ILogger, int, string, string ,Exception?> _receivedCloseConfirmation;
private static readonly Action<ILogger, int, int, Exception?> _receivedPartialMessage;
private static readonly Action<ILogger, int, int, Exception?> _receivedSingleMessage;
private static readonly Action<ILogger, int, long, Exception?> _reassembledMessage;
@ -33,6 +34,7 @@ namespace CryptoExchange.Net.Logging.Extensions
private static readonly Action<ILogger, int, Exception?> _receiveLoopFinished;
private static readonly Action<ILogger, int, TimeSpan?, Exception?> _startingTaskForNoDataReceivedCheck;
private static readonly Action<ILogger, int, TimeSpan?, Exception?> _noDataReceiveTimoutReconnect;
private static readonly Action<ILogger, int, string, string, Exception?> _socketProcessingStateChanged;
static CryptoExchangeWebSocketClientLoggingExtension()
{
@ -170,6 +172,17 @@ namespace CryptoExchange.Net.Logging.Extensions
LogLevel.Debug,
new EventId(1027, "NoDataReceiveTimeoutReconnect"),
"[Sckt {SocketId}] no data received for {Timeout}, reconnecting socket");
_receivedCloseConfirmation = LoggerMessage.Define<int, string, string>(
LogLevel.Debug,
new EventId(1028, "ReceivedCloseMessage"),
"[Sckt {SocketId}] received `Close` message confirming our close request, CloseStatus: {CloseStatus}, CloseStatusDescription: {CloseStatusDescription}");
_socketProcessingStateChanged = LoggerMessage.Define<int, string, string>(
LogLevel.Trace,
new EventId(1028, "SocketProcessingStateChanged"),
"[Sckt {Id}] processing state change: {PreviousState} -> {NewState}");
}
public static void SocketConnecting(
@ -286,6 +299,12 @@ namespace CryptoExchange.Net.Logging.Extensions
_receivedCloseMessage(logger, socketId, webSocketCloseStatus, closeStatusDescription, null);
}
public static void SocketReceivedCloseConfirmation(
this ILogger logger, int socketId, string webSocketCloseStatus, string closeStatusDescription)
{
_receivedCloseConfirmation(logger, socketId, webSocketCloseStatus, closeStatusDescription, null);
}
public static void SocketReceivedPartialMessage(
this ILogger logger, int socketId, int countBytes)
{
@ -333,5 +352,11 @@ namespace CryptoExchange.Net.Logging.Extensions
{
_noDataReceiveTimoutReconnect(logger, socketId, timeSpan, null);
}
public static void SocketProcessingStateChanged(
this ILogger logger, int socketId, string prevState, string newState)
{
_socketProcessingStateChanged(logger, socketId, prevState, newState, null);
}
}
}

View File

@ -233,14 +233,14 @@ namespace CryptoExchange.Net.Sockets
while (!_stopRequested)
{
_logger.SocketStartingProcessing(Id);
_processState = ProcessState.Processing;
SetProcessState(ProcessState.Processing);
var sendTask = SendLoopAsync();
var receiveTask = ReceiveLoopAsync();
var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
_logger.SocketFinishedProcessing(Id);
_processState = ProcessState.WaitingForClose;
SetProcessState(ProcessState.WaitingForClose);
while (_closeTask == null)
await Task.Delay(50).ConfigureAwait(false);
@ -250,14 +250,14 @@ namespace CryptoExchange.Net.Sockets
if (Parameters.ReconnectPolicy == ReconnectPolicy.Disabled)
{
_processState = ProcessState.Idle;
SetProcessState(ProcessState.Idle);
await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
return;
}
if (!_stopRequested)
{
_processState = ProcessState.Reconnecting;
SetProcessState(ProcessState.Reconnecting);
await (OnReconnecting?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
}
@ -296,12 +296,15 @@ namespace CryptoExchange.Net.Sockets
_reconnectAttempt = 0;
_lastReconnectTime = DateTime.UtcNow;
// Set to processing before reconnect handling
SetProcessState(ProcessState.Processing);
await (OnReconnected?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
break;
}
}
_processState = ProcessState.Idle;
SetProcessState(ProcessState.Idle);
}
private TimeSpan GetReconnectDelay()
@ -391,34 +394,33 @@ namespace CryptoExchange.Net.Sockets
if (_disposed)
return;
_ctsSource.Cancel();
if (_socket.State == WebSocketState.Open)
{
try
{
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
catch (Exception)
{
// Can sometimes throw an exception when socket is in aborted state due to timing
// Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
// So socket might go to aborted state, might still be open
}
}
else if(_socket.State == WebSocketState.CloseReceived)
{
try
if (_socket.State == WebSocketState.CloseReceived)
{
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
else if (_socket.State == WebSocketState.Open)
{
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
var startWait = DateTime.UtcNow;
while (_socket.State != WebSocketState.Closed && _socket.State != WebSocketState.Aborted)
{
// Wait until we receive close confirmation
await Task.Delay(10).ConfigureAwait(false);
if (DateTime.UtcNow - startWait > TimeSpan.FromSeconds(5))
break; // Wait for max 5 seconds, then just abort the connection
}
}
}
catch (Exception)
{
// Can sometimes throw an exception when socket is in aborted state due to timing
// Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
// So socket might go to aborted state, might still be open
}
}
_ctsSource.Cancel();
}
/// <summary>
@ -565,10 +567,20 @@ namespace CryptoExchange.Net.Sockets
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed unexpectedly
// Connection closed
if (_socket.State == WebSocketState.CloseReceived)
{
// Close received means it server initiated, we should send a confirmation and close the socket
_logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString(), receiveResult.CloseStatusDescription);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
else
{
// Means the socket is now closed and we were the one initiating it
_logger.SocketReceivedCloseConfirmation(Id, receiveResult.CloseStatus.ToString(), receiveResult.CloseStatusDescription);
}
break;
}
@ -758,6 +770,15 @@ namespace CryptoExchange.Net.Sockets
if (proxy.Login != null)
socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
}
private void SetProcessState(ProcessState state)
{
if (_processState == state)
return;
_logger.SocketProcessingStateChanged(Id, _processState.ToString(), state.ToString());
_processState = state;
}
}
/// <summary>