1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 16:36:15 +00:00
This commit is contained in:
JKorf 2022-07-07 22:17:55 +02:00
parent ea9375d582
commit 91e33cc42c
3 changed files with 328 additions and 265 deletions

View File

@ -27,6 +27,14 @@ namespace CryptoExchange.Net.Interfaces
/// Websocket opened event /// Websocket opened event
/// </summary> /// </summary>
event Action OnOpen; event Action OnOpen;
/// <summary>
/// Websocket has lost connection to the server and is attempting to reconnect
/// </summary>
event Action OnReconnecting;
/// <summary>
/// Websocket has reconnected to the server
/// </summary>
event Action OnReconnected;
/// <summary> /// <summary>
/// Unique id for this socket /// Unique id for this socket
@ -91,19 +99,15 @@ namespace CryptoExchange.Net.Interfaces
/// <returns></returns> /// <returns></returns>
Task<bool> ConnectAsync(); Task<bool> ConnectAsync();
/// <summary> /// <summary>
/// Receive and send messages over the connection. Resulting task should complete when closing the socket.
/// </summary>
/// <returns></returns>
Task ProcessAsync();
/// <summary>
/// Send data /// Send data
/// </summary> /// </summary>
/// <param name="data"></param> /// <param name="data"></param>
void Send(string data); void Send(string data);
/// <summary> /// <summary>
/// Reset socket when a connection is lost to prepare for a new connection /// Reconnect the socket
/// </summary> /// </summary>
void Reset(); /// <returns></returns>
Task ReconnectAsync();
/// <summary> /// <summary>
/// Close the connection /// Close the connection
/// </summary> /// </summary>

View File

@ -22,6 +22,22 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public class CryptoExchangeWebSocketClient : IWebsocket public class CryptoExchangeWebSocketClient : IWebsocket
{ {
// TODO keep the same ID's for subscriptions/sockets when reconnecting
enum ProcessState
{
Idle,
Processing,
WaitingForClose,
Reconnecting
}
enum CloseState
{
Idle,
Closing,
Closed
}
internal static int lastStreamId; internal static int lastStreamId;
private static readonly object streamIdLock = new(); private static readonly object streamIdLock = new();
@ -35,8 +51,13 @@ namespace CryptoExchange.Net.Sockets
private readonly List<DateTime> _outgoingMessages; private readonly List<DateTime> _outgoingMessages;
private DateTime _lastReceivedMessagesUpdate; private DateTime _lastReceivedMessagesUpdate;
private bool _closed; private Task? _processTask;
private Task? _closeTask;
private bool _stopRequested;
private bool _disposed; private bool _disposed;
private ProcessState _processState;
//private CloseState _closeState;
private SemaphoreSlim _closeSem;
/// <summary> /// <summary>
/// Received messages, the size and the timstamp /// Received messages, the size and the timstamp
@ -53,23 +74,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
protected Log log; protected Log log;
/// <summary>
/// Handlers for when an error happens on the socket
/// </summary>
protected readonly List<Action<Exception>> errorHandlers = new();
/// <summary>
/// Handlers for when the socket connection is opened
/// </summary>
protected readonly List<Action> openHandlers = new();
/// <summary>
/// Handlers for when the connection is closed
/// </summary>
protected readonly List<Action> closeHandlers = new();
/// <summary>
/// Handlers for when a message is received
/// </summary>
protected readonly List<Action<string>> messageHandlers = new();
/// <inheritdoc /> /// <inheritdoc />
public int Id { get; } public int Id { get; }
@ -146,32 +150,17 @@ namespace CryptoExchange.Net.Sockets
} }
/// <inheritdoc /> /// <inheritdoc />
public event Action OnClose public event Action? OnClose;
{
add => closeHandlers.Add(value);
remove => closeHandlers.Remove(value);
}
/// <inheritdoc /> /// <inheritdoc />
public event Action<string> OnMessage public event Action<string>? OnMessage;
{
add => messageHandlers.Add(value);
remove => messageHandlers.Remove(value);
}
/// <inheritdoc /> /// <inheritdoc />
public event Action<Exception> OnError public event Action<Exception>? OnError;
{
add => errorHandlers.Add(value);
remove => errorHandlers.Remove(value);
}
/// <inheritdoc /> /// <inheritdoc />
public event Action OnOpen public event Action? OnOpen;
{ /// <inheritdoc />
add => openHandlers.Add(value); public event Action? OnReconnecting;
remove => openHandlers.Remove(value); /// <inheritdoc />
} public event Action? OnReconnected;
/// <summary> /// <summary>
/// ctor /// ctor
@ -204,6 +193,7 @@ namespace CryptoExchange.Net.Sockets
_ctsSource = new CancellationTokenSource(); _ctsSource = new CancellationTokenSource();
_receivedMessagesLock = new object(); _receivedMessagesLock = new object();
_closeSem = new SemaphoreSlim(1, 1);
_socket = CreateSocket(); _socket = CreateSocket();
} }
@ -228,14 +218,22 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc /> /// <inheritdoc />
public virtual async Task<bool> ConnectAsync() public virtual async Task<bool> ConnectAsync()
{
if (!await ConnectInternalAsync().ConfigureAwait(false))
return false;
OnOpen?.Invoke();
_processTask = ProcessAsync();
return true;
}
private async Task<bool> ConnectInternalAsync()
{ {
log.Write(LogLevel.Debug, $"Socket {Id} connecting"); log.Write(LogLevel.Debug, $"Socket {Id} connecting");
try try
{ {
using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10)); using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false); await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false);
Handle(openHandlers);
} }
catch (Exception e) catch (Exception e)
{ {
@ -248,15 +246,53 @@ namespace CryptoExchange.Net.Sockets
} }
/// <inheritdoc /> /// <inheritdoc />
public virtual async Task ProcessAsync() private async Task ProcessAsync()
{ {
log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started"); while (!_stopRequested)
var sendTask = SendLoopAsync(); {
var receiveTask = ReceiveLoopAsync(); log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started");
var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask; _processState = ProcessState.Processing;
log.Write(LogLevel.Trace, $"Socket {Id} processing startup completed"); var sendTask = SendLoopAsync();
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); var receiveTask = ReceiveLoopAsync();
log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished"); var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask;
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished");
_processState = ProcessState.WaitingForClose;
while (_closeTask == null)
await Task.Delay(50).ConfigureAwait(false);
await _closeTask.ConfigureAwait(false);
_closeTask = null;
//_closeState = CloseState.Idle;
if (!_stopRequested)
{
_processState = ProcessState.Reconnecting;
OnReconnecting?.Invoke();
}
while (!_stopRequested)
{
log.Write(LogLevel.Trace, $"Socket {Id} attempting to reconnect");
_socket = CreateSocket();
_ctsSource.Dispose();
_ctsSource = new CancellationTokenSource();
while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer
var connected = await ConnectInternalAsync().ConfigureAwait(false);
if (!connected)
{
await Task.Delay(5000).ConfigureAwait(false);
continue;
}
OnReconnected?.Invoke();
break;
}
}
_processState = ProcessState.Idle;
} }
/// <inheritdoc /> /// <inheritdoc />
@ -271,11 +307,50 @@ namespace CryptoExchange.Net.Sockets
_sendEvent.Set(); _sendEvent.Set();
} }
/// <inheritdoc />
public virtual async Task ReconnectAsync()
{
if (_processState != ProcessState.Processing)
return;
log.Write(LogLevel.Debug, $"Socket {Id} reconnecting");
_closeTask = CloseInternalAsync();
await _closeTask.ConfigureAwait(false);
}
/// <inheritdoc /> /// <inheritdoc />
public virtual async Task CloseAsync() public virtual async Task CloseAsync()
{ {
log.Write(LogLevel.Debug, $"Socket {Id} closing"); await _closeSem.WaitAsync().ConfigureAwait(false);
await CloseInternalAsync().ConfigureAwait(false); try
{
if (_closeTask != null && !_closeTask.IsCompleted)
{
log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task");
await _closeTask.ConfigureAwait(false);
return;
}
if (!IsOpen)
{
log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open");
return;
}
log.Write(LogLevel.Debug, $"Socket {Id} closing");
_stopRequested = true;
_closeTask = CloseInternalAsync();
}
finally
{
_closeSem.Release();
}
await _closeTask.ConfigureAwait(false);
await _processTask!.ConfigureAwait(false);
OnClose?.Invoke();
log.Write(LogLevel.Debug, $"Socket {Id} closed");
} }
/// <summary> /// <summary>
@ -284,10 +359,10 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
private async Task CloseInternalAsync() private async Task CloseInternalAsync()
{ {
if (_closed || _disposed) if (_disposed)
return; return;
_closed = true; //_closeState = CloseState.Closing;
_ctsSource.Cancel(); _ctsSource.Cancel();
_sendEvent.Set(); _sendEvent.Set();
@ -309,8 +384,6 @@ namespace CryptoExchange.Net.Sockets
catch (Exception) catch (Exception)
{ } // Can sometimes throw an exception when socket is in aborted state due to timing { } // Can sometimes throw an exception when socket is in aborted state due to timing
} }
log.Write(LogLevel.Debug, $"Socket {Id} closed");
Handle(closeHandlers);
} }
/// <summary> /// <summary>
@ -325,28 +398,9 @@ namespace CryptoExchange.Net.Sockets
_disposed = true; _disposed = true;
_socket.Dispose(); _socket.Dispose();
_ctsSource.Dispose(); _ctsSource.Dispose();
errorHandlers.Clear();
openHandlers.Clear();
closeHandlers.Clear();
messageHandlers.Clear();
log.Write(LogLevel.Trace, $"Socket {Id} disposed"); log.Write(LogLevel.Trace, $"Socket {Id} disposed");
} }
/// <inheritdoc />
public void Reset()
{
log.Write(LogLevel.Debug, $"Socket {Id} resetting");
_ctsSource = new CancellationTokenSource();
while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer
_socket = CreateSocket();
if (_proxy != null)
SetProxy(_proxy);
_closed = false;
}
/// <summary> /// <summary>
/// Create the socket object /// Create the socket object
/// </summary> /// </summary>
@ -362,6 +416,8 @@ namespace CryptoExchange.Net.Sockets
socket.Options.SetRequestHeader(header.Key, header.Value); socket.Options.SetRequestHeader(header.Key, header.Value);
socket.Options.KeepAliveInterval = KeepAliveInterval; socket.Options.KeepAliveInterval = KeepAliveInterval;
socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
if (_proxy != null)
SetProxy(_proxy);
return socket; return socket;
} }
@ -413,8 +469,8 @@ namespace CryptoExchange.Net.Sockets
catch (Exception ioe) catch (Exception ioe)
{ {
// Connection closed unexpectedly, .NET framework // Connection closed unexpectedly, .NET framework
Handle(errorHandlers, ioe); OnError?.Invoke(ioe);
await CloseInternalAsync().ConfigureAwait(false); _closeTask = CloseInternalAsync();
break; break;
} }
} }
@ -425,7 +481,7 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed // Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the send processing, but do so silently unless the socket get's stopped. // any exception here will crash the send processing, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error // Make sure we at least let the owner know there was an error
Handle(errorHandlers, e); OnError?.Invoke(e);
throw; throw;
} }
finally finally
@ -469,8 +525,8 @@ namespace CryptoExchange.Net.Sockets
catch (Exception wse) catch (Exception wse)
{ {
// Connection closed unexpectedly // Connection closed unexpectedly
Handle(errorHandlers, wse); OnError?.Invoke(wse);
await CloseInternalAsync().ConfigureAwait(false); _closeTask = CloseInternalAsync();
break; break;
} }
@ -478,7 +534,7 @@ namespace CryptoExchange.Net.Sockets
{ {
// Connection closed unexpectedly // Connection closed unexpectedly
log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message"); log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
await CloseInternalAsync().ConfigureAwait(false); _closeTask = CloseInternalAsync();
break; break;
} }
@ -543,7 +599,7 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed // Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped. // any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
// Make sure we at least let the owner know there was an error // Make sure we at least let the owner know there was an error
Handle(errorHandlers, e); OnError?.Invoke(e);
throw; throw;
} }
finally finally
@ -597,7 +653,7 @@ namespace CryptoExchange.Net.Sockets
try try
{ {
Handle(messageHandlers, strData); OnMessage?.Invoke(strData);
} }
catch(Exception e) catch(Exception e)
{ {
@ -641,35 +697,11 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed // Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will stop the timeout checking, but do so silently unless the socket get's stopped. // any exception here will stop the timeout checking, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error // Make sure we at least let the owner know there was an error
Handle(errorHandlers, e); OnError?.Invoke(e);
throw; throw;
} }
} }
/// <summary>
/// Helper to invoke handlers
/// </summary>
/// <param name="handlers"></param>
protected void Handle(List<Action> handlers)
{
LastActionTime = DateTime.UtcNow;
foreach (var handle in new List<Action>(handlers))
handle?.Invoke();
}
/// <summary>
/// Helper to invoke handlers
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="handlers"></param>
/// <param name="data"></param>
protected void Handle<T>(List<Action<T>> handlers, T data)
{
LastActionTime = DateTime.UtcNow;
foreach (var handle in new List<Action<T>>(handlers))
handle?.Invoke(data);
}
/// <summary> /// <summary>
/// Get the next identifier /// Get the next identifier
/// </summary> /// </summary>

View File

@ -134,14 +134,10 @@ namespace CryptoExchange.Net.Sockets
private readonly List<SocketSubscription> subscriptions; private readonly List<SocketSubscription> subscriptions;
private readonly object subscriptionLock = new(); private readonly object subscriptionLock = new();
private bool lostTriggered;
private readonly Log log; private readonly Log log;
private readonly BaseSocketClient socketClient; private readonly BaseSocketClient socketClient;
private readonly List<PendingRequest> pendingRequests; private readonly List<PendingRequest> pendingRequests;
private Task? _socketProcessTask;
private Task? _socketReconnectTask;
private readonly AsyncResetEvent _reconnectWaitEvent;
private SocketStatus _status; private SocketStatus _status;
@ -153,6 +149,9 @@ namespace CryptoExchange.Net.Sockets
get => _status; get => _status;
private set private set
{ {
if (_status == value)
return;
var oldStatus = _status; var oldStatus = _status;
_status = value; _status = value;
log.Write(LogLevel.Trace, $"Socket {SocketId} status changed from {oldStatus} to {_status}"); log.Write(LogLevel.Trace, $"Socket {SocketId} status changed from {oldStatus} to {_status}");
@ -181,13 +180,51 @@ namespace CryptoExchange.Net.Sockets
subscriptions = new List<SocketSubscription>(); subscriptions = new List<SocketSubscription>();
_socket = socket; _socket = socket;
_reconnectWaitEvent = new AsyncResetEvent(false, true);
_socket.Timeout = client.ClientOptions.SocketNoDataTimeout; _socket.Timeout = client.ClientOptions.SocketNoDataTimeout;
_socket.OnMessage += ProcessMessage; _socket.OnMessage += ProcessMessage;
_socket.OnOpen += SocketOnOpen; _socket.OnOpen += SocketOnOpen;
_socket.OnClose += () => _reconnectWaitEvent.Set(); _socket.OnClose += HandleClose;
_socket.OnReconnecting += HandleReconnecting;
_socket.OnReconnected += HandleReconnected;
}
private void HandleClose()
{
Status = SocketStatus.Closed;
ConnectionClosed?.Invoke();
}
private void HandleReconnecting()
{
Status = SocketStatus.Reconnecting;
DisconnectTime = DateTime.UtcNow;
Task.Run(() => ConnectionLost?.Invoke());
}
private async void HandleReconnected()
{
log.Write(LogLevel.Debug, "Socket reconnected, processing");
lock (pendingRequests)
{
foreach (var pendingRequest in pendingRequests.ToList())
{
pendingRequest.Fail();
pendingRequests.Remove(pendingRequest);
}
}
// TODO Track amount of failed reconencts and failed resubscriptions
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectSuccessful)
await _socket.ReconnectAsync().ConfigureAwait(false);
else
{
Status = SocketStatus.Connected;
ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
}
} }
/// <summary> /// <summary>
@ -196,15 +233,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
public async Task<bool> ConnectAsync() public async Task<bool> ConnectAsync()
{ {
var connected = await _socket.ConnectAsync().ConfigureAwait(false); return await _socket.ConnectAsync().ConfigureAwait(false);
if (connected)
{
Status = SocketStatus.Connected;
_socketReconnectTask = ReconnectWatcherAsync();
_socketProcessTask = _socket.ProcessAsync();
}
return connected;
} }
/// <summary> /// <summary>
@ -222,7 +251,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
public async Task TriggerReconnectAsync() public async Task TriggerReconnectAsync()
{ {
await _socket.CloseAsync().ConfigureAwait(false); await _socket.ReconnectAsync().ConfigureAwait(false);
} }
/// <summary> /// <summary>
@ -252,8 +281,6 @@ namespace CryptoExchange.Net.Sockets
await Task.Delay(100).ConfigureAwait(false); await Task.Delay(100).ConfigureAwait(false);
await _socket.CloseAsync().ConfigureAwait(false); await _socket.CloseAsync().ConfigureAwait(false);
if(_socketProcessTask != null)
await _socketProcessTask.ConfigureAwait(false);
_socket.Dispose(); _socket.Dispose();
} }
@ -298,134 +325,133 @@ namespace CryptoExchange.Net.Sockets
subscriptions.Remove(subscription); subscriptions.Remove(subscription);
} }
private async Task ReconnectAsync() //private async Task ReconnectAsync()
{ //{
// Fail all pending requests // // Fail all pending requests
lock (pendingRequests) // lock (pendingRequests)
{ // {
foreach (var pendingRequest in pendingRequests.ToList()) // foreach (var pendingRequest in pendingRequests.ToList())
{ // {
pendingRequest.Fail(); // pendingRequest.Fail();
pendingRequests.Remove(pendingRequest); // pendingRequests.Remove(pendingRequest);
} // }
} // }
if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect) // if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
{ // {
// Should reconnect // // Should reconnect
DisconnectTime = DateTime.UtcNow; // DisconnectTime = DateTime.UtcNow;
log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect"); // log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect");
if (!lostTriggered) // if (!lostTriggered)
{ // {
lostTriggered = true; // lostTriggered = true;
_ = Task.Run(() => ConnectionLost?.Invoke()); // _ = Task.Run(() => ConnectionLost?.Invoke());
} // }
while (ShouldReconnect) // while (ShouldReconnect)
{ // {
if (ReconnectTry > 0) // if (ReconnectTry > 0)
{ // {
// Wait a bit before attempting reconnect // // Wait a bit before attempting reconnect
await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false); // await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false);
} // }
if (!ShouldReconnect) // if (!ShouldReconnect)
{ // {
// Should reconnect changed to false while waiting to reconnect // // Should reconnect changed to false while waiting to reconnect
return; // return;
} // }
_socket.Reset(); // _socket.Reset();
if (!await _socket.ConnectAsync().ConfigureAwait(false)) // if (!await _socket.ConnectAsync().ConfigureAwait(false))
{ // {
// Reconnect failed // // Reconnect failed
ReconnectTry++; // ReconnectTry++;
ResubscribeTry = 0; // ResubscribeTry = 0;
if (socketClient.ClientOptions.MaxReconnectTries != null // if (socketClient.ClientOptions.MaxReconnectTries != null
&& ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries) // && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries)
{ // {
log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing"); // log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing");
ShouldReconnect = false; // ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(SocketId)) // if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(SocketId, out _); // socketClient.socketConnections.TryRemove(SocketId, out _);
_ = Task.Run(() => ConnectionClosed?.Invoke()); // _ = Task.Run(() => ConnectionClosed?.Invoke());
// Reached max tries, break loop and leave connection closed // // Reached max tries, break loop and leave connection closed
break; // break;
} // }
// Continue to try again // // Continue to try again
log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}"); // log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}");
continue; // continue;
} // }
// Successfully reconnected, start processing // // Successfully reconnected, start processing
Status = SocketStatus.Connected; // Status = SocketStatus.Connected;
_socketProcessTask = _socket.ProcessAsync();
ReconnectTry = 0; // ReconnectTry = 0;
var time = DisconnectTime; // var time = DisconnectTime;
DisconnectTime = null; // DisconnectTime = null;
log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}"); // log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}");
var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false); // var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectResult) // if (!reconnectResult)
{ // {
// Failed to resubscribe everything // // Failed to resubscribe everything
ResubscribeTry++; // ResubscribeTry++;
DisconnectTime = time; // DisconnectTime = time;
if (socketClient.ClientOptions.MaxResubscribeTries != null && // if (socketClient.ClientOptions.MaxResubscribeTries != null &&
ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries) // ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries)
{ // {
log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}"); // log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}");
ShouldReconnect = false; // ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(SocketId)) // if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(SocketId, out _); // socketClient.socketConnections.TryRemove(SocketId, out _);
_ = Task.Run(() => ConnectionClosed?.Invoke()); // _ = Task.Run(() => ConnectionClosed?.Invoke());
} // }
else // else
log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting."); // log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting.");
// Failed resubscribe, close socket if it is still open // // Failed resubscribe, close socket if it is still open
if (_socket.IsOpen) // if (_socket.IsOpen)
await _socket.CloseAsync().ConfigureAwait(false); // await _socket.CloseAsync().ConfigureAwait(false);
else // else
DisconnectTime = DateTime.UtcNow; // DisconnectTime = DateTime.UtcNow;
// Break out of the loop, the new processing task should reconnect again // // Break out of the loop, the new processing task should reconnect again
break; // break;
} // }
else // else
{ // {
// Succesfully reconnected // // Succesfully reconnected
log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored."); // log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored.");
ResubscribeTry = 0; // ResubscribeTry = 0;
if (lostTriggered) // if (lostTriggered)
{ // {
lostTriggered = false; // lostTriggered = false;
_ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0))); // _ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0)));
} // }
break; // break;
} // }
} // }
} // }
else // else
{ // {
if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect) // if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
_ = Task.Run(() => ConnectionClosed?.Invoke()); // _ = Task.Run(() => ConnectionClosed?.Invoke());
// No reconnecting needed // // No reconnecting needed
log.Write(LogLevel.Information, $"Socket {SocketId} closed"); // log.Write(LogLevel.Information, $"Socket {SocketId} closed");
if (socketClient.socketConnections.ContainsKey(SocketId)) // if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(SocketId, out _); // socketClient.socketConnections.TryRemove(SocketId, out _);
} // }
} //}
/// <summary> /// <summary>
/// Dispose the connection /// Dispose the connection
@ -654,25 +680,26 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
protected virtual void SocketOnOpen() protected virtual void SocketOnOpen()
{ {
Status = SocketStatus.Connected;
ReconnectTry = 0; ReconnectTry = 0;
PausedActivity = false; PausedActivity = false;
} }
private async Task ReconnectWatcherAsync() //private async Task ReconnectWatcherAsync()
{ //{
while (true) // while (true)
{ // {
await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false); // await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false);
if (!ShouldReconnect) // if (!ShouldReconnect)
return; // return;
Status = SocketStatus.Reconnecting; // Status = SocketStatus.Reconnecting;
await ReconnectAsync().ConfigureAwait(false); // await ReconnectAsync().ConfigureAwait(false);
if (!ShouldReconnect) // if (!ShouldReconnect)
return; // return;
} // }
} //}
private async Task<CallResult<bool>> ProcessReconnectAsync() private async Task<CallResult<bool>> ProcessReconnectAsync()
{ {