From 3fa8277a3082e6ba9861d8e173821f02dc78e771 Mon Sep 17 00:00:00 2001 From: JKorf Date: Sun, 4 Feb 2024 13:50:15 +0100 Subject: [PATCH] Small refactor QueryPeriodic to only run on connections --- CryptoExchange.Net/Clients/SocketApiClient.cs | 90 ++++++------------- CryptoExchange.Net/CryptoExchange.Net.csproj | 2 +- .../Sockets/PeriodicTaskRegistration.cs | 30 +++++++ .../Sockets/SocketConnection.cs | 61 +++++++++++++ 4 files changed, 118 insertions(+), 65 deletions(-) create mode 100644 CryptoExchange.Net/Sockets/PeriodicTaskRegistration.cs diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 122a1f9..c9c843d 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -48,16 +48,6 @@ namespace CryptoExchange.Net /// protected List systemSubscriptions = new(); - /// - /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry. - /// - protected Task? periodicTask; - - /// - /// Wait event for the periodicTask - /// - protected AsyncResetEvent? periodicEvent; - /// /// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message /// @@ -73,6 +63,11 @@ namespace CryptoExchange.Net /// protected internal IEnumerable? RateLimiters { get; set; } + /// + /// Periodic task regisrations + /// + protected List PeriodicTaskRegistrations { get; set; } = new List(); + /// public double IncomingKbps { @@ -129,6 +124,24 @@ namespace CryptoExchange.Net RateLimiters = rateLimiters; } + /// + /// Add a query to periodically send on each connection + /// + /// + /// + /// + /// + protected virtual void RegisterPeriodicQuery(string identifier, TimeSpan interval, Func queryDelegate, Action? callback) + { + PeriodicTaskRegistrations.Add(new PeriodicTaskRegistration + { + Identifier = identifier, + Callback = callback, + Interval = interval, + QueryDelegate = queryDelegate + }); + } + /// /// Connect to an url and listen for data on the BaseAddress /// @@ -457,6 +470,9 @@ namespace CryptoExchange.Net var socketConnection = new SocketConnection(_logger, this, socket, address); socketConnection.UnhandledMessage += HandleUnhandledMessage; + foreach (var ptg in PeriodicTaskRegistrations) + socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.QueryDelegate, ptg.Callback); + foreach (var systemSubscription in systemSubscriptions) socketConnection.AddSubscription(systemSubscription); @@ -515,58 +531,6 @@ namespace CryptoExchange.Net return socket; } - /// - /// Periodically sends data over a socket connection - /// - /// Identifier for the periodic send - /// How often - /// Method returning the query to send - /// The callback for processing the response - protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func queryDelegate, Action? callback) - { - if (queryDelegate == null) - throw new ArgumentNullException(nameof(queryDelegate)); - - // TODO instead of having this on ApiClient level, this should be registered on the socket connection - // This would prevent this looping without any connections - - periodicEvent = new AsyncResetEvent(); - periodicTask = Task.Run(async () => - { - while (!_disposing) - { - await periodicEvent.WaitAsync(interval).ConfigureAwait(false); - if (_disposing) - break; - - foreach (var socketConnection in socketConnections.Values) - { - if (_disposing) - break; - - if (!socketConnection.Connected) - continue; - - var query = queryDelegate(socketConnection); - if (query == null) - continue; - - _logger.Log(LogLevel.Trace, $"[Sckt {socketConnection.SocketId}] sending periodic {identifier}"); - - try - { - var result = await socketConnection.SendAndWaitQueryAsync(query).ConfigureAwait(false); - callback?.Invoke(result); - } - catch (Exception ex) - { - _logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] Periodic send {identifier} failed: " + ex.ToLogString()); - } - } - } - }); - } - /// /// Unsubscribe an update subscription /// @@ -682,8 +646,6 @@ namespace CryptoExchange.Net public override void Dispose() { _disposing = true; - periodicEvent?.Set(); - periodicEvent?.Dispose(); if (socketConnections.Sum(s => s.Value.UserSubscriptionCount) > 0) { _logger.Log(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); diff --git a/CryptoExchange.Net/CryptoExchange.Net.csproj b/CryptoExchange.Net/CryptoExchange.Net.csproj index 91c6652..de60e49 100644 --- a/CryptoExchange.Net/CryptoExchange.Net.csproj +++ b/CryptoExchange.Net/CryptoExchange.Net.csproj @@ -41,7 +41,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/CryptoExchange.Net/Sockets/PeriodicTaskRegistration.cs b/CryptoExchange.Net/Sockets/PeriodicTaskRegistration.cs new file mode 100644 index 0000000..df80830 --- /dev/null +++ b/CryptoExchange.Net/Sockets/PeriodicTaskRegistration.cs @@ -0,0 +1,30 @@ +using CryptoExchange.Net.Objects; +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Sockets +{ + /// + /// Periodic task registration + /// + public class PeriodicTaskRegistration + { + /// + /// Identifier + /// + public string Identifier { get; set; } = string.Empty; + /// + /// Interval of query + /// + public TimeSpan Interval { get; set; } + /// + /// Delegate for getting the query + /// + public Func QueryDelegate { get; set; } = null!; + /// + /// Callback after query + /// + public Action? Callback { get; set; } + } +} diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 69d8229..894542b 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -163,6 +163,16 @@ namespace CryptoExchange.Net.Sockets private IMessageSerializer _serializer; private IMessageAccessor _accessor; + /// + /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry. + /// + protected Task? periodicTask; + + /// + /// Wait event for the periodicTask + /// + protected AsyncResetEvent? periodicEvent; + /// /// The underlying websocket /// @@ -561,6 +571,8 @@ namespace CryptoExchange.Net.Sockets public void Dispose() { Status = SocketStatus.Disposed; + periodicEvent?.Set(); + periodicEvent?.Dispose(); _socket.Dispose(); } @@ -818,6 +830,55 @@ namespace CryptoExchange.Net.Sockets return result; } + /// + /// Periodically sends data over a socket connection + /// + /// Identifier for the periodic send + /// How often + /// Method returning the query to send + /// The callback for processing the response + public virtual void QueryPeriodic(string identifier, TimeSpan interval, Func queryDelegate, Action? callback) + { + if (queryDelegate == null) + throw new ArgumentNullException(nameof(queryDelegate)); + + periodicEvent = new AsyncResetEvent(); + periodicTask = Task.Run(async () => + { + while (Status != SocketStatus.Disposed + && Status != SocketStatus.Closed + && Status != SocketStatus.Closing) + { + await periodicEvent.WaitAsync(interval).ConfigureAwait(false); + if (Status == SocketStatus.Disposed + || Status == SocketStatus.Closed + || Status == SocketStatus.Closing) + { + break; + } + + if (!Connected) + continue; + + var query = queryDelegate(this); + if (query == null) + continue; + + _logger.Log(LogLevel.Trace, $"[Sckt {SocketId}] sending periodic {identifier}"); + + try + { + var result = await SendAndWaitQueryAsync(query).ConfigureAwait(false); + callback?.Invoke(result); + } + catch (Exception ex) + { + _logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] Periodic send {identifier} failed: " + ex.ToLogString()); + } + } + }); + } + /// /// Status of the socket connection ///