1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 16:36:15 +00:00

Small refactor QueryPeriodic to only run on connections

This commit is contained in:
JKorf 2024-02-04 13:50:15 +01:00
parent d0fc67355d
commit 3fa8277a30
4 changed files with 118 additions and 65 deletions

View File

@ -48,16 +48,6 @@ namespace CryptoExchange.Net
/// </summary> /// </summary>
protected List<SystemSubscription> systemSubscriptions = new(); protected List<SystemSubscription> systemSubscriptions = new();
/// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry.
/// </summary>
protected Task? periodicTask;
/// <summary>
/// Wait event for the periodicTask
/// </summary>
protected AsyncResetEvent? periodicEvent;
/// <summary> /// <summary>
/// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message /// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message
/// </summary> /// </summary>
@ -73,6 +63,11 @@ namespace CryptoExchange.Net
/// </summary> /// </summary>
protected internal IEnumerable<IRateLimiter>? RateLimiters { get; set; } protected internal IEnumerable<IRateLimiter>? RateLimiters { get; set; }
/// <summary>
/// Periodic task regisrations
/// </summary>
protected List<PeriodicTaskRegistration> PeriodicTaskRegistrations { get; set; } = new List<PeriodicTaskRegistration>();
/// <inheritdoc /> /// <inheritdoc />
public double IncomingKbps public double IncomingKbps
{ {
@ -129,6 +124,24 @@ namespace CryptoExchange.Net
RateLimiters = rateLimiters; RateLimiters = rateLimiters;
} }
/// <summary>
/// Add a query to periodically send on each connection
/// </summary>
/// <param name="identifier"></param>
/// <param name="interval"></param>
/// <param name="queryDelegate"></param>
/// <param name="callback"></param>
protected virtual void RegisterPeriodicQuery(string identifier, TimeSpan interval, Func<SocketConnection, Query> queryDelegate, Action<CallResult>? callback)
{
PeriodicTaskRegistrations.Add(new PeriodicTaskRegistration
{
Identifier = identifier,
Callback = callback,
Interval = interval,
QueryDelegate = queryDelegate
});
}
/// <summary> /// <summary>
/// Connect to an url and listen for data on the BaseAddress /// Connect to an url and listen for data on the BaseAddress
/// </summary> /// </summary>
@ -457,6 +470,9 @@ namespace CryptoExchange.Net
var socketConnection = new SocketConnection(_logger, this, socket, address); var socketConnection = new SocketConnection(_logger, this, socket, address);
socketConnection.UnhandledMessage += HandleUnhandledMessage; socketConnection.UnhandledMessage += HandleUnhandledMessage;
foreach (var ptg in PeriodicTaskRegistrations)
socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.QueryDelegate, ptg.Callback);
foreach (var systemSubscription in systemSubscriptions) foreach (var systemSubscription in systemSubscriptions)
socketConnection.AddSubscription(systemSubscription); socketConnection.AddSubscription(systemSubscription);
@ -515,58 +531,6 @@ namespace CryptoExchange.Net
return socket; return socket;
} }
/// <summary>
/// Periodically sends data over a socket connection
/// </summary>
/// <param name="identifier">Identifier for the periodic send</param>
/// <param name="interval">How often</param>
/// <param name="queryDelegate">Method returning the query to send</param>
/// <param name="callback">The callback for processing the response</param>
protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, Query> queryDelegate, Action<CallResult>? callback)
{
if (queryDelegate == null)
throw new ArgumentNullException(nameof(queryDelegate));
// TODO instead of having this on ApiClient level, this should be registered on the socket connection
// This would prevent this looping without any connections
periodicEvent = new AsyncResetEvent();
periodicTask = Task.Run(async () =>
{
while (!_disposing)
{
await periodicEvent.WaitAsync(interval).ConfigureAwait(false);
if (_disposing)
break;
foreach (var socketConnection in socketConnections.Values)
{
if (_disposing)
break;
if (!socketConnection.Connected)
continue;
var query = queryDelegate(socketConnection);
if (query == null)
continue;
_logger.Log(LogLevel.Trace, $"[Sckt {socketConnection.SocketId}] sending periodic {identifier}");
try
{
var result = await socketConnection.SendAndWaitQueryAsync(query).ConfigureAwait(false);
callback?.Invoke(result);
}
catch (Exception ex)
{
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] Periodic send {identifier} failed: " + ex.ToLogString());
}
}
}
});
}
/// <summary> /// <summary>
/// Unsubscribe an update subscription /// Unsubscribe an update subscription
/// </summary> /// </summary>
@ -682,8 +646,6 @@ namespace CryptoExchange.Net
public override void Dispose() public override void Dispose()
{ {
_disposing = true; _disposing = true;
periodicEvent?.Set();
periodicEvent?.Dispose();
if (socketConnections.Sum(s => s.Value.UserSubscriptionCount) > 0) if (socketConnections.Sum(s => s.Value.UserSubscriptionCount) > 0)
{ {
_logger.Log(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); _logger.Log(LogLevel.Debug, "Disposing socket client, closing all subscriptions");

View File

@ -41,7 +41,7 @@
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference> </PackageReference>
<PackageReference Include="Microsoft.CodeAnalysis.NetAnalyzers" Version="6.0.0"> <PackageReference Include="Microsoft.CodeAnalysis.NetAnalyzers" Version="8.0.0">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference> </PackageReference>

View File

@ -0,0 +1,30 @@
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Periodic task registration
/// </summary>
public class PeriodicTaskRegistration
{
/// <summary>
/// Identifier
/// </summary>
public string Identifier { get; set; } = string.Empty;
/// <summary>
/// Interval of query
/// </summary>
public TimeSpan Interval { get; set; }
/// <summary>
/// Delegate for getting the query
/// </summary>
public Func<SocketConnection, Query> QueryDelegate { get; set; } = null!;
/// <summary>
/// Callback after query
/// </summary>
public Action<CallResult>? Callback { get; set; }
}
}

View File

@ -163,6 +163,16 @@ namespace CryptoExchange.Net.Sockets
private IMessageSerializer _serializer; private IMessageSerializer _serializer;
private IMessageAccessor _accessor; private IMessageAccessor _accessor;
/// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry.
/// </summary>
protected Task? periodicTask;
/// <summary>
/// Wait event for the periodicTask
/// </summary>
protected AsyncResetEvent? periodicEvent;
/// <summary> /// <summary>
/// The underlying websocket /// The underlying websocket
/// </summary> /// </summary>
@ -561,6 +571,8 @@ namespace CryptoExchange.Net.Sockets
public void Dispose() public void Dispose()
{ {
Status = SocketStatus.Disposed; Status = SocketStatus.Disposed;
periodicEvent?.Set();
periodicEvent?.Dispose();
_socket.Dispose(); _socket.Dispose();
} }
@ -818,6 +830,55 @@ namespace CryptoExchange.Net.Sockets
return result; return result;
} }
/// <summary>
/// Periodically sends data over a socket connection
/// </summary>
/// <param name="identifier">Identifier for the periodic send</param>
/// <param name="interval">How often</param>
/// <param name="queryDelegate">Method returning the query to send</param>
/// <param name="callback">The callback for processing the response</param>
public virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, Query> queryDelegate, Action<CallResult>? callback)
{
if (queryDelegate == null)
throw new ArgumentNullException(nameof(queryDelegate));
periodicEvent = new AsyncResetEvent();
periodicTask = Task.Run(async () =>
{
while (Status != SocketStatus.Disposed
&& Status != SocketStatus.Closed
&& Status != SocketStatus.Closing)
{
await periodicEvent.WaitAsync(interval).ConfigureAwait(false);
if (Status == SocketStatus.Disposed
|| Status == SocketStatus.Closed
|| Status == SocketStatus.Closing)
{
break;
}
if (!Connected)
continue;
var query = queryDelegate(this);
if (query == null)
continue;
_logger.Log(LogLevel.Trace, $"[Sckt {SocketId}] sending periodic {identifier}");
try
{
var result = await SendAndWaitQueryAsync(query).ConfigureAwait(false);
callback?.Invoke(result);
}
catch (Exception ex)
{
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] Periodic send {identifier} failed: " + ex.ToLogString());
}
}
});
}
/// <summary> /// <summary>
/// Status of the socket connection /// Status of the socket connection
/// </summary> /// </summary>