diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs
index 9d40d86..0b8eebc 100644
--- a/CryptoExchange.Net/Interfaces/IWebsocket.cs
+++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs
@@ -27,6 +27,14 @@ namespace CryptoExchange.Net.Interfaces
/// Websocket opened event
///
event Action OnOpen;
+ ///
+ /// Websocket has lost connection to the server and is attempting to reconnect
+ ///
+ event Action OnReconnecting;
+ ///
+ /// Websocket has reconnected to the server
+ ///
+ event Action OnReconnected;
///
/// Unique id for this socket
@@ -89,21 +97,17 @@ namespace CryptoExchange.Net.Interfaces
/// Connect the socket
///
///
- Task ConnectAsync();
- ///
- /// Receive and send messages over the connection. Resulting task should complete when closing the socket.
- ///
- ///
- Task ProcessAsync();
+ Task ConnectAsync();
///
/// Send data
///
///
void Send(string data);
///
- /// Reset socket when a connection is lost to prepare for a new connection
+ /// Reconnect the socket
///
- void Reset();
+ ///
+ Task ReconnectAsync();
///
/// Close the connection
///
diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
index 92f5670..c5cc984 100644
--- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
+++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
@@ -22,6 +22,22 @@ namespace CryptoExchange.Net.Sockets
///
public class CryptoExchangeWebSocketClient : IWebsocket
{
+ // TODO keep the same ID's for subscriptions/sockets when reconnecting
+ enum ProcessState
+ {
+ Idle,
+ Processing,
+ WaitingForClose,
+ Reconnecting
+ }
+
+ enum CloseState
+ {
+ Idle,
+ Closing,
+ Closed
+ }
+
internal static int lastStreamId;
private static readonly object streamIdLock = new();
@@ -35,8 +51,13 @@ namespace CryptoExchange.Net.Sockets
private readonly List _outgoingMessages;
private DateTime _lastReceivedMessagesUpdate;
- private bool _closed;
+ private Task? _processTask;
+ private Task? _closeTask;
+ private bool _stopRequested;
private bool _disposed;
+ private ProcessState _processState;
+ //private CloseState _closeState;
+ private SemaphoreSlim _closeSem;
///
/// Received messages, the size and the timstamp
@@ -53,23 +74,6 @@ namespace CryptoExchange.Net.Sockets
///
protected Log log;
- ///
- /// Handlers for when an error happens on the socket
- ///
- protected readonly List> errorHandlers = new();
- ///
- /// Handlers for when the socket connection is opened
- ///
- protected readonly List openHandlers = new();
- ///
- /// Handlers for when the connection is closed
- ///
- protected readonly List closeHandlers = new();
- ///
- /// Handlers for when a message is received
- ///
- protected readonly List> messageHandlers = new();
-
///
public int Id { get; }
@@ -146,32 +150,17 @@ namespace CryptoExchange.Net.Sockets
}
///
- public event Action OnClose
- {
- add => closeHandlers.Add(value);
- remove => closeHandlers.Remove(value);
- }
-
+ public event Action? OnClose;
///
- public event Action OnMessage
- {
- add => messageHandlers.Add(value);
- remove => messageHandlers.Remove(value);
- }
-
+ public event Action? OnMessage;
///
- public event Action OnError
- {
- add => errorHandlers.Add(value);
- remove => errorHandlers.Remove(value);
- }
-
+ public event Action? OnError;
///
- public event Action OnOpen
- {
- add => openHandlers.Add(value);
- remove => openHandlers.Remove(value);
- }
+ public event Action? OnOpen;
+ ///
+ public event Action? OnReconnecting;
+ ///
+ public event Action? OnReconnected;
///
/// ctor
@@ -204,6 +193,7 @@ namespace CryptoExchange.Net.Sockets
_ctsSource = new CancellationTokenSource();
_receivedMessagesLock = new object();
+ _closeSem = new SemaphoreSlim(1, 1);
_socket = CreateSocket();
}
@@ -228,35 +218,81 @@ namespace CryptoExchange.Net.Sockets
///
public virtual async Task ConnectAsync()
+ {
+ if (!await ConnectInternalAsync().ConfigureAwait(false))
+ return false;
+
+ OnOpen?.Invoke();
+ _processTask = ProcessAsync();
+ return true;
+ }
+
+ private async Task ConnectInternalAsync()
{
log.Write(LogLevel.Debug, $"Socket {Id} connecting");
try
{
- using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
+ using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false);
-
- Handle(openHandlers);
}
catch (Exception e)
{
log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString());
return false;
}
-
+
log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}");
return true;
}
///
- public virtual async Task ProcessAsync()
+ private async Task ProcessAsync()
{
- log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started");
- var sendTask = SendLoopAsync();
- var receiveTask = ReceiveLoopAsync();
- var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask;
- log.Write(LogLevel.Trace, $"Socket {Id} processing startup completed");
- await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
- log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished");
+ while (!_stopRequested)
+ {
+ log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started");
+ _processState = ProcessState.Processing;
+ var sendTask = SendLoopAsync();
+ var receiveTask = ReceiveLoopAsync();
+ var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask;
+ await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
+ log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished");
+
+ _processState = ProcessState.WaitingForClose;
+ while (_closeTask == null)
+ await Task.Delay(50).ConfigureAwait(false);
+
+ await _closeTask.ConfigureAwait(false);
+ _closeTask = null;
+ //_closeState = CloseState.Idle;
+
+ if (!_stopRequested)
+ {
+ _processState = ProcessState.Reconnecting;
+ OnReconnecting?.Invoke();
+ }
+
+ while (!_stopRequested)
+ {
+ log.Write(LogLevel.Trace, $"Socket {Id} attempting to reconnect");
+ _socket = CreateSocket();
+ _ctsSource.Dispose();
+ _ctsSource = new CancellationTokenSource();
+ while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer
+
+ var connected = await ConnectInternalAsync().ConfigureAwait(false);
+ if (!connected)
+ {
+ await Task.Delay(5000).ConfigureAwait(false);
+ continue;
+ }
+
+ OnReconnected?.Invoke();
+ break;
+ }
+ }
+
+ _processState = ProcessState.Idle;
}
///
@@ -271,23 +307,62 @@ namespace CryptoExchange.Net.Sockets
_sendEvent.Set();
}
+ ///
+ public virtual async Task ReconnectAsync()
+ {
+ if (_processState != ProcessState.Processing)
+ return;
+
+ log.Write(LogLevel.Debug, $"Socket {Id} reconnecting");
+ _closeTask = CloseInternalAsync();
+ await _closeTask.ConfigureAwait(false);
+ }
+
///
public virtual async Task CloseAsync()
{
- log.Write(LogLevel.Debug, $"Socket {Id} closing");
- await CloseInternalAsync().ConfigureAwait(false);
+ await _closeSem.WaitAsync().ConfigureAwait(false);
+ try
+ {
+ if (_closeTask != null && !_closeTask.IsCompleted)
+ {
+ log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task");
+ await _closeTask.ConfigureAwait(false);
+ return;
+ }
+
+ if (!IsOpen)
+ {
+ log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open");
+ return;
+ }
+
+ log.Write(LogLevel.Debug, $"Socket {Id} closing");
+ _stopRequested = true;
+
+ _closeTask = CloseInternalAsync();
+ }
+ finally
+ {
+ _closeSem.Release();
+ }
+
+ await _closeTask.ConfigureAwait(false);
+ await _processTask!.ConfigureAwait(false);
+ OnClose?.Invoke();
+ log.Write(LogLevel.Debug, $"Socket {Id} closed");
}
-
+
///
/// Internal close method
///
///
private async Task CloseInternalAsync()
{
- if (_closed || _disposed)
+ if (_disposed)
return;
- _closed = true;
+ //_closeState = CloseState.Closing;
_ctsSource.Cancel();
_sendEvent.Set();
@@ -309,8 +384,6 @@ namespace CryptoExchange.Net.Sockets
catch (Exception)
{ } // Can sometimes throw an exception when socket is in aborted state due to timing
}
- log.Write(LogLevel.Debug, $"Socket {Id} closed");
- Handle(closeHandlers);
}
///
@@ -325,28 +398,9 @@ namespace CryptoExchange.Net.Sockets
_disposed = true;
_socket.Dispose();
_ctsSource.Dispose();
-
- errorHandlers.Clear();
- openHandlers.Clear();
- closeHandlers.Clear();
- messageHandlers.Clear();
log.Write(LogLevel.Trace, $"Socket {Id} disposed");
}
-
- ///
- public void Reset()
- {
- log.Write(LogLevel.Debug, $"Socket {Id} resetting");
- _ctsSource = new CancellationTokenSource();
-
- while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer
-
- _socket = CreateSocket();
- if (_proxy != null)
- SetProxy(_proxy);
- _closed = false;
- }
-
+
///
/// Create the socket object
///
@@ -362,6 +416,8 @@ namespace CryptoExchange.Net.Sockets
socket.Options.SetRequestHeader(header.Key, header.Value);
socket.Options.KeepAliveInterval = KeepAliveInterval;
socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
+ if (_proxy != null)
+ SetProxy(_proxy);
return socket;
}
@@ -412,9 +468,9 @@ namespace CryptoExchange.Net.Sockets
}
catch (Exception ioe)
{
- // Connection closed unexpectedly, .NET framework
- Handle(errorHandlers, ioe);
- await CloseInternalAsync().ConfigureAwait(false);
+ // Connection closed unexpectedly, .NET framework
+ OnError?.Invoke(ioe);
+ _closeTask = CloseInternalAsync();
break;
}
}
@@ -425,7 +481,7 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the send processing, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error
- Handle(errorHandlers, e);
+ OnError?.Invoke(e);
throw;
}
finally
@@ -468,9 +524,9 @@ namespace CryptoExchange.Net.Sockets
}
catch (Exception wse)
{
- // Connection closed unexpectedly
- Handle(errorHandlers, wse);
- await CloseInternalAsync().ConfigureAwait(false);
+ // Connection closed unexpectedly
+ OnError?.Invoke(wse);
+ _closeTask = CloseInternalAsync();
break;
}
@@ -478,7 +534,7 @@ namespace CryptoExchange.Net.Sockets
{
// Connection closed unexpectedly
log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
- await CloseInternalAsync().ConfigureAwait(false);
+ _closeTask = CloseInternalAsync();
break;
}
@@ -543,7 +599,7 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
// Make sure we at least let the owner know there was an error
- Handle(errorHandlers, e);
+ OnError?.Invoke(e);
throw;
}
finally
@@ -597,7 +653,7 @@ namespace CryptoExchange.Net.Sockets
try
{
- Handle(messageHandlers, strData);
+ OnMessage?.Invoke(strData);
}
catch(Exception e)
{
@@ -641,35 +697,11 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will stop the timeout checking, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error
- Handle(errorHandlers, e);
+ OnError?.Invoke(e);
throw;
}
}
- ///
- /// Helper to invoke handlers
- ///
- ///
- protected void Handle(List handlers)
- {
- LastActionTime = DateTime.UtcNow;
- foreach (var handle in new List(handlers))
- handle?.Invoke();
- }
-
- ///
- /// Helper to invoke handlers
- ///
- ///
- ///
- ///
- protected void Handle(List> handlers, T data)
- {
- LastActionTime = DateTime.UtcNow;
- foreach (var handle in new List>(handlers))
- handle?.Invoke(data);
- }
-
///
/// Get the next identifier
///
diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs
index 3e61a1d..a143448 100644
--- a/CryptoExchange.Net/Sockets/SocketConnection.cs
+++ b/CryptoExchange.Net/Sockets/SocketConnection.cs
@@ -134,14 +134,10 @@ namespace CryptoExchange.Net.Sockets
private readonly List subscriptions;
private readonly object subscriptionLock = new();
- private bool lostTriggered;
private readonly Log log;
private readonly BaseSocketClient socketClient;
private readonly List pendingRequests;
- private Task? _socketProcessTask;
- private Task? _socketReconnectTask;
- private readonly AsyncResetEvent _reconnectWaitEvent;
private SocketStatus _status;
@@ -153,6 +149,9 @@ namespace CryptoExchange.Net.Sockets
get => _status;
private set
{
+ if (_status == value)
+ return;
+
var oldStatus = _status;
_status = value;
log.Write(LogLevel.Trace, $"Socket {SocketId} status changed from {oldStatus} to {_status}");
@@ -181,13 +180,51 @@ namespace CryptoExchange.Net.Sockets
subscriptions = new List();
_socket = socket;
- _reconnectWaitEvent = new AsyncResetEvent(false, true);
_socket.Timeout = client.ClientOptions.SocketNoDataTimeout;
_socket.OnMessage += ProcessMessage;
_socket.OnOpen += SocketOnOpen;
- _socket.OnClose += () => _reconnectWaitEvent.Set();
+ _socket.OnClose += HandleClose;
+ _socket.OnReconnecting += HandleReconnecting;
+ _socket.OnReconnected += HandleReconnected;
+ }
+ private void HandleClose()
+ {
+ Status = SocketStatus.Closed;
+ ConnectionClosed?.Invoke();
+ }
+
+ private void HandleReconnecting()
+ {
+ Status = SocketStatus.Reconnecting;
+ DisconnectTime = DateTime.UtcNow;
+ Task.Run(() => ConnectionLost?.Invoke());
+ }
+
+ private async void HandleReconnected()
+ {
+ log.Write(LogLevel.Debug, "Socket reconnected, processing");
+
+ lock (pendingRequests)
+ {
+ foreach (var pendingRequest in pendingRequests.ToList())
+ {
+ pendingRequest.Fail();
+ pendingRequests.Remove(pendingRequest);
+ }
+ }
+
+ // TODO Track amount of failed reconencts and failed resubscriptions
+
+ var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
+ if (!reconnectSuccessful)
+ await _socket.ReconnectAsync().ConfigureAwait(false);
+ else
+ {
+ Status = SocketStatus.Connected;
+ ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
+ }
}
///
@@ -196,15 +233,7 @@ namespace CryptoExchange.Net.Sockets
///
public async Task ConnectAsync()
{
- var connected = await _socket.ConnectAsync().ConfigureAwait(false);
- if (connected)
- {
- Status = SocketStatus.Connected;
- _socketReconnectTask = ReconnectWatcherAsync();
- _socketProcessTask = _socket.ProcessAsync();
- }
-
- return connected;
+ return await _socket.ConnectAsync().ConfigureAwait(false);
}
///
@@ -222,7 +251,7 @@ namespace CryptoExchange.Net.Sockets
///
public async Task TriggerReconnectAsync()
{
- await _socket.CloseAsync().ConfigureAwait(false);
+ await _socket.ReconnectAsync().ConfigureAwait(false);
}
///
@@ -252,8 +281,6 @@ namespace CryptoExchange.Net.Sockets
await Task.Delay(100).ConfigureAwait(false);
await _socket.CloseAsync().ConfigureAwait(false);
- if(_socketProcessTask != null)
- await _socketProcessTask.ConfigureAwait(false);
_socket.Dispose();
}
@@ -298,134 +325,133 @@ namespace CryptoExchange.Net.Sockets
subscriptions.Remove(subscription);
}
- private async Task ReconnectAsync()
- {
- // Fail all pending requests
- lock (pendingRequests)
- {
- foreach (var pendingRequest in pendingRequests.ToList())
- {
- pendingRequest.Fail();
- pendingRequests.Remove(pendingRequest);
- }
- }
+ //private async Task ReconnectAsync()
+ //{
+ // // Fail all pending requests
+ // lock (pendingRequests)
+ // {
+ // foreach (var pendingRequest in pendingRequests.ToList())
+ // {
+ // pendingRequest.Fail();
+ // pendingRequests.Remove(pendingRequest);
+ // }
+ // }
- if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
- {
- // Should reconnect
- DisconnectTime = DateTime.UtcNow;
- log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect");
- if (!lostTriggered)
- {
- lostTriggered = true;
- _ = Task.Run(() => ConnectionLost?.Invoke());
- }
+ // if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
+ // {
+ // // Should reconnect
+ // DisconnectTime = DateTime.UtcNow;
+ // log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect");
+ // if (!lostTriggered)
+ // {
+ // lostTriggered = true;
+ // _ = Task.Run(() => ConnectionLost?.Invoke());
+ // }
- while (ShouldReconnect)
- {
- if (ReconnectTry > 0)
- {
- // Wait a bit before attempting reconnect
- await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false);
- }
+ // while (ShouldReconnect)
+ // {
+ // if (ReconnectTry > 0)
+ // {
+ // // Wait a bit before attempting reconnect
+ // await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false);
+ // }
- if (!ShouldReconnect)
- {
- // Should reconnect changed to false while waiting to reconnect
- return;
- }
+ // if (!ShouldReconnect)
+ // {
+ // // Should reconnect changed to false while waiting to reconnect
+ // return;
+ // }
- _socket.Reset();
- if (!await _socket.ConnectAsync().ConfigureAwait(false))
- {
- // Reconnect failed
- ReconnectTry++;
- ResubscribeTry = 0;
- if (socketClient.ClientOptions.MaxReconnectTries != null
- && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries)
- {
- log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing");
- ShouldReconnect = false;
+ // _socket.Reset();
+ // if (!await _socket.ConnectAsync().ConfigureAwait(false))
+ // {
+ // // Reconnect failed
+ // ReconnectTry++;
+ // ResubscribeTry = 0;
+ // if (socketClient.ClientOptions.MaxReconnectTries != null
+ // && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries)
+ // {
+ // log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing");
+ // ShouldReconnect = false;
- if (socketClient.socketConnections.ContainsKey(SocketId))
- socketClient.socketConnections.TryRemove(SocketId, out _);
+ // if (socketClient.socketConnections.ContainsKey(SocketId))
+ // socketClient.socketConnections.TryRemove(SocketId, out _);
- _ = Task.Run(() => ConnectionClosed?.Invoke());
- // Reached max tries, break loop and leave connection closed
- break;
- }
+ // _ = Task.Run(() => ConnectionClosed?.Invoke());
+ // // Reached max tries, break loop and leave connection closed
+ // break;
+ // }
- // Continue to try again
- log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}");
- continue;
- }
+ // // Continue to try again
+ // log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}");
+ // continue;
+ // }
- // Successfully reconnected, start processing
- Status = SocketStatus.Connected;
- _socketProcessTask = _socket.ProcessAsync();
+ // // Successfully reconnected, start processing
+ // Status = SocketStatus.Connected;
- ReconnectTry = 0;
- var time = DisconnectTime;
- DisconnectTime = null;
+ // ReconnectTry = 0;
+ // var time = DisconnectTime;
+ // DisconnectTime = null;
- log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}");
+ // log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}");
- var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
- if (!reconnectResult)
- {
- // Failed to resubscribe everything
- ResubscribeTry++;
- DisconnectTime = time;
+ // var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
+ // if (!reconnectResult)
+ // {
+ // // Failed to resubscribe everything
+ // ResubscribeTry++;
+ // DisconnectTime = time;
- if (socketClient.ClientOptions.MaxResubscribeTries != null &&
- ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries)
- {
- log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}");
- ShouldReconnect = false;
+ // if (socketClient.ClientOptions.MaxResubscribeTries != null &&
+ // ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries)
+ // {
+ // log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}");
+ // ShouldReconnect = false;
- if (socketClient.socketConnections.ContainsKey(SocketId))
- socketClient.socketConnections.TryRemove(SocketId, out _);
+ // if (socketClient.socketConnections.ContainsKey(SocketId))
+ // socketClient.socketConnections.TryRemove(SocketId, out _);
- _ = Task.Run(() => ConnectionClosed?.Invoke());
- }
- else
- log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting.");
+ // _ = Task.Run(() => ConnectionClosed?.Invoke());
+ // }
+ // else
+ // log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting.");
- // Failed resubscribe, close socket if it is still open
- if (_socket.IsOpen)
- await _socket.CloseAsync().ConfigureAwait(false);
- else
- DisconnectTime = DateTime.UtcNow;
+ // // Failed resubscribe, close socket if it is still open
+ // if (_socket.IsOpen)
+ // await _socket.CloseAsync().ConfigureAwait(false);
+ // else
+ // DisconnectTime = DateTime.UtcNow;
- // Break out of the loop, the new processing task should reconnect again
- break;
- }
- else
- {
- // Succesfully reconnected
- log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored.");
- ResubscribeTry = 0;
- if (lostTriggered)
- {
- lostTriggered = false;
- _ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0)));
- }
+ // // Break out of the loop, the new processing task should reconnect again
+ // break;
+ // }
+ // else
+ // {
+ // // Succesfully reconnected
+ // log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored.");
+ // ResubscribeTry = 0;
+ // if (lostTriggered)
+ // {
+ // lostTriggered = false;
+ // _ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0)));
+ // }
- break;
- }
- }
- }
- else
- {
- if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
- _ = Task.Run(() => ConnectionClosed?.Invoke());
+ // break;
+ // }
+ // }
+ // }
+ // else
+ // {
+ // if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
+ // _ = Task.Run(() => ConnectionClosed?.Invoke());
- // No reconnecting needed
- log.Write(LogLevel.Information, $"Socket {SocketId} closed");
- if (socketClient.socketConnections.ContainsKey(SocketId))
- socketClient.socketConnections.TryRemove(SocketId, out _);
- }
- }
+ // // No reconnecting needed
+ // log.Write(LogLevel.Information, $"Socket {SocketId} closed");
+ // if (socketClient.socketConnections.ContainsKey(SocketId))
+ // socketClient.socketConnections.TryRemove(SocketId, out _);
+ // }
+ //}
///
/// Dispose the connection
@@ -654,25 +680,26 @@ namespace CryptoExchange.Net.Sockets
///
protected virtual void SocketOnOpen()
{
+ Status = SocketStatus.Connected;
ReconnectTry = 0;
PausedActivity = false;
}
- private async Task ReconnectWatcherAsync()
- {
- while (true)
- {
- await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false);
- if (!ShouldReconnect)
- return;
+ //private async Task ReconnectWatcherAsync()
+ //{
+ // while (true)
+ // {
+ // await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false);
+ // if (!ShouldReconnect)
+ // return;
- Status = SocketStatus.Reconnecting;
- await ReconnectAsync().ConfigureAwait(false);
+ // Status = SocketStatus.Reconnecting;
+ // await ReconnectAsync().ConfigureAwait(false);
- if (!ShouldReconnect)
- return;
- }
- }
+ // if (!ShouldReconnect)
+ // return;
+ // }
+ //}
private async Task> ProcessReconnectAsync()
{
@@ -705,7 +732,7 @@ namespace CryptoExchange.Net.Sockets
var taskList = new List>>();
foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
- taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription));
+ taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription));
await Task.WhenAll(taskList).ConfigureAwait(false);
if (taskList.Any(t => !t.Result.Success))