diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs index d4af631..b81c132 100644 --- a/CryptoExchange.Net/Clients/BaseSocketClient.cs +++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs @@ -92,6 +92,18 @@ namespace CryptoExchange.Net } } + public int CurrentConnections => socketConnections.Count; + public int CurrentSubscriptions + { + get + { + if (!socketConnections.Any()) + return 0; + + return socketConnections.Sum(s => s.Value.SubscriptionCount); + } + } + /// /// Client options /// @@ -164,7 +176,7 @@ namespace CryptoExchange.Net return new CallResult(new InvalidOperationError("Client disposed, can't subscribe")); SocketConnection socketConnection; - SocketSubscription subscription; + SocketSubscription? subscription; var released = false; // Wait for a semaphore here, so we only connect 1 socket at a time. // This is necessary for being able to see if connections can be combined @@ -179,23 +191,34 @@ namespace CryptoExchange.Net try { - // Get a new or existing socket connection - socketConnection = GetSocketConnection(apiClient, url, authenticated); - - // Add a subscription on the socket connection - subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler); - if (ClientOptions.SocketSubscriptionsCombineTarget == 1) + while (true) { - // Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway - semaphoreSlim.Release(); - released = true; + // Get a new or existing socket connection + socketConnection = GetSocketConnection(apiClient, url, authenticated); + + // Add a subscription on the socket connection + subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler); + if (subscription == null) + { + log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); + continue; + } + + if (ClientOptions.SocketSubscriptionsCombineTarget == 1) + { + // Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway + semaphoreSlim.Release(); + released = true; + } + + var needsConnecting = !socketConnection.Connected; + + var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); + if (!connectResult) + return new CallResult(connectResult.Error!); + + break; } - - var needsConnecting = !socketConnection.Connected; - - var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); - if (!connectResult) - return new CallResult(connectResult.Error!); } finally { @@ -443,9 +466,6 @@ namespace CryptoExchange.Net /// /// protected internal virtual JToken ProcessTokenData(JToken message) - - - { return message; } @@ -460,7 +480,7 @@ namespace CryptoExchange.Net /// The socket connection the handler is on /// The handler of the data received /// - protected virtual SocketSubscription AddSubscription(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action> dataHandler) + protected virtual SocketSubscription? AddSubscription(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action> dataHandler) { void InternalHandler(MessageEvent messageEvent) { @@ -484,7 +504,8 @@ namespace CryptoExchange.Net var subscription = request == null ? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, InternalHandler) : SocketSubscription.CreateForRequest(NextId(), request, userSubscription, InternalHandler); - connection.AddSubscription(subscription); + if (!connection.AddSubscription(subscription)) + return null; return subscription; } @@ -510,7 +531,8 @@ namespace CryptoExchange.Net /// protected virtual SocketConnection GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated) { - var socketResult = socketConnections.Where(s => s.Value.Uri.ToString().TrimEnd('/') == address.TrimEnd('/') + var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected) + && s.Value.Uri.ToString().TrimEnd('/') == address.TrimEnd('/') && (s.Value.ApiClient.GetType() == apiClient.GetType()) && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault(); var result = socketResult.Equals(default(KeyValuePair)) ? null : socketResult.Value; diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index ad9b85e..75617f2 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -34,6 +34,8 @@ namespace CryptoExchange.Net.Sockets private readonly List _outgoingMessages; private DateTime _lastReceivedMessagesUpdate; + private bool _closed; + private bool _disposed; /// /// Received messages, the size and the timstamp @@ -279,6 +281,10 @@ namespace CryptoExchange.Net.Sockets /// private async Task CloseInternalAsync() { + if (_closed || _disposed) + return; + + _closed = true; _ctsSource.Cancel(); _sendEvent.Set(); @@ -291,6 +297,15 @@ namespace CryptoExchange.Net.Sockets catch(Exception) { } // Can sometimes throw an exception when socket is in aborted state due to timing } + else if(_socket.State == WebSocketState.CloseReceived) + { + try + { + await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); + } + catch (Exception) + { } // Can sometimes throw an exception when socket is in aborted state due to timing + } log.Write(LogLevel.Debug, $"Socket {Id} closed"); Handle(closeHandlers); } @@ -300,7 +315,11 @@ namespace CryptoExchange.Net.Sockets /// public void Dispose() { + if (_disposed) + return; + log.Write(LogLevel.Debug, $"Socket {Id} disposing"); + _disposed = true; _socket.Dispose(); _ctsSource.Dispose(); @@ -320,6 +339,7 @@ namespace CryptoExchange.Net.Sockets while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer _socket = CreateSocket(); + _closed = false; } /// diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index b4efe6e..3e61a1d 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -139,10 +139,26 @@ namespace CryptoExchange.Net.Sockets private readonly BaseSocketClient socketClient; private readonly List pendingRequests; - private Task? _socketProcessReconnectTask; + private Task? _socketProcessTask; + private Task? _socketReconnectTask; + private readonly AsyncResetEvent _reconnectWaitEvent; private SocketStatus _status; + /// + /// Status of the socket connection + /// + public SocketStatus Status + { + get => _status; + private set + { + var oldStatus = _status; + _status = value; + log.Write(LogLevel.Trace, $"Socket {SocketId} status changed from {oldStatus} to {_status}"); + } + } + /// /// The underlying websocket /// @@ -165,9 +181,13 @@ namespace CryptoExchange.Net.Sockets subscriptions = new List(); _socket = socket; + _reconnectWaitEvent = new AsyncResetEvent(false, true); + _socket.Timeout = client.ClientOptions.SocketNoDataTimeout; _socket.OnMessage += ProcessMessage; _socket.OnOpen += SocketOnOpen; + _socket.OnClose += () => _reconnectWaitEvent.Set(); + } /// @@ -178,7 +198,11 @@ namespace CryptoExchange.Net.Sockets { var connected = await _socket.ConnectAsync().ConfigureAwait(false); if (connected) - StartProcessingTask(); + { + Status = SocketStatus.Connected; + _socketReconnectTask = ReconnectWatcherAsync(); + _socketProcessTask = _socket.ProcessAsync(); + } return connected; } @@ -207,6 +231,9 @@ namespace CryptoExchange.Net.Sockets /// public async Task CloseAsync() { + if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed) + return; + ShouldReconnect = false; if (socketClient.socketConnections.ContainsKey(SocketId)) socketClient.socketConnections.TryRemove(SocketId, out _); @@ -220,24 +247,13 @@ namespace CryptoExchange.Net.Sockets } } - if (_status == SocketStatus.Reconnecting) - { - // Wait for reconnect task to finish - log.Write(LogLevel.Trace, "In reconnecting state, waiting for reconnecting to end"); - if (_socketProcessReconnectTask != null) - await _socketProcessReconnectTask.ConfigureAwait(false); - - await _socket.CloseAsync().ConfigureAwait(false); - } - else - { - // Close before waiting for process task to finish - await _socket.CloseAsync().ConfigureAwait(false); - - if (_socketProcessReconnectTask != null) - await _socketProcessReconnectTask.ConfigureAwait(false); - } + while (Status == SocketStatus.Reconnecting) + // Wait for reconnecting to finish + await Task.Delay(100).ConfigureAwait(false); + await _socket.CloseAsync().ConfigureAwait(false); + if(_socketProcessTask != null) + await _socketProcessTask.ConfigureAwait(false); _socket.Dispose(); } @@ -248,39 +264,40 @@ namespace CryptoExchange.Net.Sockets /// public async Task CloseAsync(SocketSubscription subscription) { - if (!_socket.IsOpen || _status == SocketStatus.Disposed) + if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) return; + log.Write(LogLevel.Trace, $"Socket {SocketId} closing subscription {subscription.Id}"); if (subscription.CancellationTokenRegistration.HasValue) subscription.CancellationTokenRegistration.Value.Dispose(); - if (subscription.Confirmed) + if (subscription.Confirmed && _socket.IsOpen) await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); bool shouldCloseConnection; lock (subscriptionLock) - shouldCloseConnection = !subscriptions.Any(r => r.UserSubscription && subscription != r); + { + if (Status == SocketStatus.Closing) + { + log.Write(LogLevel.Trace, $"Socket {SocketId} already closing"); + return; + } + + shouldCloseConnection = subscriptions.All(r => !r.UserSubscription || r == subscription); + if (shouldCloseConnection) + Status = SocketStatus.Closing; + } if (shouldCloseConnection) + { + log.Write(LogLevel.Trace, $"Socket {SocketId} closing as there are no more subscriptions"); await CloseAsync().ConfigureAwait(false); + } lock (subscriptionLock) subscriptions.Remove(subscription); } - private void StartProcessingTask() - { - log.Write(LogLevel.Trace, $"Starting {SocketId} process/reconnect task"); - _status = SocketStatus.Processing; - _socketProcessReconnectTask = Task.Run(async () => - { - await _socket.ProcessAsync().ConfigureAwait(false); - _status = SocketStatus.Reconnecting; - await ReconnectAsync().ConfigureAwait(false); - log.Write(LogLevel.Trace, $"Process/reconnect {SocketId} task finished"); - }); - } - private async Task ReconnectAsync() { // Fail all pending requests @@ -344,7 +361,8 @@ namespace CryptoExchange.Net.Sockets } // Successfully reconnected, start processing - StartProcessingTask(); + Status = SocketStatus.Connected; + _socketProcessTask = _socket.ProcessAsync(); ReconnectTry = 0; var time = DisconnectTime; @@ -414,7 +432,7 @@ namespace CryptoExchange.Net.Sockets /// public void Dispose() { - _status = SocketStatus.Disposed; + Status = SocketStatus.Disposed; _socket.Dispose(); } @@ -487,10 +505,17 @@ namespace CryptoExchange.Net.Sockets /// Add a subscription to this connection /// /// - public void AddSubscription(SocketSubscription subscription) + public bool AddSubscription(SocketSubscription subscription) { - lock(subscriptionLock) + lock (subscriptionLock) + { + if (Status != SocketStatus.None && Status != SocketStatus.Connected) + return false; + subscriptions.Add(subscription); + log.Write(LogLevel.Trace, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {subscriptions.Count}"); + return true; + } } /// @@ -633,6 +658,22 @@ namespace CryptoExchange.Net.Sockets PausedActivity = false; } + private async Task ReconnectWatcherAsync() + { + while (true) + { + await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false); + if (!ShouldReconnect) + return; + + Status = SocketStatus.Reconnecting; + await ReconnectAsync().ConfigureAwait(false); + + if (!ShouldReconnect) + return; + } + } + private async Task> ProcessReconnectAsync() { if (!_socket.IsOpen) @@ -691,11 +732,34 @@ namespace CryptoExchange.Net.Sockets return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); } - private enum SocketStatus + /// + /// Status of the socket connection + /// + public enum SocketStatus { + /// + /// None/Initial + /// None, - Processing, + /// + /// Connected + /// + Connected, + /// + /// Reconnecting + /// Reconnecting, + /// + /// Closing + /// + Closing, + /// + /// Closed + /// + Closed, + /// + /// Disposed + /// Disposed } }