diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 0626864..bc1ddd7 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -227,7 +227,7 @@ namespace CryptoExchange.Net.Clients while (true) { // Get a new or existing socket connection - var socketResult = await GetSocketConnection(url, subscription.Authenticated, false, subscription.Topic).ConfigureAwait(false); + var socketResult = await GetSocketConnection(url, subscription.Authenticated, false, ct, subscription.Topic).ConfigureAwait(false); if (!socketResult) return socketResult.As(null); @@ -343,7 +343,7 @@ namespace CryptoExchange.Net.Clients await semaphoreSlim.WaitAsync().ConfigureAwait(false); try { - var socketResult = await GetSocketConnection(url, query.Authenticated, true).ConfigureAwait(false); + var socketResult = await GetSocketConnection(url, query.Authenticated, true, ct).ConfigureAwait(false); if (!socketResult) return socketResult.As(default); @@ -494,25 +494,56 @@ namespace CryptoExchange.Net.Clients /// The address the socket is for /// Whether the socket should be authenticated /// Whether a dedicated request connection should be returned + /// Cancellation token /// The subscription topic, can be provided when multiple of the same topics are not allowed on a connection /// - protected virtual async Task> GetSocketConnection(string address, bool authenticated, bool dedicatedRequestConnection, string? topic = null) + protected virtual async Task> GetSocketConnection(string address, bool authenticated, bool dedicatedRequestConnection, CancellationToken ct, string? topic = null) { - var socketQuery = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected) - && s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') + var socketQuery = socketConnections.Where(s => s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') && s.Value.ApiClient.GetType() == GetType() - && (s.Value.Authenticated == authenticated || !authenticated) - && (AllowTopicsOnTheSameConnection || !s.Value.Topics.Contains(topic)) - && s.Value.Connected); + && (AllowTopicsOnTheSameConnection || !s.Value.Topics.Contains(topic))) + .Select(x => x.Value) + .ToList(); - SocketConnection connection; + // If all current socket connections are reconnecting or resubscribing wait for that to finish as we can probably use the existing connection + var delayStart = DateTime.UtcNow; + var delayed = false; + while (socketQuery.Count >= 1 && socketQuery.All(x => x.Status == SocketConnection.SocketStatus.Reconnecting || x.Status == SocketConnection.SocketStatus.Resubscribing)) + { + if (DateTime.UtcNow - delayStart > TimeSpan.FromSeconds(10)) + { + if (socketQuery.Count >= 1 && socketQuery.All(x => x.Status == SocketConnection.SocketStatus.Reconnecting || x.Status == SocketConnection.SocketStatus.Resubscribing)) + { + // If after this time we still trying to reconnect/reprocess there is some issue in the connection + _logger.TimeoutWaitingForReconnectingSocket(); + return new CallResult(new CantConnectError()); + } + + break; + } + + delayed = true; + try { await Task.Delay(50, ct).ConfigureAwait(false); } catch (Exception) { } + + if (ct.IsCancellationRequested) + return new CallResult(new CancellationRequestedError()); + } + + if (delayed) + _logger.WaitedForReconnectingSocket((long)(DateTime.UtcNow - delayStart).TotalMilliseconds); + + socketQuery = socketQuery.Where(s => (s.Status == SocketConnection.SocketStatus.None || s.Status == SocketConnection.SocketStatus.Connected) + && (s.Authenticated == authenticated || !authenticated) + && s.Connected).ToList(); + + SocketConnection? connection; if (!dedicatedRequestConnection) { - connection = socketQuery.Where(s => !s.Value.DedicatedRequestConnection.IsDedicatedRequestConnection).OrderBy(s => s.Value.UserSubscriptionCount).FirstOrDefault().Value; + connection = socketQuery.Where(s => !s.DedicatedRequestConnection.IsDedicatedRequestConnection).OrderBy(s => s.UserSubscriptionCount).FirstOrDefault(); } else { - connection = socketQuery.Where(s => s.Value.DedicatedRequestConnection.IsDedicatedRequestConnection).FirstOrDefault().Value; + connection = socketQuery.Where(s => s.DedicatedRequestConnection.IsDedicatedRequestConnection).FirstOrDefault(); if (connection != null && !connection.DedicatedRequestConnection.Authenticated) // Mark dedicated request connection as authenticated if the request is authenticated connection.DedicatedRequestConnection.Authenticated = authenticated; @@ -520,9 +551,12 @@ namespace CryptoExchange.Net.Clients if (connection != null) { - if (connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) + if (connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget + || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) + { // Use existing socket if it has less than target connections OR it has the least connections and we can't make new return new CallResult(connection); + } } var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false); @@ -716,7 +750,7 @@ namespace CryptoExchange.Net.Clients { foreach (var item in DedicatedConnectionConfigs) { - var socketResult = await GetSocketConnection(item.SocketAddress, item.Authenticated, true).ConfigureAwait(false); + var socketResult = await GetSocketConnection(item.SocketAddress, item.Authenticated, true, CancellationToken.None).ConfigureAwait(false); if (!socketResult) return socketResult.AsDataless(); diff --git a/CryptoExchange.Net/Logging/Extensions/SocketApiClientLoggingExtension.cs b/CryptoExchange.Net/Logging/Extensions/SocketApiClientLoggingExtension.cs index c8e553a..75dfe26 100644 --- a/CryptoExchange.Net/Logging/Extensions/SocketApiClientLoggingExtension.cs +++ b/CryptoExchange.Net/Logging/Extensions/SocketApiClientLoggingExtension.cs @@ -23,6 +23,8 @@ namespace CryptoExchange.Net.Logging.Extensions private static readonly Action _unsubscribingSubscription; private static readonly Action _reconnectingAllConnections; private static readonly Action _addingRetryAfterGuard; + private static readonly Action _timeoutWaitingForReconnectingSocket; + private static readonly Action _waitedForReconnectingSocket; static SocketApiClientLoggingExtension() { @@ -110,6 +112,16 @@ namespace CryptoExchange.Net.Logging.Extensions LogLevel.Warning, new EventId(3018, "AddRetryAfterGuard"), "Adding RetryAfterGuard ({RetryAfter}) because the connection attempt was rate limited"); + + _timeoutWaitingForReconnectingSocket = LoggerMessage.Define( + LogLevel.Debug, + new EventId(3019, "TimeoutWaitingForReconnectingSocket"), + "Timeout while waiting for existing socket reconnection, failing request"); + + _waitedForReconnectingSocket = LoggerMessage.Define( + LogLevel.Trace, + new EventId(3020, "WaitedForReconnectingSocket"), + "Waited for reconnecting socket for {Timespan}ms"); } public static void FailedToAddSubscriptionRetryOnDifferentConnection(this ILogger logger, int socketId) @@ -196,5 +208,14 @@ namespace CryptoExchange.Net.Logging.Extensions { _addingRetryAfterGuard(logger, retryAfter, null); } + + public static void TimeoutWaitingForReconnectingSocket(this ILogger logger) + { + _timeoutWaitingForReconnectingSocket(logger, null); + } + public static void WaitedForReconnectingSocket(this ILogger logger, long milliseconds) + { + _waitedForReconnectingSocket(logger, milliseconds, null); + } } }