From 91e33cc42c5725aece765b6c8f6a7f35ab87a80e Mon Sep 17 00:00:00 2001 From: JKorf Date: Thu, 7 Jul 2022 22:17:55 +0200 Subject: [PATCH] wip --- CryptoExchange.Net/Interfaces/IWebsocket.cs | 20 +- .../Sockets/CryptoExchangeWebSocketClient.cs | 262 ++++++++------- .../Sockets/SocketConnection.cs | 311 ++++++++++-------- 3 files changed, 328 insertions(+), 265 deletions(-) diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 9d40d86..0b8eebc 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -27,6 +27,14 @@ namespace CryptoExchange.Net.Interfaces /// Websocket opened event /// event Action OnOpen; + /// + /// Websocket has lost connection to the server and is attempting to reconnect + /// + event Action OnReconnecting; + /// + /// Websocket has reconnected to the server + /// + event Action OnReconnected; /// /// Unique id for this socket @@ -89,21 +97,17 @@ namespace CryptoExchange.Net.Interfaces /// Connect the socket /// /// - Task ConnectAsync(); - /// - /// Receive and send messages over the connection. Resulting task should complete when closing the socket. - /// - /// - Task ProcessAsync(); + Task ConnectAsync(); /// /// Send data /// /// void Send(string data); /// - /// Reset socket when a connection is lost to prepare for a new connection + /// Reconnect the socket /// - void Reset(); + /// + Task ReconnectAsync(); /// /// Close the connection /// diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 92f5670..c5cc984 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -22,6 +22,22 @@ namespace CryptoExchange.Net.Sockets /// public class CryptoExchangeWebSocketClient : IWebsocket { + // TODO keep the same ID's for subscriptions/sockets when reconnecting + enum ProcessState + { + Idle, + Processing, + WaitingForClose, + Reconnecting + } + + enum CloseState + { + Idle, + Closing, + Closed + } + internal static int lastStreamId; private static readonly object streamIdLock = new(); @@ -35,8 +51,13 @@ namespace CryptoExchange.Net.Sockets private readonly List _outgoingMessages; private DateTime _lastReceivedMessagesUpdate; - private bool _closed; + private Task? _processTask; + private Task? _closeTask; + private bool _stopRequested; private bool _disposed; + private ProcessState _processState; + //private CloseState _closeState; + private SemaphoreSlim _closeSem; /// /// Received messages, the size and the timstamp @@ -53,23 +74,6 @@ namespace CryptoExchange.Net.Sockets /// protected Log log; - /// - /// Handlers for when an error happens on the socket - /// - protected readonly List> errorHandlers = new(); - /// - /// Handlers for when the socket connection is opened - /// - protected readonly List openHandlers = new(); - /// - /// Handlers for when the connection is closed - /// - protected readonly List closeHandlers = new(); - /// - /// Handlers for when a message is received - /// - protected readonly List> messageHandlers = new(); - /// public int Id { get; } @@ -146,32 +150,17 @@ namespace CryptoExchange.Net.Sockets } /// - public event Action OnClose - { - add => closeHandlers.Add(value); - remove => closeHandlers.Remove(value); - } - + public event Action? OnClose; /// - public event Action OnMessage - { - add => messageHandlers.Add(value); - remove => messageHandlers.Remove(value); - } - + public event Action? OnMessage; /// - public event Action OnError - { - add => errorHandlers.Add(value); - remove => errorHandlers.Remove(value); - } - + public event Action? OnError; /// - public event Action OnOpen - { - add => openHandlers.Add(value); - remove => openHandlers.Remove(value); - } + public event Action? OnOpen; + /// + public event Action? OnReconnecting; + /// + public event Action? OnReconnected; /// /// ctor @@ -204,6 +193,7 @@ namespace CryptoExchange.Net.Sockets _ctsSource = new CancellationTokenSource(); _receivedMessagesLock = new object(); + _closeSem = new SemaphoreSlim(1, 1); _socket = CreateSocket(); } @@ -228,35 +218,81 @@ namespace CryptoExchange.Net.Sockets /// public virtual async Task ConnectAsync() + { + if (!await ConnectInternalAsync().ConfigureAwait(false)) + return false; + + OnOpen?.Invoke(); + _processTask = ProcessAsync(); + return true; + } + + private async Task ConnectInternalAsync() { log.Write(LogLevel.Debug, $"Socket {Id} connecting"); try { - using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10)); + using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10)); await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false); - - Handle(openHandlers); } catch (Exception e) { log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString()); return false; } - + log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}"); return true; } /// - public virtual async Task ProcessAsync() + private async Task ProcessAsync() { - log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started"); - var sendTask = SendLoopAsync(); - var receiveTask = ReceiveLoopAsync(); - var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask; - log.Write(LogLevel.Trace, $"Socket {Id} processing startup completed"); - await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); - log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished"); + while (!_stopRequested) + { + log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started"); + _processState = ProcessState.Processing; + var sendTask = SendLoopAsync(); + var receiveTask = ReceiveLoopAsync(); + var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask; + await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); + log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished"); + + _processState = ProcessState.WaitingForClose; + while (_closeTask == null) + await Task.Delay(50).ConfigureAwait(false); + + await _closeTask.ConfigureAwait(false); + _closeTask = null; + //_closeState = CloseState.Idle; + + if (!_stopRequested) + { + _processState = ProcessState.Reconnecting; + OnReconnecting?.Invoke(); + } + + while (!_stopRequested) + { + log.Write(LogLevel.Trace, $"Socket {Id} attempting to reconnect"); + _socket = CreateSocket(); + _ctsSource.Dispose(); + _ctsSource = new CancellationTokenSource(); + while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer + + var connected = await ConnectInternalAsync().ConfigureAwait(false); + if (!connected) + { + await Task.Delay(5000).ConfigureAwait(false); + continue; + } + + OnReconnected?.Invoke(); + break; + } + } + + _processState = ProcessState.Idle; } /// @@ -271,23 +307,62 @@ namespace CryptoExchange.Net.Sockets _sendEvent.Set(); } + /// + public virtual async Task ReconnectAsync() + { + if (_processState != ProcessState.Processing) + return; + + log.Write(LogLevel.Debug, $"Socket {Id} reconnecting"); + _closeTask = CloseInternalAsync(); + await _closeTask.ConfigureAwait(false); + } + /// public virtual async Task CloseAsync() { - log.Write(LogLevel.Debug, $"Socket {Id} closing"); - await CloseInternalAsync().ConfigureAwait(false); + await _closeSem.WaitAsync().ConfigureAwait(false); + try + { + if (_closeTask != null && !_closeTask.IsCompleted) + { + log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task"); + await _closeTask.ConfigureAwait(false); + return; + } + + if (!IsOpen) + { + log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open"); + return; + } + + log.Write(LogLevel.Debug, $"Socket {Id} closing"); + _stopRequested = true; + + _closeTask = CloseInternalAsync(); + } + finally + { + _closeSem.Release(); + } + + await _closeTask.ConfigureAwait(false); + await _processTask!.ConfigureAwait(false); + OnClose?.Invoke(); + log.Write(LogLevel.Debug, $"Socket {Id} closed"); } - + /// /// Internal close method /// /// private async Task CloseInternalAsync() { - if (_closed || _disposed) + if (_disposed) return; - _closed = true; + //_closeState = CloseState.Closing; _ctsSource.Cancel(); _sendEvent.Set(); @@ -309,8 +384,6 @@ namespace CryptoExchange.Net.Sockets catch (Exception) { } // Can sometimes throw an exception when socket is in aborted state due to timing } - log.Write(LogLevel.Debug, $"Socket {Id} closed"); - Handle(closeHandlers); } /// @@ -325,28 +398,9 @@ namespace CryptoExchange.Net.Sockets _disposed = true; _socket.Dispose(); _ctsSource.Dispose(); - - errorHandlers.Clear(); - openHandlers.Clear(); - closeHandlers.Clear(); - messageHandlers.Clear(); log.Write(LogLevel.Trace, $"Socket {Id} disposed"); } - - /// - public void Reset() - { - log.Write(LogLevel.Debug, $"Socket {Id} resetting"); - _ctsSource = new CancellationTokenSource(); - - while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer - - _socket = CreateSocket(); - if (_proxy != null) - SetProxy(_proxy); - _closed = false; - } - + /// /// Create the socket object /// @@ -362,6 +416,8 @@ namespace CryptoExchange.Net.Sockets socket.Options.SetRequestHeader(header.Key, header.Value); socket.Options.KeepAliveInterval = KeepAliveInterval; socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework + if (_proxy != null) + SetProxy(_proxy); return socket; } @@ -412,9 +468,9 @@ namespace CryptoExchange.Net.Sockets } catch (Exception ioe) { - // Connection closed unexpectedly, .NET framework - Handle(errorHandlers, ioe); - await CloseInternalAsync().ConfigureAwait(false); + // Connection closed unexpectedly, .NET framework + OnError?.Invoke(ioe); + _closeTask = CloseInternalAsync(); break; } } @@ -425,7 +481,7 @@ namespace CryptoExchange.Net.Sockets // Because this is running in a separate task and not awaited until the socket gets closed // any exception here will crash the send processing, but do so silently unless the socket get's stopped. // Make sure we at least let the owner know there was an error - Handle(errorHandlers, e); + OnError?.Invoke(e); throw; } finally @@ -468,9 +524,9 @@ namespace CryptoExchange.Net.Sockets } catch (Exception wse) { - // Connection closed unexpectedly - Handle(errorHandlers, wse); - await CloseInternalAsync().ConfigureAwait(false); + // Connection closed unexpectedly + OnError?.Invoke(wse); + _closeTask = CloseInternalAsync(); break; } @@ -478,7 +534,7 @@ namespace CryptoExchange.Net.Sockets { // Connection closed unexpectedly log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message"); - await CloseInternalAsync().ConfigureAwait(false); + _closeTask = CloseInternalAsync(); break; } @@ -543,7 +599,7 @@ namespace CryptoExchange.Net.Sockets // Because this is running in a separate task and not awaited until the socket gets closed // any exception here will crash the receive processing, but do so silently unless the socket gets stopped. // Make sure we at least let the owner know there was an error - Handle(errorHandlers, e); + OnError?.Invoke(e); throw; } finally @@ -597,7 +653,7 @@ namespace CryptoExchange.Net.Sockets try { - Handle(messageHandlers, strData); + OnMessage?.Invoke(strData); } catch(Exception e) { @@ -641,35 +697,11 @@ namespace CryptoExchange.Net.Sockets // Because this is running in a separate task and not awaited until the socket gets closed // any exception here will stop the timeout checking, but do so silently unless the socket get's stopped. // Make sure we at least let the owner know there was an error - Handle(errorHandlers, e); + OnError?.Invoke(e); throw; } } - /// - /// Helper to invoke handlers - /// - /// - protected void Handle(List handlers) - { - LastActionTime = DateTime.UtcNow; - foreach (var handle in new List(handlers)) - handle?.Invoke(); - } - - /// - /// Helper to invoke handlers - /// - /// - /// - /// - protected void Handle(List> handlers, T data) - { - LastActionTime = DateTime.UtcNow; - foreach (var handle in new List>(handlers)) - handle?.Invoke(data); - } - /// /// Get the next identifier /// diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 3e61a1d..a143448 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -134,14 +134,10 @@ namespace CryptoExchange.Net.Sockets private readonly List subscriptions; private readonly object subscriptionLock = new(); - private bool lostTriggered; private readonly Log log; private readonly BaseSocketClient socketClient; private readonly List pendingRequests; - private Task? _socketProcessTask; - private Task? _socketReconnectTask; - private readonly AsyncResetEvent _reconnectWaitEvent; private SocketStatus _status; @@ -153,6 +149,9 @@ namespace CryptoExchange.Net.Sockets get => _status; private set { + if (_status == value) + return; + var oldStatus = _status; _status = value; log.Write(LogLevel.Trace, $"Socket {SocketId} status changed from {oldStatus} to {_status}"); @@ -181,13 +180,51 @@ namespace CryptoExchange.Net.Sockets subscriptions = new List(); _socket = socket; - _reconnectWaitEvent = new AsyncResetEvent(false, true); _socket.Timeout = client.ClientOptions.SocketNoDataTimeout; _socket.OnMessage += ProcessMessage; _socket.OnOpen += SocketOnOpen; - _socket.OnClose += () => _reconnectWaitEvent.Set(); + _socket.OnClose += HandleClose; + _socket.OnReconnecting += HandleReconnecting; + _socket.OnReconnected += HandleReconnected; + } + private void HandleClose() + { + Status = SocketStatus.Closed; + ConnectionClosed?.Invoke(); + } + + private void HandleReconnecting() + { + Status = SocketStatus.Reconnecting; + DisconnectTime = DateTime.UtcNow; + Task.Run(() => ConnectionLost?.Invoke()); + } + + private async void HandleReconnected() + { + log.Write(LogLevel.Debug, "Socket reconnected, processing"); + + lock (pendingRequests) + { + foreach (var pendingRequest in pendingRequests.ToList()) + { + pendingRequest.Fail(); + pendingRequests.Remove(pendingRequest); + } + } + + // TODO Track amount of failed reconencts and failed resubscriptions + + var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false); + if (!reconnectSuccessful) + await _socket.ReconnectAsync().ConfigureAwait(false); + else + { + Status = SocketStatus.Connected; + ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value); + } } /// @@ -196,15 +233,7 @@ namespace CryptoExchange.Net.Sockets /// public async Task ConnectAsync() { - var connected = await _socket.ConnectAsync().ConfigureAwait(false); - if (connected) - { - Status = SocketStatus.Connected; - _socketReconnectTask = ReconnectWatcherAsync(); - _socketProcessTask = _socket.ProcessAsync(); - } - - return connected; + return await _socket.ConnectAsync().ConfigureAwait(false); } /// @@ -222,7 +251,7 @@ namespace CryptoExchange.Net.Sockets /// public async Task TriggerReconnectAsync() { - await _socket.CloseAsync().ConfigureAwait(false); + await _socket.ReconnectAsync().ConfigureAwait(false); } /// @@ -252,8 +281,6 @@ namespace CryptoExchange.Net.Sockets await Task.Delay(100).ConfigureAwait(false); await _socket.CloseAsync().ConfigureAwait(false); - if(_socketProcessTask != null) - await _socketProcessTask.ConfigureAwait(false); _socket.Dispose(); } @@ -298,134 +325,133 @@ namespace CryptoExchange.Net.Sockets subscriptions.Remove(subscription); } - private async Task ReconnectAsync() - { - // Fail all pending requests - lock (pendingRequests) - { - foreach (var pendingRequest in pendingRequests.ToList()) - { - pendingRequest.Fail(); - pendingRequests.Remove(pendingRequest); - } - } + //private async Task ReconnectAsync() + //{ + // // Fail all pending requests + // lock (pendingRequests) + // { + // foreach (var pendingRequest in pendingRequests.ToList()) + // { + // pendingRequest.Fail(); + // pendingRequests.Remove(pendingRequest); + // } + // } - if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect) - { - // Should reconnect - DisconnectTime = DateTime.UtcNow; - log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect"); - if (!lostTriggered) - { - lostTriggered = true; - _ = Task.Run(() => ConnectionLost?.Invoke()); - } + // if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect) + // { + // // Should reconnect + // DisconnectTime = DateTime.UtcNow; + // log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect"); + // if (!lostTriggered) + // { + // lostTriggered = true; + // _ = Task.Run(() => ConnectionLost?.Invoke()); + // } - while (ShouldReconnect) - { - if (ReconnectTry > 0) - { - // Wait a bit before attempting reconnect - await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false); - } + // while (ShouldReconnect) + // { + // if (ReconnectTry > 0) + // { + // // Wait a bit before attempting reconnect + // await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false); + // } - if (!ShouldReconnect) - { - // Should reconnect changed to false while waiting to reconnect - return; - } + // if (!ShouldReconnect) + // { + // // Should reconnect changed to false while waiting to reconnect + // return; + // } - _socket.Reset(); - if (!await _socket.ConnectAsync().ConfigureAwait(false)) - { - // Reconnect failed - ReconnectTry++; - ResubscribeTry = 0; - if (socketClient.ClientOptions.MaxReconnectTries != null - && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries) - { - log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing"); - ShouldReconnect = false; + // _socket.Reset(); + // if (!await _socket.ConnectAsync().ConfigureAwait(false)) + // { + // // Reconnect failed + // ReconnectTry++; + // ResubscribeTry = 0; + // if (socketClient.ClientOptions.MaxReconnectTries != null + // && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries) + // { + // log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing"); + // ShouldReconnect = false; - if (socketClient.socketConnections.ContainsKey(SocketId)) - socketClient.socketConnections.TryRemove(SocketId, out _); + // if (socketClient.socketConnections.ContainsKey(SocketId)) + // socketClient.socketConnections.TryRemove(SocketId, out _); - _ = Task.Run(() => ConnectionClosed?.Invoke()); - // Reached max tries, break loop and leave connection closed - break; - } + // _ = Task.Run(() => ConnectionClosed?.Invoke()); + // // Reached max tries, break loop and leave connection closed + // break; + // } - // Continue to try again - log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}"); - continue; - } + // // Continue to try again + // log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}"); + // continue; + // } - // Successfully reconnected, start processing - Status = SocketStatus.Connected; - _socketProcessTask = _socket.ProcessAsync(); + // // Successfully reconnected, start processing + // Status = SocketStatus.Connected; - ReconnectTry = 0; - var time = DisconnectTime; - DisconnectTime = null; + // ReconnectTry = 0; + // var time = DisconnectTime; + // DisconnectTime = null; - log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}"); + // log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}"); - var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false); - if (!reconnectResult) - { - // Failed to resubscribe everything - ResubscribeTry++; - DisconnectTime = time; + // var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false); + // if (!reconnectResult) + // { + // // Failed to resubscribe everything + // ResubscribeTry++; + // DisconnectTime = time; - if (socketClient.ClientOptions.MaxResubscribeTries != null && - ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries) - { - log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}"); - ShouldReconnect = false; + // if (socketClient.ClientOptions.MaxResubscribeTries != null && + // ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries) + // { + // log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}"); + // ShouldReconnect = false; - if (socketClient.socketConnections.ContainsKey(SocketId)) - socketClient.socketConnections.TryRemove(SocketId, out _); + // if (socketClient.socketConnections.ContainsKey(SocketId)) + // socketClient.socketConnections.TryRemove(SocketId, out _); - _ = Task.Run(() => ConnectionClosed?.Invoke()); - } - else - log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting."); + // _ = Task.Run(() => ConnectionClosed?.Invoke()); + // } + // else + // log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting."); - // Failed resubscribe, close socket if it is still open - if (_socket.IsOpen) - await _socket.CloseAsync().ConfigureAwait(false); - else - DisconnectTime = DateTime.UtcNow; + // // Failed resubscribe, close socket if it is still open + // if (_socket.IsOpen) + // await _socket.CloseAsync().ConfigureAwait(false); + // else + // DisconnectTime = DateTime.UtcNow; - // Break out of the loop, the new processing task should reconnect again - break; - } - else - { - // Succesfully reconnected - log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored."); - ResubscribeTry = 0; - if (lostTriggered) - { - lostTriggered = false; - _ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0))); - } + // // Break out of the loop, the new processing task should reconnect again + // break; + // } + // else + // { + // // Succesfully reconnected + // log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored."); + // ResubscribeTry = 0; + // if (lostTriggered) + // { + // lostTriggered = false; + // _ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0))); + // } - break; - } - } - } - else - { - if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect) - _ = Task.Run(() => ConnectionClosed?.Invoke()); + // break; + // } + // } + // } + // else + // { + // if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect) + // _ = Task.Run(() => ConnectionClosed?.Invoke()); - // No reconnecting needed - log.Write(LogLevel.Information, $"Socket {SocketId} closed"); - if (socketClient.socketConnections.ContainsKey(SocketId)) - socketClient.socketConnections.TryRemove(SocketId, out _); - } - } + // // No reconnecting needed + // log.Write(LogLevel.Information, $"Socket {SocketId} closed"); + // if (socketClient.socketConnections.ContainsKey(SocketId)) + // socketClient.socketConnections.TryRemove(SocketId, out _); + // } + //} /// /// Dispose the connection @@ -654,25 +680,26 @@ namespace CryptoExchange.Net.Sockets /// protected virtual void SocketOnOpen() { + Status = SocketStatus.Connected; ReconnectTry = 0; PausedActivity = false; } - private async Task ReconnectWatcherAsync() - { - while (true) - { - await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false); - if (!ShouldReconnect) - return; + //private async Task ReconnectWatcherAsync() + //{ + // while (true) + // { + // await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false); + // if (!ShouldReconnect) + // return; - Status = SocketStatus.Reconnecting; - await ReconnectAsync().ConfigureAwait(false); + // Status = SocketStatus.Reconnecting; + // await ReconnectAsync().ConfigureAwait(false); - if (!ShouldReconnect) - return; - } - } + // if (!ShouldReconnect) + // return; + // } + //} private async Task> ProcessReconnectAsync() { @@ -705,7 +732,7 @@ namespace CryptoExchange.Net.Sockets var taskList = new List>>(); foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) - taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription)); + taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription)); await Task.WhenAll(taskList).ConfigureAwait(false); if (taskList.Any(t => !t.Result.Success))