mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-10-27 08:27:19 +00:00
Fixed issue increasing the number of websocket connections increasing when sending a query when a previous connection was attempting to reconnect
This commit is contained in:
parent
0ba7b46680
commit
51732c5ce6
@ -227,7 +227,7 @@ namespace CryptoExchange.Net.Clients
|
|||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
// Get a new or existing socket connection
|
// Get a new or existing socket connection
|
||||||
var socketResult = await GetSocketConnection(url, subscription.Authenticated, false, subscription.Topic).ConfigureAwait(false);
|
var socketResult = await GetSocketConnection(url, subscription.Authenticated, false, ct, subscription.Topic).ConfigureAwait(false);
|
||||||
if (!socketResult)
|
if (!socketResult)
|
||||||
return socketResult.As<UpdateSubscription>(null);
|
return socketResult.As<UpdateSubscription>(null);
|
||||||
|
|
||||||
@ -343,7 +343,7 @@ namespace CryptoExchange.Net.Clients
|
|||||||
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
|
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var socketResult = await GetSocketConnection(url, query.Authenticated, true).ConfigureAwait(false);
|
var socketResult = await GetSocketConnection(url, query.Authenticated, true, ct).ConfigureAwait(false);
|
||||||
if (!socketResult)
|
if (!socketResult)
|
||||||
return socketResult.As<THandlerResponse>(default);
|
return socketResult.As<THandlerResponse>(default);
|
||||||
|
|
||||||
@ -494,25 +494,56 @@ namespace CryptoExchange.Net.Clients
|
|||||||
/// <param name="address">The address the socket is for</param>
|
/// <param name="address">The address the socket is for</param>
|
||||||
/// <param name="authenticated">Whether the socket should be authenticated</param>
|
/// <param name="authenticated">Whether the socket should be authenticated</param>
|
||||||
/// <param name="dedicatedRequestConnection">Whether a dedicated request connection should be returned</param>
|
/// <param name="dedicatedRequestConnection">Whether a dedicated request connection should be returned</param>
|
||||||
|
/// <param name="ct">Cancellation token</param>
|
||||||
/// <param name="topic">The subscription topic, can be provided when multiple of the same topics are not allowed on a connection</param>
|
/// <param name="topic">The subscription topic, can be provided when multiple of the same topics are not allowed on a connection</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected virtual async Task<CallResult<SocketConnection>> GetSocketConnection(string address, bool authenticated, bool dedicatedRequestConnection, string? topic = null)
|
protected virtual async Task<CallResult<SocketConnection>> GetSocketConnection(string address, bool authenticated, bool dedicatedRequestConnection, CancellationToken ct, string? topic = null)
|
||||||
{
|
{
|
||||||
var socketQuery = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected)
|
var socketQuery = socketConnections.Where(s => s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
|
||||||
&& s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
|
|
||||||
&& s.Value.ApiClient.GetType() == GetType()
|
&& s.Value.ApiClient.GetType() == GetType()
|
||||||
&& (s.Value.Authenticated == authenticated || !authenticated)
|
&& (AllowTopicsOnTheSameConnection || !s.Value.Topics.Contains(topic)))
|
||||||
&& (AllowTopicsOnTheSameConnection || !s.Value.Topics.Contains(topic))
|
.Select(x => x.Value)
|
||||||
&& s.Value.Connected);
|
.ToList();
|
||||||
|
|
||||||
SocketConnection connection;
|
// If all current socket connections are reconnecting or resubscribing wait for that to finish as we can probably use the existing connection
|
||||||
|
var delayStart = DateTime.UtcNow;
|
||||||
|
var delayed = false;
|
||||||
|
while (socketQuery.Count >= 1 && socketQuery.All(x => x.Status == SocketConnection.SocketStatus.Reconnecting || x.Status == SocketConnection.SocketStatus.Resubscribing))
|
||||||
|
{
|
||||||
|
if (DateTime.UtcNow - delayStart > TimeSpan.FromSeconds(10))
|
||||||
|
{
|
||||||
|
if (socketQuery.Count >= 1 && socketQuery.All(x => x.Status == SocketConnection.SocketStatus.Reconnecting || x.Status == SocketConnection.SocketStatus.Resubscribing))
|
||||||
|
{
|
||||||
|
// If after this time we still trying to reconnect/reprocess there is some issue in the connection
|
||||||
|
_logger.TimeoutWaitingForReconnectingSocket();
|
||||||
|
return new CallResult<SocketConnection>(new CantConnectError());
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
delayed = true;
|
||||||
|
try { await Task.Delay(50, ct).ConfigureAwait(false); } catch (Exception) { }
|
||||||
|
|
||||||
|
if (ct.IsCancellationRequested)
|
||||||
|
return new CallResult<SocketConnection>(new CancellationRequestedError());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (delayed)
|
||||||
|
_logger.WaitedForReconnectingSocket((long)(DateTime.UtcNow - delayStart).TotalMilliseconds);
|
||||||
|
|
||||||
|
socketQuery = socketQuery.Where(s => (s.Status == SocketConnection.SocketStatus.None || s.Status == SocketConnection.SocketStatus.Connected)
|
||||||
|
&& (s.Authenticated == authenticated || !authenticated)
|
||||||
|
&& s.Connected).ToList();
|
||||||
|
|
||||||
|
SocketConnection? connection;
|
||||||
if (!dedicatedRequestConnection)
|
if (!dedicatedRequestConnection)
|
||||||
{
|
{
|
||||||
connection = socketQuery.Where(s => !s.Value.DedicatedRequestConnection.IsDedicatedRequestConnection).OrderBy(s => s.Value.UserSubscriptionCount).FirstOrDefault().Value;
|
connection = socketQuery.Where(s => !s.DedicatedRequestConnection.IsDedicatedRequestConnection).OrderBy(s => s.UserSubscriptionCount).FirstOrDefault();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
connection = socketQuery.Where(s => s.Value.DedicatedRequestConnection.IsDedicatedRequestConnection).FirstOrDefault().Value;
|
connection = socketQuery.Where(s => s.DedicatedRequestConnection.IsDedicatedRequestConnection).FirstOrDefault();
|
||||||
if (connection != null && !connection.DedicatedRequestConnection.Authenticated)
|
if (connection != null && !connection.DedicatedRequestConnection.Authenticated)
|
||||||
// Mark dedicated request connection as authenticated if the request is authenticated
|
// Mark dedicated request connection as authenticated if the request is authenticated
|
||||||
connection.DedicatedRequestConnection.Authenticated = authenticated;
|
connection.DedicatedRequestConnection.Authenticated = authenticated;
|
||||||
@ -520,10 +551,13 @@ namespace CryptoExchange.Net.Clients
|
|||||||
|
|
||||||
if (connection != null)
|
if (connection != null)
|
||||||
{
|
{
|
||||||
if (connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget)))
|
if (connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget
|
||||||
|
|| (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget)))
|
||||||
|
{
|
||||||
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
|
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
|
||||||
return new CallResult<SocketConnection>(connection);
|
return new CallResult<SocketConnection>(connection);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false);
|
var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false);
|
||||||
if (!connectionAddress)
|
if (!connectionAddress)
|
||||||
@ -716,7 +750,7 @@ namespace CryptoExchange.Net.Clients
|
|||||||
{
|
{
|
||||||
foreach (var item in DedicatedConnectionConfigs)
|
foreach (var item in DedicatedConnectionConfigs)
|
||||||
{
|
{
|
||||||
var socketResult = await GetSocketConnection(item.SocketAddress, item.Authenticated, true).ConfigureAwait(false);
|
var socketResult = await GetSocketConnection(item.SocketAddress, item.Authenticated, true, CancellationToken.None).ConfigureAwait(false);
|
||||||
if (!socketResult)
|
if (!socketResult)
|
||||||
return socketResult.AsDataless();
|
return socketResult.AsDataless();
|
||||||
|
|
||||||
|
|||||||
@ -23,6 +23,8 @@ namespace CryptoExchange.Net.Logging.Extensions
|
|||||||
private static readonly Action<ILogger, int, int, Exception?> _unsubscribingSubscription;
|
private static readonly Action<ILogger, int, int, Exception?> _unsubscribingSubscription;
|
||||||
private static readonly Action<ILogger, int, Exception?> _reconnectingAllConnections;
|
private static readonly Action<ILogger, int, Exception?> _reconnectingAllConnections;
|
||||||
private static readonly Action<ILogger, DateTime, Exception?> _addingRetryAfterGuard;
|
private static readonly Action<ILogger, DateTime, Exception?> _addingRetryAfterGuard;
|
||||||
|
private static readonly Action<ILogger, Exception?> _timeoutWaitingForReconnectingSocket;
|
||||||
|
private static readonly Action<ILogger, long, Exception?> _waitedForReconnectingSocket;
|
||||||
|
|
||||||
static SocketApiClientLoggingExtension()
|
static SocketApiClientLoggingExtension()
|
||||||
{
|
{
|
||||||
@ -110,6 +112,16 @@ namespace CryptoExchange.Net.Logging.Extensions
|
|||||||
LogLevel.Warning,
|
LogLevel.Warning,
|
||||||
new EventId(3018, "AddRetryAfterGuard"),
|
new EventId(3018, "AddRetryAfterGuard"),
|
||||||
"Adding RetryAfterGuard ({RetryAfter}) because the connection attempt was rate limited");
|
"Adding RetryAfterGuard ({RetryAfter}) because the connection attempt was rate limited");
|
||||||
|
|
||||||
|
_timeoutWaitingForReconnectingSocket = LoggerMessage.Define(
|
||||||
|
LogLevel.Debug,
|
||||||
|
new EventId(3019, "TimeoutWaitingForReconnectingSocket"),
|
||||||
|
"Timeout while waiting for existing socket reconnection, failing request");
|
||||||
|
|
||||||
|
_waitedForReconnectingSocket = LoggerMessage.Define<long>(
|
||||||
|
LogLevel.Trace,
|
||||||
|
new EventId(3020, "WaitedForReconnectingSocket"),
|
||||||
|
"Waited for reconnecting socket for {Timespan}ms");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void FailedToAddSubscriptionRetryOnDifferentConnection(this ILogger logger, int socketId)
|
public static void FailedToAddSubscriptionRetryOnDifferentConnection(this ILogger logger, int socketId)
|
||||||
@ -196,5 +208,14 @@ namespace CryptoExchange.Net.Logging.Extensions
|
|||||||
{
|
{
|
||||||
_addingRetryAfterGuard(logger, retryAfter, null);
|
_addingRetryAfterGuard(logger, retryAfter, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void TimeoutWaitingForReconnectingSocket(this ILogger logger)
|
||||||
|
{
|
||||||
|
_timeoutWaitingForReconnectingSocket(logger, null);
|
||||||
|
}
|
||||||
|
public static void WaitedForReconnectingSocket(this ILogger logger, long milliseconds)
|
||||||
|
{
|
||||||
|
_waitedForReconnectingSocket(logger, milliseconds, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user