1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 00:16:27 +00:00

Fixed socket client options setting, added automatic unsubscribe if the subscription confirmation comes in after request timeout

This commit is contained in:
JKorf 2023-02-12 14:05:00 +01:00
parent 6361c5ef25
commit a222bb3f02
4 changed files with 44 additions and 22 deletions

View File

@ -265,7 +265,7 @@ namespace CryptoExchange.Net
protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription) protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription)
{ {
CallResult<object>? callResult = null; CallResult<object>? callResult = null;
await socketConnection.SendAndWaitAsync(request, Options.SocketResponseTimeout, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false); await socketConnection.SendAndWaitAsync(request, Options.SocketResponseTimeout, subscription, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false);
if (callResult?.Success == true) if (callResult?.Success == true)
{ {
@ -351,7 +351,7 @@ namespace CryptoExchange.Net
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, object request) protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, object request)
{ {
var dataResult = new CallResult<T>(new ServerError("No response on query received")); var dataResult = new CallResult<T>(new ServerError("No response on query received"));
await socket.SendAndWaitAsync(request, Options.SocketResponseTimeout, data => await socket.SendAndWaitAsync(request, Options.SocketResponseTimeout, null, data =>
{ {
if (!HandleQueryResponse<T>(socket, request, data, out var callResult)) if (!HandleQueryResponse<T>(socket, request, data, out var callResult))
return false; return false;

View File

@ -298,14 +298,14 @@ namespace CryptoExchange.Net.Objects
if (baseOptions == null) if (baseOptions == null)
return; return;
AutoReconnect = baseOptions.AutoReconnect; AutoReconnect = newValues?.AutoReconnect ?? baseOptions.AutoReconnect;
ReconnectInterval = baseOptions.ReconnectInterval; ReconnectInterval = newValues?.ReconnectInterval ?? baseOptions.ReconnectInterval;
MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket; MaxConcurrentResubscriptionsPerSocket = newValues?.MaxConcurrentResubscriptionsPerSocket ?? baseOptions.MaxConcurrentResubscriptionsPerSocket;
SocketResponseTimeout = baseOptions.SocketResponseTimeout; SocketResponseTimeout = newValues?.SocketResponseTimeout ?? baseOptions.SocketResponseTimeout;
SocketNoDataTimeout = baseOptions.SocketNoDataTimeout; SocketNoDataTimeout = newValues?.SocketNoDataTimeout ?? baseOptions.SocketNoDataTimeout;
SocketSubscriptionsCombineTarget = baseOptions.SocketSubscriptionsCombineTarget; SocketSubscriptionsCombineTarget = newValues?.SocketSubscriptionsCombineTarget ?? baseOptions.SocketSubscriptionsCombineTarget;
MaxSocketConnections = baseOptions.MaxSocketConnections; MaxSocketConnections = newValues?.MaxSocketConnections ?? baseOptions.MaxSocketConnections;
DelayAfterConnect = baseOptions.DelayAfterConnect; DelayAfterConnect = newValues?.DelayAfterConnect ?? baseOptions.DelayAfterConnect;
} }
/// <inheritdoc /> /// <inheritdoc />

View File

@ -11,15 +11,19 @@ namespace CryptoExchange.Net.Sockets
public JToken? Result { get; private set; } public JToken? Result { get; private set; }
public bool Completed { get; private set; } public bool Completed { get; private set; }
public AsyncResetEvent Event { get; } public AsyncResetEvent Event { get; }
public DateTime RequestTimestamp { get; set; }
public TimeSpan Timeout { get; } public TimeSpan Timeout { get; }
public SocketSubscription? Subscription { get; }
private CancellationTokenSource cts; private CancellationTokenSource cts;
public PendingRequest(Func<JToken, bool> handler, TimeSpan timeout) public PendingRequest(Func<JToken, bool> handler, TimeSpan timeout, SocketSubscription? subscription)
{ {
Handler = handler; Handler = handler;
Event = new AsyncResetEvent(false, false); Event = new AsyncResetEvent(false, false);
Timeout = timeout; Timeout = timeout;
RequestTimestamp = DateTime.UtcNow;
Subscription = subscription;
cts = new CancellationTokenSource(timeout); cts = new CancellationTokenSource(timeout);
cts.Token.Register(Fail, false); cts.Token.Register(Fail, false);
@ -27,7 +31,10 @@ namespace CryptoExchange.Net.Sockets
public bool CheckData(JToken data) public bool CheckData(JToken data)
{ {
if (Handler(data)) return Handler(data);
}
public bool Succeed(JToken data)
{ {
Result = data; Result = data;
Completed = true; Completed = true;
@ -35,9 +42,6 @@ namespace CryptoExchange.Net.Sockets
return true; return true;
} }
return false;
}
public void Fail() public void Fail()
{ {
Completed = true; Completed = true;

View File

@ -304,18 +304,35 @@ namespace CryptoExchange.Net.Sockets
PendingRequest[] requests; PendingRequest[] requests;
lock (_pendingRequests) lock (_pendingRequests)
{ {
_pendingRequests.RemoveAll(r => r.Completed); // Remove only timed out requests after 5 minutes have passed so we can still process any
// message coming in after the request timeout
_pendingRequests.RemoveAll(r => r.Completed && DateTime.UtcNow - r.RequestTimestamp > TimeSpan.FromMinutes(5));
requests = _pendingRequests.ToArray(); requests = _pendingRequests.ToArray();
} }
// Check if this message is an answer on any pending requests // Check if this message is an answer on any pending requests
foreach (var pendingRequest in requests) foreach (var pendingRequest in requests)
{ {
if (pendingRequest.CheckData(tokenData)) if (pendingRequest.CheckData(tokenData))
{ {
lock (_pendingRequests) lock (_pendingRequests)
_pendingRequests.Remove(pendingRequest); _pendingRequests.Remove(pendingRequest);
if (pendingRequest.Completed)
{
// Answer to a timed out request, unsub if it is a subscription request
if (pendingRequest.Subscription != null)
{
_log.Write(LogLevel.Warning, "Received subscription info after request timed out; unsubscribing. Consider increasing the SocketResponseTimout");
_ = ApiClient.UnsubscribeAsync(this, pendingRequest.Subscription).ConfigureAwait(false);
}
}
else
{
pendingRequest.Succeed(tokenData);
}
if (!ApiClient.ContinueOnQueryResponse) if (!ApiClient.ContinueOnQueryResponse)
return; return;
@ -546,11 +563,12 @@ namespace CryptoExchange.Net.Sockets
/// <typeparam name="T">The data type expected in response</typeparam> /// <typeparam name="T">The data type expected in response</typeparam>
/// <param name="obj">The object to send</param> /// <param name="obj">The object to send</param>
/// <param name="timeout">The timeout for response</param> /// <param name="timeout">The timeout for response</param>
/// <param name="subscription">Subscription if this is a subscribe request</param>
/// <param name="handler">The response handler, should return true if the received JToken was the response to the request</param> /// <param name="handler">The response handler, should return true if the received JToken was the response to the request</param>
/// <returns></returns> /// <returns></returns>
public virtual Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, Func<JToken, bool> handler) public virtual Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, SocketSubscription? subscription, Func<JToken, bool> handler)
{ {
var pending = new PendingRequest(handler, timeout); var pending = new PendingRequest(handler, timeout, subscription);
lock (_pendingRequests) lock (_pendingRequests)
{ {
_pendingRequests.Add(pending); _pendingRequests.Add(pending);