diff --git a/CryptoExchange.Net/CryptoExchange.Net.xml b/CryptoExchange.Net/CryptoExchange.Net.xml index 37f0ea8..ba89938 100644 --- a/CryptoExchange.Net/CryptoExchange.Net.xml +++ b/CryptoExchange.Net/CryptoExchange.Net.xml @@ -3865,6 +3865,11 @@ The data to send + + + Handler for a socket opening + + Handler for a socket closing. Reconnects the socket if needed, or removes it from the active socket list if not diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 8402edb..3def904 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -132,12 +132,7 @@ namespace CryptoExchange.Net.Sockets Socket.Timeout = client.SocketNoDataTimeout; Socket.OnMessage += ProcessMessage; Socket.OnClose += SocketOnClose; - Socket.OnOpen += () => - { - ReconnectTry = 0; - PausedActivity = false; - Connected = true; - }; + Socket.OnOpen += SocketOnOpen; } /// @@ -159,34 +154,36 @@ namespace CryptoExchange.Net.Sockets return; } - var messageEvent = new MessageEvent(this, tokenData, socketClient.OutputOriginalData ? data: null, timestamp); var handledResponse = false; PendingRequest[] requests; - lock(pendingRequests) - { + lock(pendingRequests) requests = pendingRequests.ToArray(); - } + + // Remove any timed out requests + foreach (var request in requests.Where(r => r.Completed)) + { + lock (pendingRequests) + pendingRequests.Remove(request); + } + // Check if this message is an answer on any pending requests foreach (var pendingRequest in requests) { - if (pendingRequest.Check(tokenData)) + if (pendingRequest.CheckData(tokenData)) { - lock (pendingRequests) - { + lock (pendingRequests) pendingRequests.Remove(pendingRequest); - } - if (pendingRequest.Result == null) - { - continue; // A previous timeout. - } + if (!socketClient.ContinueOnQueryResponse) return; + handledResponse = true; break; } } - + // Message was not a request response, check data handlers + var messageEvent = new MessageEvent(this, tokenData, socketClient.OutputOriginalData ? data: null, timestamp); if (!HandleData(messageEvent) && !handledResponse) { if (!socketClient.UnhandledMessageExpected) @@ -309,26 +306,45 @@ namespace CryptoExchange.Net.Sockets Socket.Send(data); } + /// + /// Handler for a socket opening + /// + protected virtual void SocketOnOpen() + { + ReconnectTry = 0; + PausedActivity = false; + Connected = true; + } + /// /// Handler for a socket closing. Reconnects the socket if needed, or removes it from the active socket list if not /// protected virtual void SocketOnClose() { + lock (pendingRequests) + { + foreach(var pendingRequest in pendingRequests.ToList()) + { + pendingRequest.Fail(); + pendingRequests.Remove(pendingRequest); + } + } + if (socketClient.AutoReconnect && ShouldReconnect) { if (Socket.Reconnecting) return; // Already reconnecting + Socket.Reconnecting = true; + DisconnectTime = DateTime.UtcNow; + log.Write(LogLevel.Information, $"Socket {Socket.Id} Connection lost, will try to reconnect after {socketClient.ReconnectInterval}"); if (!lostTriggered) { lostTriggered = true; ConnectionLost?.Invoke(); } - Socket.Reconnecting = true; - - log.Write(LogLevel.Information, $"Socket {Socket.Id} Connection lost, will try to reconnect after {socketClient.ReconnectInterval}"); Task.Run(async () => { while (ShouldReconnect) @@ -346,7 +362,8 @@ namespace CryptoExchange.Net.Sockets if (!await Socket.ConnectAsync().ConfigureAwait(false)) { ReconnectTry++; - if(socketClient.MaxReconnectTries != null + ResubscribeTry = 0; + if (socketClient.MaxReconnectTries != null && ReconnectTry >= socketClient.MaxReconnectTries) { log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing"); @@ -390,10 +407,14 @@ namespace CryptoExchange.Net.Sockets else log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.MaxResubscribeTries}" : "")}. Disconnecting and reconnecting."); - await Socket.CloseAsync().ConfigureAwait(false); + if (Socket.IsOpen) + await Socket.CloseAsync().ConfigureAwait(false); + else + DisconnectTime = DateTime.UtcNow; } else { + log.Write(LogLevel.Debug, $"Socket {Socket.Id} data connection restored."); ResubscribeTry = 0; if (lostTriggered) { @@ -525,36 +546,39 @@ namespace CryptoExchange.Net.Sockets { public Func Handler { get; } public JToken? Result { get; private set; } + public bool Completed { get; private set; } public AsyncResetEvent Event { get; } public TimeSpan Timeout { get; } - private readonly DateTime startTime; + private CancellationTokenSource cts; public PendingRequest(Func handler, TimeSpan timeout) { Handler = handler; Event = new AsyncResetEvent(false, false); Timeout = timeout; - startTime = DateTime.UtcNow; + + cts = new CancellationTokenSource(timeout); + cts.Token.Register(Fail, false); } - public bool Check(JToken data) + public bool CheckData(JToken data) { if (Handler(data)) { Result = data; + Completed = true; Event.Set(); return true; - } - - if (DateTime.UtcNow - startTime > Timeout) - { - // Timed out - Event.Set(); - return true; - } + } return false; } + + public void Fail() + { + Completed = true; + Event.Set(); + } } }