diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs index 2e1541a..2e249ac 100644 --- a/CryptoExchange.Net/Clients/BaseSocketClient.cs +++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs @@ -30,7 +30,7 @@ namespace CryptoExchange.Net /// /// List of socket connections currently connecting/connected /// - protected internal ConcurrentDictionary sockets = new(); + protected internal ConcurrentDictionary socketConnections = new(); /// /// Semaphore used while creating sockets /// @@ -85,10 +85,10 @@ namespace CryptoExchange.Net { get { - if (!sockets.Any()) + if (!socketConnections.Any()) return 0; - return sockets.Sum(s => s.Value.Socket.IncomingKbps); + return socketConnections.Sum(s => s.Value.IncomingKbps); } } @@ -205,7 +205,7 @@ namespace CryptoExchange.Net if (socketConnection.PausedActivity) { - log.Write(LogLevel.Warning, $"Socket {socketConnection.Socket.Id} has been paused, can't subscribe at this moment"); + log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't subscribe at this moment"); return new CallResult( new ServerError("Socket is paused")); } @@ -230,12 +230,12 @@ namespace CryptoExchange.Net { subscription.CancellationTokenRegistration = ct.Register(async () => { - log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} Cancellation token set, closing subscription"); + log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription"); await socketConnection.CloseAsync(subscription).ConfigureAwait(false); }, false); } - log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} subscription completed"); + log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription completed"); return new CallResult(new UpdateSubscription(socketConnection, subscription)); } @@ -317,7 +317,7 @@ namespace CryptoExchange.Net if (socketConnection.PausedActivity) { - log.Write(LogLevel.Warning, $"Socket {socketConnection.Socket.Id} has been paused, can't send query at this moment"); + log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't send query at this moment"); return new CallResult(new ServerError("Socket is paused")); } @@ -368,7 +368,7 @@ namespace CryptoExchange.Net if (!result) { await socket.CloseAsync().ConfigureAwait(false); - log.Write(LogLevel.Warning, $"Socket {socket.Socket.Id} authentication failed"); + log.Write(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed"); result.Error!.Message = "Authentication failed: " + result.Error.Message; return new CallResult(result.Error); } @@ -471,7 +471,7 @@ namespace CryptoExchange.Net var desResult = Deserialize(messageEvent.JsonData); if (!desResult) { - log.Write(LogLevel.Warning, $"Socket {connection.Socket.Id} Failed to deserialize data into type {typeof(T)}: {desResult.Error}"); + log.Write(LogLevel.Warning, $"Socket {connection.SocketId} Failed to deserialize data into type {typeof(T)}: {desResult.Error}"); return; } @@ -494,7 +494,7 @@ namespace CryptoExchange.Net { genericHandlers.Add(identifier, action); var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, action); - foreach (var connection in sockets.Values) + foreach (var connection in socketConnections.Values) connection.AddSubscription(subscription); } @@ -507,13 +507,13 @@ namespace CryptoExchange.Net /// protected virtual SocketConnection GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated) { - var socketResult = sockets.Where(s => s.Value.Socket.Url.TrimEnd('/') == address.TrimEnd('/') + var socketResult = socketConnections.Where(s => s.Value.Uri.ToString().TrimEnd('/') == address.TrimEnd('/') && (s.Value.ApiClient.GetType() == apiClient.GetType()) && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault(); var result = socketResult.Equals(default(KeyValuePair)) ? null : socketResult.Value; if (result != null) { - if (result.SubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (sockets.Count >= MaxSocketConnections && sockets.All(s => s.Value.SubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) + if (result.SubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= MaxSocketConnections && socketConnections.All(s => s.Value.SubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) { // Use existing socket if it has less than target connections OR it has the least connections and we can't make new return result; @@ -548,13 +548,13 @@ namespace CryptoExchange.Net /// protected virtual async Task> ConnectSocketAsync(SocketConnection socketConnection) { - if (await socketConnection.Socket.ConnectAsync().ConfigureAwait(false)) + if (await socketConnection.ConnectAsync().ConfigureAwait(false)) { - sockets.TryAdd(socketConnection.Socket.Id, socketConnection); + socketConnections.TryAdd(socketConnection.SocketId, socketConnection); return new CallResult(true); } - socketConnection.Socket.Dispose(); + socketConnection.Dispose(); return new CallResult(new CantConnectError()); } @@ -605,19 +605,19 @@ namespace CryptoExchange.Net if (disposing) break; - foreach (var socket in sockets.Values) + foreach (var socket in socketConnections.Values) { if (disposing) break; - if (!socket.Socket.IsOpen) + if (!socket.Connected) continue; var obj = objGetter(socket); if (obj == null) continue; - log.Write(LogLevel.Trace, $"Socket {socket.Socket.Id} sending periodic {identifier}"); + log.Write(LogLevel.Trace, $"Socket {socket.SocketId} sending periodic {identifier}"); try { @@ -625,7 +625,7 @@ namespace CryptoExchange.Net } catch (Exception ex) { - log.Write(LogLevel.Warning, $"Socket {socket.Socket.Id} Periodic send {identifier} failed: " + ex.ToLogString()); + log.Write(LogLevel.Warning, $"Socket {socket.SocketId} Periodic send {identifier} failed: " + ex.ToLogString()); } } } @@ -642,7 +642,7 @@ namespace CryptoExchange.Net SocketSubscription? subscription = null; SocketConnection? connection = null; - foreach(var socket in sockets.Values.ToList()) + foreach(var socket in socketConnections.Values.ToList()) { subscription = socket.GetSubscription(subscriptionId); if (subscription != null) @@ -679,13 +679,13 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAllAsync() { - log.Write(LogLevel.Information, $"Closing all {sockets.Sum(s => s.Value.SubscriptionCount)} subscriptions"); + log.Write(LogLevel.Information, $"Closing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions"); await Task.Run(async () => { var tasks = new List(); { - var socketList = sockets.Values; + var socketList = socketConnections.Values; foreach (var sub in socketList) tasks.Add(sub.CloseAsync()); } diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index af84536..5cde778 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -41,10 +41,6 @@ namespace CryptoExchange.Net.Interfaces /// Encoding? Encoding { get; set; } /// - /// Whether socket is in the process of reconnecting - /// - bool Reconnecting { get; set; } - /// /// The max amount of outgoing messages per second /// int? RatelimitPerSecond { get; set; } @@ -61,9 +57,9 @@ namespace CryptoExchange.Net.Interfaces /// Func? DataInterpreterString { get; set; } /// - /// The url the socket connects to + /// The uri the socket connects to /// - string Url { get; } + Uri Uri { get; } /// /// Whether the socket connection is closed /// @@ -90,6 +86,7 @@ namespace CryptoExchange.Net.Interfaces /// /// Task ConnectAsync(); + Task ProcessAsync(); /// /// Send data /// diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index 4504b39..47d8c15 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -483,7 +483,7 @@ namespace CryptoExchange.Net.OrderBook /// Max wait time /// Cancellation token /// - protected async Task> WaitForSetOrderBookAsync(int timeout, CancellationToken ct) + protected async Task> WaitForSetOrderBookAsync(TimeSpan timeout, CancellationToken ct) { var startWait = DateTime.UtcNow; while (!bookSet && Status == OrderBookStatus.Syncing) @@ -491,12 +491,12 @@ namespace CryptoExchange.Net.OrderBook if(ct.IsCancellationRequested) return new CallResult(new CancellationRequestedError()); - if ((DateTime.UtcNow - startWait).TotalMilliseconds > timeout) + if (DateTime.UtcNow - startWait > timeout) return new CallResult(new ServerError("Timeout while waiting for data")); try { - await Task.Delay(10, ct).ConfigureAwait(false); + await Task.Delay(50, ct).ConfigureAwait(false); } catch (OperationCanceledException) { } diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 9371231..76461dc 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -26,17 +26,11 @@ namespace CryptoExchange.Net.Sockets private static readonly object streamIdLock = new object(); private ClientWebSocket _socket; - private Task? _sendTask; - private Task? _receiveTask; - private Task? _timeoutTask; private readonly AsyncResetEvent _sendEvent; private readonly ConcurrentQueue _sendBuffer; private readonly IDictionary cookies; private readonly IDictionary headers; private CancellationTokenSource _ctsSource; - private bool _closing; - private bool _startedSent; - private bool _startedReceive; private readonly List _outgoingMessages; private DateTime _lastReceivedMessagesUpdate; @@ -78,9 +72,6 @@ namespace CryptoExchange.Net.Sockets /// public string? Origin { get; set; } - /// - public bool Reconnecting { get; set; } - /// /// The timestamp this socket has been active for the last time /// @@ -97,13 +88,13 @@ namespace CryptoExchange.Net.Sockets public Func? DataInterpreterString { get; set; } /// - public string Url { get; } + public Uri Uri { get; } /// public bool IsClosed => _socket.State == WebSocketState.Closed; /// - public bool IsOpen => _socket.State == WebSocketState.Open && !_closing; + public bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested; /// /// Ssl protocols supported. NOT USED BY THIS IMPLEMENTATION @@ -179,8 +170,8 @@ namespace CryptoExchange.Net.Sockets /// ctor /// /// The log object to use - /// The url the socket should connect to - public CryptoExchangeWebSocketClient(Log log, string url) : this(log, url, new Dictionary(), new Dictionary()) + /// The uri the socket should connect to + public CryptoExchangeWebSocketClient(Log log, Uri uri) : this(log, uri, new Dictionary(), new Dictionary()) { } @@ -188,14 +179,14 @@ namespace CryptoExchange.Net.Sockets /// ctor /// /// The log object to use - /// The url the socket should connect to + /// The uri the socket should connect to /// Cookies to sent in the socket connection request /// Headers to sent in the socket connection request - public CryptoExchangeWebSocketClient(Log log, string url, IDictionary cookies, IDictionary headers) + public CryptoExchangeWebSocketClient(Log log, Uri uri, IDictionary cookies, IDictionary headers) { Id = NextStreamId(); this.log = log; - Url = url; + Uri = uri; this.cookies = cookies; this.headers = headers; @@ -212,13 +203,16 @@ namespace CryptoExchange.Net.Sockets /// public virtual void SetProxy(ApiProxy proxy) { - Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri); + if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri)) + throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy)); + _socket.Options.Proxy = uri?.Scheme == null ? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port) : _socket.Options.Proxy = new WebProxy { Address = uri }; + if (proxy.Login != null) _socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password); } @@ -230,7 +224,7 @@ namespace CryptoExchange.Net.Sockets try { using CancellationTokenSource tcs = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await _socket.ConnectAsync(new Uri(Url), tcs.Token).ConfigureAwait(false); + await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false); Handle(openHandlers); } @@ -239,35 +233,25 @@ namespace CryptoExchange.Net.Sockets log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString()); return false; } - - log.Write(LogLevel.Trace, $"Socket {Id} connection succeeded, starting communication"); - _sendTask = Task.Factory.StartNew(SendLoopAsync, default, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap(); - _receiveTask = Task.Factory.StartNew(ReceiveLoopAsync, default, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap(); - if (Timeout != default) - _timeoutTask = Task.Run(CheckTimeoutAsync); - - var sw = Stopwatch.StartNew(); - while (!_startedSent || !_startedReceive) - { - // Wait for the tasks to have actually started - await Task.Delay(10).ConfigureAwait(false); - - if(sw.ElapsedMilliseconds > 5000) - { - _ = _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", default); - log.Write(LogLevel.Debug, $"Socket {Id} startup interupted"); - return false; - } - } - - log.Write(LogLevel.Debug, $"Socket {Id} connected to {Url}"); + + log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}"); return true; } + /// + public virtual async Task ProcessAsync() + { + var sendTask = SendLoopAsync(); + var receiveTask = ReceiveLoopAsync(); + var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask; + log.Write(LogLevel.Debug, $"Socket {Id} processing started"); + await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); + } + /// public virtual void Send(string data) { - if (_closing) + if (_ctsSource.IsCancellationRequested) throw new InvalidOperationException($"Socket {Id} Can't send data when socket is not connected"); var bytes = _encoding.GetBytes(data); @@ -280,36 +264,21 @@ namespace CryptoExchange.Net.Sockets public virtual async Task CloseAsync() { log.Write(LogLevel.Debug, $"Socket {Id} closing"); - await CloseInternalAsync(true, true).ConfigureAwait(false); + await CloseInternalAsync().ConfigureAwait(false); } /// - /// Internal close method, will wait for each task to complete to gracefully close + /// Internal close method /// - /// - /// /// - private async Task CloseInternalAsync(bool waitSend, bool waitReceive) + private async Task CloseInternalAsync() { - if (_closing) - return; - - _closing = true; - var tasksToAwait = new List(); - if (_socket.State == WebSocketState.Open) - tasksToAwait.Add(_socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default)); - _ctsSource.Cancel(); _sendEvent.Set(); - if (waitSend) - tasksToAwait.Add(_sendTask!); - if (waitReceive) - tasksToAwait.Add(_receiveTask!); - if (_timeoutTask != null) - tasksToAwait.Add(_timeoutTask); - log.Write(LogLevel.Trace, $"Socket {Id} waiting for communication loops to finish"); - await Task.WhenAll(tasksToAwait).ConfigureAwait(false); + if (_socket.State == WebSocketState.Open) + await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); + log.Write(LogLevel.Debug, $"Socket {Id} closed"); Handle(closeHandlers); } @@ -335,7 +304,6 @@ namespace CryptoExchange.Net.Sockets { log.Write(LogLevel.Debug, $"Socket {Id} resetting"); _ctsSource = new CancellationTokenSource(); - _closing = false; while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer @@ -366,17 +334,16 @@ namespace CryptoExchange.Net.Sockets /// private async Task SendLoopAsync() { - _startedSent = true; try { while (true) { - if (_closing) + if (_ctsSource.IsCancellationRequested) break; await _sendEvent.WaitAsync().ConfigureAwait(false); - if (_closing) + if (_ctsSource.IsCancellationRequested) break; while (_sendBuffer.TryDequeue(out var data)) @@ -388,7 +355,7 @@ namespace CryptoExchange.Net.Sockets while (MessagesSentLastSecond() >= RatelimitPerSecond) { start ??= DateTime.UtcNow; - await Task.Delay(10).ConfigureAwait(false); + await Task.Delay(50).ConfigureAwait(false); } if (start != null) @@ -410,7 +377,7 @@ namespace CryptoExchange.Net.Sockets { // Connection closed unexpectedly, .NET framework Handle(errorHandlers, ioe); - _ = Task.Run(async () => await CloseInternalAsync(false, true).ConfigureAwait(false)); + await CloseInternalAsync().ConfigureAwait(false); break; } } @@ -427,7 +394,6 @@ namespace CryptoExchange.Net.Sockets finally { log.Write(LogLevel.Trace, $"Socket {Id} Send loop finished"); - _startedSent = false; } } @@ -437,14 +403,13 @@ namespace CryptoExchange.Net.Sockets /// private async Task ReceiveLoopAsync() { - _startedReceive = true; var buffer = new ArraySegment(new byte[65536]); var received = 0; try { while (true) { - if (_closing) + if (_ctsSource.IsCancellationRequested) break; MemoryStream? memoryStream = null; @@ -468,7 +433,7 @@ namespace CryptoExchange.Net.Sockets { // Connection closed unexpectedly Handle(errorHandlers, wse); - _ = Task.Run(async () => await CloseInternalAsync(true, true).ConfigureAwait(false)); + await CloseInternalAsync().ConfigureAwait(false); break; } @@ -476,7 +441,7 @@ namespace CryptoExchange.Net.Sockets { // Connection closed unexpectedly log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message"); - _ = Task.Run(async () => await CloseInternalAsync(true, true).ConfigureAwait(false)); + await CloseInternalAsync().ConfigureAwait(false); break; } @@ -515,7 +480,7 @@ namespace CryptoExchange.Net.Sockets break; } - if (receiveResult == null || _closing) + if (receiveResult == null || _ctsSource.IsCancellationRequested) { // Error during receiving or cancellation requested, stop. break; @@ -547,7 +512,6 @@ namespace CryptoExchange.Net.Sockets finally { log.Write(LogLevel.Trace, $"Socket {Id} Receive loop finished"); - _startedReceive = false; } } @@ -615,7 +579,7 @@ namespace CryptoExchange.Net.Sockets { while (true) { - if (_closing) + if (_ctsSource.IsCancellationRequested) return; if (DateTime.UtcNow - LastActionTime > Timeout) diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 18920de..58a7ebd 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -65,12 +65,16 @@ namespace CryptoExchange.Net.Sockets /// /// If connection is made /// - public bool Connected { get; private set; } + public bool Connected => Socket.IsOpen; /// /// The underlying websocket /// - public IWebsocket Socket { get; set; } + private IWebsocket Socket { get; set; } + public int SocketId => Socket.Id; + + public double IncomingKbps => Socket.IncomingKbps; + public Uri Uri => Socket.Uri; /// /// The API client the connection is for @@ -129,6 +133,7 @@ namespace CryptoExchange.Net.Sockets private readonly BaseSocketClient socketClient; private readonly List pendingRequests; + private Task _socketProcessReconnectTask; /// /// New socket connection @@ -149,10 +154,215 @@ namespace CryptoExchange.Net.Sockets Socket.Timeout = client.ClientOptions.SocketNoDataTimeout; Socket.OnMessage += ProcessMessage; - Socket.OnClose += SocketOnClose; Socket.OnOpen += SocketOnOpen; } + public async Task ConnectAsync() + { + var connected = await Socket.ConnectAsync().ConfigureAwait(false); + if (connected) + StartProcessingTask(); + + return connected; + } + + public async Task TriggerReconnectAsync() + { + await Socket.CloseAsync().ConfigureAwait(false); + } + + /// + /// Close the connection + /// + /// + public async Task CloseAsync() + { + ShouldReconnect = false; + if (socketClient.socketConnections.ContainsKey(Socket.Id)) + socketClient.socketConnections.TryRemove(Socket.Id, out _); + + lock (subscriptionLock) + { + foreach (var subscription in subscriptions) + { + if (subscription.CancellationTokenRegistration.HasValue) + subscription.CancellationTokenRegistration.Value.Dispose(); + } + } + await Socket.CloseAsync().ConfigureAwait(false); + Socket.Dispose(); + await _socketProcessReconnectTask.ConfigureAwait(false); + } + + /// + /// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well + /// + /// Subscription to close + /// + public async Task CloseAsync(SocketSubscription subscription) + { + if (!Socket.IsOpen) + return; + + if (subscription.CancellationTokenRegistration.HasValue) + subscription.CancellationTokenRegistration.Value.Dispose(); + + if (subscription.Confirmed) + await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); + + bool shouldCloseConnection; + lock (subscriptionLock) + shouldCloseConnection = !subscriptions.Any(r => r.UserSubscription && subscription != r); + + if (shouldCloseConnection) + await CloseAsync().ConfigureAwait(false); + + lock (subscriptionLock) + subscriptions.Remove(subscription); + } + + private void StartProcessingTask() + { + _socketProcessReconnectTask = Task.Run(async () => + { + await Socket.ProcessAsync().ConfigureAwait(false); + await ReconnectAsync().ConfigureAwait(false); + }); + } + + private async Task ReconnectAsync() + { + // Fail all pending requests + lock (pendingRequests) + { + foreach (var pendingRequest in pendingRequests.ToList()) + { + pendingRequest.Fail(); + pendingRequests.Remove(pendingRequest); + } + } + + if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect) + { + // Should reconnect + DisconnectTime = DateTime.UtcNow; + log.Write(LogLevel.Warning, $"Socket {Socket.Id} Connection lost, will try to reconnect"); + if (!lostTriggered) + { + lostTriggered = true; + ConnectionLost?.Invoke(); + } + + while (ShouldReconnect) + { + if (ReconnectTry > 0) + { + // Wait a bit before attempting reconnect + await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false); + } + + if (!ShouldReconnect) + { + // Should reconnect changed to false while waiting to reconnect + return; + } + + Socket.Reset(); + if (!await Socket.ConnectAsync().ConfigureAwait(false)) + { + // Reconnect failed + ReconnectTry++; + ResubscribeTry = 0; + if (socketClient.ClientOptions.MaxReconnectTries != null + && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries) + { + log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing"); + ShouldReconnect = false; + + if (socketClient.socketConnections.ContainsKey(Socket.Id)) + socketClient.socketConnections.TryRemove(Socket.Id, out _); + + _ = Task.Run(() => ConnectionClosed?.Invoke()); + // Reached max tries, break loop and leave connection closed + break; + } + + // Continue to try again + log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}"); + continue; + } + + // Successfully reconnected, start processing + StartProcessingTask(); + + ReconnectTry = 0; + var time = DisconnectTime; + DisconnectTime = null; + + log.Write(LogLevel.Information, $"Socket {Socket.Id} reconnected after {DateTime.UtcNow - time}"); + + var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false); + if (!reconnectResult) + { + // Failed to resubscribe everything + ResubscribeTry++; + DisconnectTime = time; + + if (socketClient.ClientOptions.MaxResubscribeTries != null && + ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries) + { + log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}"); + ShouldReconnect = false; + + if (socketClient.socketConnections.ContainsKey(Socket.Id)) + socketClient.socketConnections.TryRemove(Socket.Id, out _); + + _ = Task.Run(() => ConnectionClosed?.Invoke()); + } + else + log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting."); + + // Failed resubscribe, close socket if it is still open + if (Socket.IsOpen) + await Socket.CloseAsync().ConfigureAwait(false); + else + DisconnectTime = DateTime.UtcNow; + + // Break out of the loop, the new processing task should reconnect again + break; + } + else + { + // Succesfully reconnected + log.Write(LogLevel.Information, $"Socket {Socket.Id} data connection restored."); + ResubscribeTry = 0; + if (lostTriggered) + { + lostTriggered = false; + _ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0))).ConfigureAwait(false); + } + + break; + } + } + } + else + { + if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect) + _ = Task.Run(() => ConnectionClosed?.Invoke()); + + // No reconnecting needed + log.Write(LogLevel.Information, $"Socket {Socket.Id} closed"); + if (socketClient.socketConnections.ContainsKey(Socket.Id)) + socketClient.socketConnections.TryRemove(Socket.Id, out _); + } + } + + public void Dispose() + { + Socket.Dispose(); + } + /// /// Process a message received by the socket /// @@ -202,12 +412,20 @@ namespace CryptoExchange.Net.Sockets // Message was not a request response, check data handlers var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data: null, timestamp); - if (!HandleData(messageEvent) && !handledResponse) + var (handled, userProcessTime) = HandleData(messageEvent); + if (!handled && !handledResponse) { if (!socketClient.UnhandledMessageExpected) log.Write(LogLevel.Warning, $"Socket {Socket.Id} Message not handled: " + tokenData); UnhandledMessage?.Invoke(tokenData); } + + var total = DateTime.UtcNow - timestamp; + if (userProcessTime.TotalMilliseconds > 500) + log.Write(LogLevel.Debug, $"Socket {Socket.Id} message processing slow ({(int)total.TotalMilliseconds}ms), consider offloading data handling to another thread. " + + "Data from this socket may arrive late or not at all if message processing is continuously slow."); + + log.Write(LogLevel.Trace, $"Socket {Socket.Id} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)"); } /// @@ -246,13 +464,13 @@ namespace CryptoExchange.Net.Sockets /// /// /// True if the data was successfully handled - private bool HandleData(MessageEvent messageEvent) + private (bool, TimeSpan) HandleData(MessageEvent messageEvent) { SocketSubscription? currentSubscription = null; try { var handled = false; - var sw = Stopwatch.StartNew(); + TimeSpan userCodeDuration = TimeSpan.Zero; // Loop the subscriptions to check if any of them signal us that the message is for them List subscriptionsCopy; @@ -267,7 +485,10 @@ namespace CryptoExchange.Net.Sockets if (socketClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!)) { handled = true; + var userSw = Stopwatch.StartNew(); subscription.MessageHandler(messageEvent); + userSw.Stop(); + userCodeDuration = userSw.Elapsed; } } else @@ -276,24 +497,21 @@ namespace CryptoExchange.Net.Sockets { handled = true; messageEvent.JsonData = socketClient.ProcessTokenData(messageEvent.JsonData); + var userSw = Stopwatch.StartNew(); subscription.MessageHandler(messageEvent); + userSw.Stop(); + userCodeDuration = userSw.Elapsed; } } } - - sw.Stop(); - if (sw.ElapsedMilliseconds > 500) - log.Write(LogLevel.Debug, $"Socket {Socket.Id} message processing slow ({sw.ElapsedMilliseconds}ms), consider offloading data handling to another thread. " + - "Data from this socket may arrive late or not at all if message processing is continuously slow."); - else - log.Write(LogLevel.Trace, $"Socket {Socket.Id} message processed in {sw.ElapsedMilliseconds}ms"); - return handled; + + return (handled, userCodeDuration); } catch (Exception ex) { log.Write(LogLevel.Error, $"Socket {Socket.Id} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}"); currentSubscription?.InvokeExceptionHandler(ex); - return false; + return (false, TimeSpan.Zero); } } @@ -347,135 +565,6 @@ namespace CryptoExchange.Net.Sockets { ReconnectTry = 0; PausedActivity = false; - Connected = true; - } - - /// - /// Handler for a socket closing. Reconnects the socket if needed, or removes it from the active socket list if not - /// - protected virtual void SocketOnClose() - { - lock (pendingRequests) - { - foreach(var pendingRequest in pendingRequests.ToList()) - { - pendingRequest.Fail(); - pendingRequests.Remove(pendingRequest); - } - } - - if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect) - { - if (Socket.Reconnecting) - return; // Already reconnecting - - Socket.Reconnecting = true; - - DisconnectTime = DateTime.UtcNow; - log.Write(LogLevel.Warning, $"Socket {Socket.Id} Connection lost, will try to reconnect"); - if (!lostTriggered) - { - lostTriggered = true; - ConnectionLost?.Invoke(); - } - - Task.Run(async () => - { - while (ShouldReconnect) - { - if (ReconnectTry > 0) - { - // Wait a bit before attempting reconnect - await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false); - } - - if (!ShouldReconnect) - { - // Should reconnect changed to false while waiting to reconnect - Socket.Reconnecting = false; - return; - } - - Socket.Reset(); - if (!await Socket.ConnectAsync().ConfigureAwait(false)) - { - ReconnectTry++; - ResubscribeTry = 0; - if (socketClient.ClientOptions.MaxReconnectTries != null - && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries) - { - log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing"); - ShouldReconnect = false; - - if (socketClient.sockets.ContainsKey(Socket.Id)) - socketClient.sockets.TryRemove(Socket.Id, out _); - - _ = Task.Run(() => ConnectionClosed?.Invoke()); - break; - } - - log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}": "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}"); - continue; - } - - // Successfully reconnected - var time = DisconnectTime; - DisconnectTime = null; - - log.Write(LogLevel.Information, $"Socket {Socket.Id} reconnected after {DateTime.UtcNow - time}"); - - var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false); - if (!reconnectResult) - { - ResubscribeTry++; - DisconnectTime = time; - - if (socketClient.ClientOptions.MaxResubscribeTries != null && - ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries) - { - log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}"); - ShouldReconnect = false; - - if (socketClient.sockets.ContainsKey(Socket.Id)) - socketClient.sockets.TryRemove(Socket.Id, out _); - - _ = Task.Run(() => ConnectionClosed?.Invoke()); - } - else - log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting."); - - if (Socket.IsOpen) - await Socket.CloseAsync().ConfigureAwait(false); - else - DisconnectTime = DateTime.UtcNow; - } - else - { - log.Write(LogLevel.Information, $"Socket {Socket.Id} data connection restored."); - ResubscribeTry = 0; - if (lostTriggered) - { - lostTriggered = false; - _ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0))).ConfigureAwait(false); - } - - break; - } - } - - Socket.Reconnecting = false; - }); - } - else - { - if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect) - _ = Task.Run(() => ConnectionClosed?.Invoke()); - - // No reconnecting needed - log.Write(LogLevel.Information, $"Socket {Socket.Id} closed"); - if (socketClient.sockets.ContainsKey(Socket.Id)) - socketClient.sockets.TryRemove(Socket.Id, out _); - } } private async Task> ProcessReconnectAsync() @@ -535,56 +624,6 @@ namespace CryptoExchange.Net.Sockets return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); } - - /// - /// Close the connection - /// - /// - public async Task CloseAsync() - { - Connected = false; - ShouldReconnect = false; - if (socketClient.sockets.ContainsKey(Socket.Id)) - socketClient.sockets.TryRemove(Socket.Id, out _); - - lock (subscriptionLock) - { - foreach (var subscription in subscriptions) - { - if (subscription.CancellationTokenRegistration.HasValue) - subscription.CancellationTokenRegistration.Value.Dispose(); - } - } - await Socket.CloseAsync().ConfigureAwait(false); - Socket.Dispose(); - } - - /// - /// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well - /// - /// Subscription to close - /// - public async Task CloseAsync(SocketSubscription subscription) - { - if (!Socket.IsOpen) - return; - - if (subscription.CancellationTokenRegistration.HasValue) - subscription.CancellationTokenRegistration.Value.Dispose(); - - if (subscription.Confirmed) - await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); - - bool shouldCloseConnection; - lock (subscriptionLock) - shouldCloseConnection = !subscriptions.Any(r => r.UserSubscription && subscription != r); - - if (shouldCloseConnection) - await CloseAsync().ConfigureAwait(false); - - lock (subscriptionLock) - subscriptions.Remove(subscription); - } } internal class PendingRequest diff --git a/CryptoExchange.Net/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Sockets/UpdateSubscription.cs index 50ef025..4462515 100644 --- a/CryptoExchange.Net/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Sockets/UpdateSubscription.cs @@ -72,7 +72,7 @@ namespace CryptoExchange.Net.Sockets /// /// The id of the socket /// - public int SocketId => connection.Socket.Id; + public int SocketId => connection.SocketId; /// /// The id of the subscription @@ -103,9 +103,9 @@ namespace CryptoExchange.Net.Sockets /// Close the socket to cause a reconnect /// /// - internal Task ReconnectAsync() + public Task ReconnectAsync() { - return connection.Socket.CloseAsync(); + return connection.TriggerReconnectAsync(); } /// diff --git a/CryptoExchange.Net/Sockets/WebsocketFactory.cs b/CryptoExchange.Net/Sockets/WebsocketFactory.cs index 33ca49e..5a2e67b 100644 --- a/CryptoExchange.Net/Sockets/WebsocketFactory.cs +++ b/CryptoExchange.Net/Sockets/WebsocketFactory.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Logging; @@ -12,13 +13,13 @@ namespace CryptoExchange.Net.Sockets /// public IWebsocket CreateWebsocket(Log log, string url) { - return new CryptoExchangeWebSocketClient(log, url); + return new CryptoExchangeWebSocketClient(log, new Uri(url)); } /// public IWebsocket CreateWebsocket(Log log, string url, IDictionary cookies, IDictionary headers) { - return new CryptoExchangeWebSocketClient(log, url, cookies, headers); + return new CryptoExchangeWebSocketClient(log, new Uri(url), cookies, headers); } } }