From 9e86a083277a5713123996500ce9cefe576e3a4c Mon Sep 17 00:00:00 2001 From: Jan Korf Date: Mon, 28 Oct 2024 10:36:19 +0100 Subject: [PATCH] Trackers (#218) Fix for intermittently failing rate limiting test Added ConnectionId to RequestDefinition to correctly handle connection and path rate limiting configuration Added ValidateMessage method to websocket Query object to filter messages even though it is matched to the query based on the ListenIdentifier Added KlineTracker and TradeTracker implementation --- .../RestClientTests.cs | 2 +- .../Extensions/TrackerLoggingExtensions.cs | 292 ++++++++ CryptoExchange.Net/Objects/Enums.cs | 27 + .../Objects/RequestDefinition.cs | 6 +- .../RateLimiting/Guards/RateLimitGuard.cs | 4 + .../Sockets/CryptoExchangeWebSocketClient.cs | 4 +- CryptoExchange.Net/Sockets/Query.cs | 13 +- CryptoExchange.Net/Trackers/CompareValue.cs | 34 + .../Trackers/Klines/IKlineTracker.cs | 105 +++ .../Trackers/Klines/KlineTracker.cs | 481 +++++++++++++ .../Trackers/Klines/KlinesCompare.cs | 30 + .../Trackers/Klines/KlinesStats.cs | 59 ++ .../Trackers/Trades/ITradeTracker.cs | 100 +++ .../Trackers/Trades/TradeTracker.cs | 495 ++++++++++++++ .../Trackers/Trades/TradesCompare.cs | 37 + .../Trackers/Trades/TradesStats.cs | 65 ++ docs/index.html | 637 ++++++++++++++++++ 17 files changed, 2386 insertions(+), 5 deletions(-) create mode 100644 CryptoExchange.Net/Logging/Extensions/TrackerLoggingExtensions.cs create mode 100644 CryptoExchange.Net/Trackers/CompareValue.cs create mode 100644 CryptoExchange.Net/Trackers/Klines/IKlineTracker.cs create mode 100644 CryptoExchange.Net/Trackers/Klines/KlineTracker.cs create mode 100644 CryptoExchange.Net/Trackers/Klines/KlinesCompare.cs create mode 100644 CryptoExchange.Net/Trackers/Klines/KlinesStats.cs create mode 100644 CryptoExchange.Net/Trackers/Trades/ITradeTracker.cs create mode 100644 CryptoExchange.Net/Trackers/Trades/TradeTracker.cs create mode 100644 CryptoExchange.Net/Trackers/Trades/TradesCompare.cs create mode 100644 CryptoExchange.Net/Trackers/Trades/TradesStats.cs diff --git a/CryptoExchange.Net.UnitTests/RestClientTests.cs b/CryptoExchange.Net.UnitTests/RestClientTests.cs index e06535b..2838af4 100644 --- a/CryptoExchange.Net.UnitTests/RestClientTests.cs +++ b/CryptoExchange.Net.UnitTests/RestClientTests.cs @@ -302,7 +302,7 @@ namespace CryptoExchange.Net.UnitTests public async Task ApiKeyRateLimiterBasics(string key1, string key2, string endpoint1, string endpoint2, bool expectLimited) { var rateLimiter = new RateLimitGate("Test"); - rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerApiKey, new AuthenticatedEndpointFilter(true), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed)); + rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerApiKey, new AuthenticatedEndpointFilter(true), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Sliding)); var requestDefinition1 = new RequestDefinition(endpoint1, HttpMethod.Get) { Authenticated = key1 != null }; var requestDefinition2 = new RequestDefinition(endpoint2, HttpMethod.Get) { Authenticated = key2 != null }; diff --git a/CryptoExchange.Net/Logging/Extensions/TrackerLoggingExtensions.cs b/CryptoExchange.Net/Logging/Extensions/TrackerLoggingExtensions.cs new file mode 100644 index 0000000..514db43 --- /dev/null +++ b/CryptoExchange.Net/Logging/Extensions/TrackerLoggingExtensions.cs @@ -0,0 +1,292 @@ +using System; +using CryptoExchange.Net.Objects; +using Microsoft.Extensions.Logging; + +namespace CryptoExchange.Net.Logging.Extensions +{ +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + + public static class TrackerLoggingExtensions + { + private static readonly Action _klineTrackerStatusChanged; + private static readonly Action _klineTrackerStarting; + private static readonly Action _klineTrackerStartFailed; + private static readonly Action _klineTrackerStarted; + private static readonly Action _klineTrackerStopping; + private static readonly Action _klineTrackerStopped; + private static readonly Action _klineTrackerInitialDataSet; + private static readonly Action _klineTrackerKlineUpdated; + private static readonly Action _klineTrackerKlineAdded; + private static readonly Action _klineTrackerConnectionLost; + private static readonly Action _klineTrackerConnectionClosed; + private static readonly Action _klineTrackerConnectionRestored; + + private static readonly Action _tradeTrackerStatusChanged; + private static readonly Action _tradeTrackerStarting; + private static readonly Action _tradeTrackerStartFailed; + private static readonly Action _tradeTrackerStarted; + private static readonly Action _tradeTrackerStopping; + private static readonly Action _tradeTrackerStopped; + private static readonly Action _tradeTrackerInitialDataSet; + private static readonly Action _tradeTrackerPreSnapshotSkip; + private static readonly Action _tradeTrackerPreSnapshotApplied; + private static readonly Action _tradeTrackerTradeAdded; + private static readonly Action _tradeTrackerConnectionLost; + private static readonly Action _tradeTrackerConnectionClosed; + private static readonly Action _tradeTrackerConnectionRestored; + + static TrackerLoggingExtensions() + { + _klineTrackerStatusChanged = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6001, "KlineTrackerStatusChanged"), + "Kline tracker for {Symbol} status changed: {OldStatus} => {NewStatus}"); + + _klineTrackerStarting = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6002, "KlineTrackerStarting"), + "Kline tracker for {Symbol} starting"); + + _klineTrackerStartFailed = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6003, "KlineTrackerStartFailed"), + "Kline tracker for {Symbol} failed to start: {Error}"); + + _klineTrackerStarted = LoggerMessage.Define( + LogLevel.Information, + new EventId(6004, "KlineTrackerStarted"), + "Kline tracker for {Symbol} started"); + + _klineTrackerStopping = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6005, "KlineTrackerStopping"), + "Kline tracker for {Symbol} stopping"); + + _klineTrackerStopped = LoggerMessage.Define( + LogLevel.Information, + new EventId(6006, "KlineTrackerStopped"), + "Kline tracker for {Symbol} stopped"); + + _klineTrackerInitialDataSet = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6007, "KlineTrackerInitialDataSet"), + "Kline tracker for {Symbol} initial data set, last timestamp: {LastTime}"); + + _klineTrackerKlineUpdated = LoggerMessage.Define( + LogLevel.Trace, + new EventId(6008, "KlineTrackerKlineUpdated"), + "Kline tracker for {Symbol} kline updated for open time: {LastTime}"); + + _klineTrackerKlineAdded = LoggerMessage.Define( + LogLevel.Trace, + new EventId(6009, "KlineTrackerKlineAdded"), + "Kline tracker for {Symbol} new kline for open time: {LastTime}"); + + _klineTrackerConnectionLost = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6010, "KlineTrackerConnectionLost"), + "Kline tracker for {Symbol} connection lost"); + + _klineTrackerConnectionClosed = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6011, "KlineTrackerConnectionClosed"), + "Kline tracker for {Symbol} disconnected"); + + _klineTrackerConnectionRestored = LoggerMessage.Define( + LogLevel.Information, + new EventId(6012, "KlineTrackerConnectionRestored"), + "Kline tracker for {Symbol} successfully resynchronized"); + + + _tradeTrackerStatusChanged = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6013, "KlineTrackerStatusChanged"), + "Trade tracker for {Symbol} status changed: {OldStatus} => {NewStatus}"); + + _tradeTrackerStarting = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6014, "KlineTrackerStarting"), + "Trade tracker for {Symbol} starting"); + + _tradeTrackerStartFailed = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6015, "KlineTrackerStartFailed"), + "Trade tracker for {Symbol} failed to start: {Error}"); + + _tradeTrackerStarted = LoggerMessage.Define( + LogLevel.Information, + new EventId(6016, "KlineTrackerStarted"), + "Trade tracker for {Symbol} started"); + + _tradeTrackerStopping = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6017, "KlineTrackerStopping"), + "Trade tracker for {Symbol} stopping"); + + _tradeTrackerStopped = LoggerMessage.Define( + LogLevel.Information, + new EventId(6018, "KlineTrackerStopped"), + "Trade tracker for {Symbol} stopped"); + + _tradeTrackerInitialDataSet = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6019, "TradeTrackerInitialDataSet"), + "Trade tracker for {Symbol} snapshot set, Count: {Count}, Last id: {LastId}"); + + _tradeTrackerPreSnapshotSkip = LoggerMessage.Define( + LogLevel.Trace, + new EventId(6020, "TradeTrackerPreSnapshotSkip"), + "Trade tracker for {Symbol} skipping {Id}, already in snapshot"); + + _tradeTrackerPreSnapshotApplied = LoggerMessage.Define( + LogLevel.Trace, + new EventId(6021, "TradeTrackerPreSnapshotApplied"), + "Trade tracker for {Symbol} adding {Id} from pre-snapshot"); + + _tradeTrackerTradeAdded = LoggerMessage.Define( + LogLevel.Trace, + new EventId(6022, "TradeTrackerTradeAdded"), + "Trade tracker for {Symbol} adding trade {Id}"); + + _tradeTrackerConnectionLost = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6023, "TradeTrackerConnectionLost"), + "Trade tracker for {Symbol} connection lost"); + + _tradeTrackerConnectionClosed = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6024, "TradeTrackerConnectionClosed"), + "Trade tracker for {Symbol} disconnected"); + + _tradeTrackerConnectionRestored = LoggerMessage.Define( + LogLevel.Information, + new EventId(6025, "TradeTrackerConnectionRestored"), + "Trade tracker for {Symbol} successfully resynchronized"); + } + + public static void KlineTrackerStatusChanged(this ILogger logger, string symbol, SyncStatus oldStatus, SyncStatus newStatus) + { + _klineTrackerStatusChanged(logger, symbol, oldStatus, newStatus, null); + } + + public static void KlineTrackerStarting(this ILogger logger, string symbol) + { + _klineTrackerStarting(logger, symbol, null); + } + + public static void KlineTrackerStartFailed(this ILogger logger, string symbol, string error) + { + _klineTrackerStartFailed(logger, symbol, error, null); + } + + public static void KlineTrackerStarted(this ILogger logger, string symbol) + { + _klineTrackerStarted(logger, symbol, null); + } + + public static void KlineTrackerStopping(this ILogger logger, string symbol) + { + _klineTrackerStopping(logger, symbol, null); + } + + public static void KlineTrackerStopped(this ILogger logger, string symbol) + { + _klineTrackerStopped(logger, symbol, null); + } + + public static void KlineTrackerInitialDataSet(this ILogger logger, string symbol, DateTime lastTime) + { + _klineTrackerInitialDataSet(logger, symbol, lastTime, null); + } + + public static void KlineTrackerKlineUpdated(this ILogger logger, string symbol, DateTime lastTime) + { + _klineTrackerKlineUpdated(logger, symbol, lastTime, null); + } + + public static void KlineTrackerKlineAdded(this ILogger logger, string symbol, DateTime lastTime) + { + _klineTrackerKlineAdded(logger, symbol, lastTime, null); + } + + public static void KlineTrackerConnectionLost(this ILogger logger, string symbol) + { + _klineTrackerConnectionLost(logger, symbol, null); + } + + public static void KlineTrackerConnectionClosed(this ILogger logger, string symbol) + { + _klineTrackerConnectionClosed(logger, symbol, null); + } + + public static void KlineTrackerConnectionRestored(this ILogger logger, string symbol) + { + _klineTrackerConnectionRestored(logger, symbol, null); + } + + public static void TradeTrackerStatusChanged(this ILogger logger, string symbol, SyncStatus oldStatus, SyncStatus newStatus) + { + _tradeTrackerStatusChanged(logger, symbol, oldStatus, newStatus, null); + } + + public static void TradeTrackerStarting(this ILogger logger, string symbol) + { + _tradeTrackerStarting(logger, symbol, null); + } + + public static void TradeTrackerStartFailed(this ILogger logger, string symbol, string error) + { + _tradeTrackerStartFailed(logger, symbol, error, null); + } + + public static void TradeTrackerStarted(this ILogger logger, string symbol) + { + _tradeTrackerStarted(logger, symbol, null); + } + + public static void TradeTrackerStopping(this ILogger logger, string symbol) + { + _tradeTrackerStopping(logger, symbol, null); + } + + public static void TradeTrackerStopped(this ILogger logger, string symbol) + { + _tradeTrackerStopped(logger, symbol, null); + } + + public static void TradeTrackerInitialDataSet(this ILogger logger, string symbol, int count, long lastId) + { + _tradeTrackerInitialDataSet(logger, symbol, count, lastId, null); + } + + public static void TradeTrackerPreSnapshotSkip(this ILogger logger, string symbol, long lastId) + { + _tradeTrackerPreSnapshotSkip(logger, symbol, lastId, null); + } + + public static void TradeTrackerPreSnapshotApplied(this ILogger logger, string symbol, long lastId) + { + _tradeTrackerPreSnapshotApplied(logger, symbol, lastId, null); + } + + public static void TradeTrackerTradeAdded(this ILogger logger, string symbol, long lastId) + { + _tradeTrackerTradeAdded(logger, symbol, lastId, null); + } + + public static void TradeTrackerConnectionLost(this ILogger logger, string symbol) + { + _tradeTrackerConnectionLost(logger, symbol, null); + } + + public static void TradeTrackerConnectionClosed(this ILogger logger, string symbol) + { + _tradeTrackerConnectionClosed(logger, symbol, null); + } + + public static void TradeTrackerConnectionRestored(this ILogger logger, string symbol) + { + _tradeTrackerConnectionRestored(logger, symbol, null); + } + } +} diff --git a/CryptoExchange.Net/Objects/Enums.cs b/CryptoExchange.Net/Objects/Enums.cs index 56783d8..a0f877c 100644 --- a/CryptoExchange.Net/Objects/Enums.cs +++ b/CryptoExchange.Net/Objects/Enums.cs @@ -68,6 +68,33 @@ Json } + /// + /// Tracker sync status + /// + public enum SyncStatus + { + /// + /// Not connected + /// + Disconnected, + /// + /// Syncing, data connection is being made + /// + Syncing, + /// + /// The connection is active, but the full data backlog is not yet reached. For example, a tracker set to retain 10 minutes of data only has 8 minutes of data at this moment. + /// + PartiallySynced, + /// + /// Synced + /// + Synced, + /// + /// Disposed + /// + Diposed + } + /// /// Status of the order book /// diff --git a/CryptoExchange.Net/Objects/RequestDefinition.cs b/CryptoExchange.Net/Objects/RequestDefinition.cs index 1dd0567..e583ddd 100644 --- a/CryptoExchange.Net/Objects/RequestDefinition.cs +++ b/CryptoExchange.Net/Objects/RequestDefinition.cs @@ -58,12 +58,16 @@ namespace CryptoExchange.Net.Objects /// public IRateLimitGuard? LimitGuard { get; set; } - /// /// Whether this request should never be cached /// public bool PreventCaching { get; set; } + /// + /// Connection id + /// + public int? ConnectionId { get; set; } + /// /// ctor /// diff --git a/CryptoExchange.Net/RateLimiting/Guards/RateLimitGuard.cs b/CryptoExchange.Net/RateLimiting/Guards/RateLimitGuard.cs index 9501360..e9e5127 100644 --- a/CryptoExchange.Net/RateLimiting/Guards/RateLimitGuard.cs +++ b/CryptoExchange.Net/RateLimiting/Guards/RateLimitGuard.cs @@ -18,6 +18,10 @@ namespace CryptoExchange.Net.RateLimiting.Guards /// public static Func PerEndpoint { get; } = new Func((def, host, key) => def.Path + def.Method); /// + /// Apply guard per connection + /// + public static Func PerConnection { get; } = new Func((def, host, key) => def.ConnectionId.ToString()); + /// /// Apply guard per API key /// public static Func PerApiKey { get; } = new Func((def, host, key) => key!); diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 3c0a445..fd3fb65 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -209,7 +209,7 @@ namespace CryptoExchange.Net.Sockets { if (Parameters.RateLimiter != null) { - var definition = new RequestDefinition(Id.ToString(), HttpMethod.Get); + var definition = new RequestDefinition(Uri.AbsolutePath, HttpMethod.Get) { ConnectionId = Id }; var limitResult = await Parameters.RateLimiter.ProcessAsync(_logger, Id, RateLimitItemType.Connection, definition, _baseAddress, null, 1, Parameters.RateLimitingBehaviour, _ctsSource.Token).ConfigureAwait(false); if (!limitResult) return new CallResult(new ClientRateLimitError("Connection limit reached")); @@ -475,7 +475,7 @@ namespace CryptoExchange.Net.Sockets /// private async Task SendLoopAsync() { - var requestDefinition = new RequestDefinition(Id.ToString(), HttpMethod.Get); + var requestDefinition = new RequestDefinition(Uri.AbsolutePath, HttpMethod.Get) { ConnectionId = Id }; try { while (true) diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index d79279d..833abf8 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -177,6 +177,10 @@ namespace CryptoExchange.Net.Sockets /// public override async Task Handle(SocketConnection connection, DataEvent message) { + var typedMessage = message.As((TServerResponse)message.Data); + if (!ValidateMessage(typedMessage)) + return new CallResult(null); + CurrentResponses++; if (CurrentResponses == RequiredResponses) { @@ -186,7 +190,7 @@ namespace CryptoExchange.Net.Sockets if (Result?.Success != false) // If an error result is already set don't override that - Result = HandleMessage(connection, message.As((TServerResponse)message.Data)); + Result = HandleMessage(connection, typedMessage); if (CurrentResponses == RequiredResponses) { @@ -198,6 +202,13 @@ namespace CryptoExchange.Net.Sockets return Result; } + /// + /// Validate if a message is actually processable by this query + /// + /// + /// + public virtual bool ValidateMessage(DataEvent message) => true; + /// /// Handle the query response /// diff --git a/CryptoExchange.Net/Trackers/CompareValue.cs b/CryptoExchange.Net/Trackers/CompareValue.cs new file mode 100644 index 0000000..a6fe033 --- /dev/null +++ b/CryptoExchange.Net/Trackers/CompareValue.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers +{ + + /// + /// Compare value + /// + public record CompareValue + { + /// + /// The value difference + /// + public decimal? Difference { get; set; } + /// + /// The value difference percentage + /// + public decimal? PercentageDifference { get; set; } + + /// + /// ctor + /// + public CompareValue(decimal? value1, decimal? value2) + { + if (value1 == null || value2 == null) + return; + + Difference = value2 - value1; + PercentageDifference = value1.Value == 0 ? null : Math.Round(value2.Value / value1.Value * 100 - 100, 4); + } + } +} diff --git a/CryptoExchange.Net/Trackers/Klines/IKlineTracker.cs b/CryptoExchange.Net/Trackers/Klines/IKlineTracker.cs new file mode 100644 index 0000000..63cc323 --- /dev/null +++ b/CryptoExchange.Net/Trackers/Klines/IKlineTracker.cs @@ -0,0 +1,105 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.SharedApis; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.Klines +{ + /// + /// A tracker for kline data of a symbol + /// + public interface IKlineTracker + { + /// + /// The total number of klines + /// + int Count { get; } + + /// + /// Exchange name + /// + string Exchange { get; } + + /// + /// Symbol name + /// + string SymbolName { get; } + + /// + /// Symbol + /// + SharedSymbol Symbol { get; } + + /// + /// The max number of klines tracked + /// + int? Limit { get; } + + /// + /// The max age of the data tracked + /// + TimeSpan? Period { get; } + + /// + /// From which timestamp the trades are registered + /// + DateTime? SyncedFrom { get; } + + /// + /// Sync status + /// + SyncStatus Status { get; } + + /// + /// Get the last kline + /// + SharedKline? Last { get; } + + /// + /// Event for when a new kline is added + /// + event Func? OnAdded; + /// + /// Event for when a kline is removed because it's no longer within the period/limit window + /// + event Func? OnRemoved; + /// + /// Event for when a kline is updated + /// + event Func OnUpdated; + /// + /// Event for when the sync status changes + /// + event Func? OnStatusChanged; + + /// + /// Start synchronization + /// + /// + Task StartAsync(bool startWithSnapshot = true); + + /// + /// Stop synchronization + /// + /// + Task StopAsync(); + + /// + /// Get the data tracked + /// + /// Start timestamp to get the data from, defaults to tracked data start time + /// End timestamp to get the data until, defaults to current time + /// + IEnumerable GetData(DateTime? fromTimestamp = null, DateTime? toTimestamp = null); + + /// + /// Get statitistics on the klines + /// + /// Start timestamp to get the data from, defaults to tracked data start time + /// End timestamp to get the data until, defaults to current time + /// + KlinesStats GetStats(DateTime? fromTimestamp = null, DateTime? toTimestamp = null); + + } +} \ No newline at end of file diff --git a/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs b/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs new file mode 100644 index 0000000..9ed47b4 --- /dev/null +++ b/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs @@ -0,0 +1,481 @@ +using CryptoExchange.Net.Logging.Extensions; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.Klines +{ + /// + public class KlineTracker : IKlineTracker + { + private readonly IKlineSocketClient _socketClient; + private readonly IKlineRestClient _restClient; + private SyncStatus _status; + private bool _startWithSnapshot; + + /// + /// The internal data structure + /// + protected readonly Dictionary _data = new Dictionary(); + /// + /// The pre-snapshot queue buffering updates received before the snapshot is set and which will be applied after the snapshot was set + /// + protected readonly List _preSnapshotQueue = new List(); + /// + /// Lock for accessing _data + /// + protected readonly object _lock = new object(); + /// + /// The last time the window was applied + /// + protected DateTime _lastWindowApplied = DateTime.MinValue; + /// + /// Whether or not the data has changed since last window was applied + /// + protected bool _changed = false; + /// + /// The kline interval + /// + protected readonly SharedKlineInterval _interval; + /// + /// Whether the snapshot has been set + /// + protected bool _snapshotSet; + /// + /// Logger + /// + protected readonly ILogger _logger; + /// + /// Update subscription + /// + protected UpdateSubscription? _updateSubscription; + + /// + /// The timestamp of the first item + /// + protected DateTime? _firstTimestamp; + + /// + public SyncStatus Status + { + get => _status; + set + { + if (value == _status) + return; + + var old = _status; + _status = value; + _logger.KlineTrackerStatusChanged(SymbolName, old, value); + OnStatusChanged?.Invoke(old, _status); + } + } + + /// + public string Exchange { get; } + + /// + public string SymbolName { get; } + + /// + public SharedSymbol Symbol { get; } + + /// + public int? Limit { get; } + /// + public TimeSpan? Period { get; } + + /// + public DateTime? SyncedFrom + { + get + { + if (Period == null) + return _firstTimestamp; + + var max = DateTime.UtcNow - Period.Value; + if (_firstTimestamp > max) + return _firstTimestamp; + + return max; + } + } + + /// + public int Count + { + get + { + lock (_lock) + { + ApplyWindow(true); + return _data.Count; + } + } + } + + /// + public SharedKline? Last + { + get + { + lock (_lock) + { + ApplyWindow(true); + return _data.LastOrDefault().Value; + } + } + } + + /// + public event Func? OnAdded; + /// + public event Func? OnUpdated; + /// + public event Func? OnRemoved; + /// + public event Func? OnStatusChanged; + + /// + /// ctor + /// + public KlineTracker( + ILogger? logger, + IKlineRestClient restClient, + IKlineSocketClient socketClient, + SharedSymbol symbol, + SharedKlineInterval interval, + int? limit = null, + TimeSpan? period = null) + { + _logger = logger ?? new NullLogger(); + Symbol = symbol; + SymbolName = socketClient.FormatSymbol(symbol.BaseAsset, symbol.QuoteAsset, symbol.TradingMode, symbol.DeliverTime); + Exchange = restClient.Exchange; + Limit = limit; + Period = period; + _interval = interval; + _socketClient = socketClient; + _restClient = restClient; + } + + /// + public async Task StartAsync(bool startWithSnapshot = true) + { + if (Status != SyncStatus.Disconnected) + throw new InvalidOperationException($"Can't start syncing unless state is {SyncStatus.Disconnected}. Current state: {Status}"); + + _startWithSnapshot = startWithSnapshot; + Status = SyncStatus.Syncing; + _logger.KlineTrackerStarting(SymbolName); + + var startResult = await DoStartAsync().ConfigureAwait(false); + if (!startResult) + { + _logger.KlineTrackerStartFailed(SymbolName, startResult.Error!.ToString()); + Status = SyncStatus.Disconnected; + return new CallResult(startResult.Error!); + } + + _updateSubscription = startResult.Data; + _updateSubscription.ConnectionLost += HandleConnectionLost; + _updateSubscription.ConnectionClosed += HandleConnectionClosed; + _updateSubscription.ConnectionRestored += HandleConnectionRestored; + Status = SyncStatus.Synced; + _logger.KlineTrackerStarted(SymbolName); + return new CallResult(null); + } + + /// + public async Task StopAsync() + { + _logger.KlineTrackerStopping(SymbolName); + Status = SyncStatus.Disconnected; + await DoStopAsync().ConfigureAwait(false); + _data.Clear(); + _preSnapshotQueue.Clear(); + _logger.KlineTrackerStopped(SymbolName); + } + + /// + /// The start procedure needed for kline syncing, generally subscribing to an update stream and requesting the snapshot + /// + /// + protected virtual async Task> DoStartAsync() + { + var subResult = await _socketClient.SubscribeToKlineUpdatesAsync(new SubscribeKlineRequest(Symbol, _interval), + update => + { + AddOrUpdate(update.Data); + }).ConfigureAwait(false); + + if (!subResult) + { + Status = SyncStatus.Disconnected; + return subResult; + } + + if (!_startWithSnapshot) + return subResult; + + var startTime = Period == null ? (DateTime?)null : DateTime.UtcNow.Add(-Period.Value); + if (_restClient.GetKlinesOptions.MaxAge != null && DateTime.UtcNow.Add(-_restClient.GetKlinesOptions.MaxAge.Value) > startTime) + startTime = DateTime.UtcNow.Add(-_restClient.GetKlinesOptions.MaxAge.Value); + + var limit = Math.Min(_restClient.GetKlinesOptions.MaxRequestDataPoints ?? _restClient.GetKlinesOptions.MaxTotalDataPoints ?? 100, Limit ?? 100); + + var request = new GetKlinesRequest(Symbol, _interval, startTime, DateTime.UtcNow, limit: limit); + var data = new List(); + await foreach (var result in ExchangeHelpers.ExecutePages(_restClient.GetKlinesAsync, request).ConfigureAwait(false)) + { + if (!result) + { + _ = subResult.Data.CloseAsync(); + Status = SyncStatus.Disconnected; + return subResult.AsError(result.Error!); + } + + if (Limit != null && data.Count > Limit) + break; + + data.AddRange(result.Data); + } + + SetInitialData(data); + return subResult; + } + + /// + /// The stop procedure needed, generally stopping the update stream + /// + /// + protected virtual Task DoStopAsync() => _updateSubscription?.CloseAsync() ?? Task.CompletedTask; + + /// + public KlinesStats GetStats(DateTime? fromTimestamp = null, DateTime? toTimestamp = null) + { + var compareTime = SyncedFrom?.AddSeconds(-2); + var stats = GetStats(GetData(fromTimestamp, toTimestamp)); + stats.Complete = (fromTimestamp == null || fromTimestamp >= compareTime) && (toTimestamp == null || toTimestamp >= compareTime); + return stats; + } + + private KlinesStats GetStats(IEnumerable klines) + { + if (!klines.Any()) + return new KlinesStats(); + + return new KlinesStats + { + KlineCount = klines.Count(), + FirstOpenTime = klines.First().OpenTime, + LastOpenTime = klines.Last().OpenTime, + HighPrice = klines.Select(d => d.LowPrice).Max(), + LowPrice = klines.Select(d => d.HighPrice).Min(), + Volume = klines.Select(d => d.Volume).Sum(), + AverageVolume = Math.Round(klines.OrderByDescending(d => d.OpenTime).Skip(1).Select(d => d.Volume).DefaultIfEmpty().Average(), 8) + }; + } + + /// + public IEnumerable GetData(DateTime? since = null, DateTime? until = null) + { + lock (_lock) + { + ApplyWindow(true); + + IEnumerable result = _data.Values; + if (since != null) + result = result.Where(d => d.OpenTime >= since); + if (until != null) + result = result.Where(d => d.OpenTime <= until); + + return result.ToList(); + } + } + + /// + /// Set the initial kline data snapshot + /// + /// + protected void SetInitialData(IEnumerable data) + { + lock (_lock) + { + _data.Clear(); + + IEnumerable items = data.OrderByDescending(d => d.OpenTime); + if (Limit != null) + items = items.Take(Limit.Value); + if (Period != null) + items = items.Where(e => e.OpenTime >= DateTime.UtcNow.Add(-Period.Value)); + + foreach (var item in items.OrderBy(d => d.OpenTime)) + _data.Add(item.OpenTime, item); + + _snapshotSet = true; + + foreach (var item in _preSnapshotQueue) + { + if (_data.ContainsKey(item.OpenTime)) + continue; + + _data.Add(item.OpenTime, item); + } + + _firstTimestamp = _data.Min(v => v.Key); + ApplyWindow(false); + _logger.KlineTrackerInitialDataSet(SymbolName, _data.Last().Key); + } + } + + /// + /// Add or update a kline + /// + /// + protected void AddOrUpdate(SharedKline item) => AddOrUpdate(new[] { item }); + + /// + /// Add or update klines + /// + /// + protected void AddOrUpdate(IEnumerable items) + { + lock (_lock) + { + if (_restClient != null && _startWithSnapshot && !_snapshotSet) + { + _preSnapshotQueue.AddRange(items); + return; + } + + foreach (var item in items) + { + if (_data.TryGetValue(item.OpenTime, out var existing)) + { + _data.Remove(item.OpenTime); + _data.Add(item.OpenTime, item); + OnUpdated?.Invoke(item); + _logger.KlineTrackerKlineUpdated(SymbolName, _data.Last().Key); + } + else + { + _data.Add(item.OpenTime, item); + OnAdded?.Invoke(item); + _logger.KlineTrackerKlineAdded(SymbolName, _data.Last().Key); + } + } + + _firstTimestamp = _data.Min(x => x.Key); + _changed = true; + + SetSyncStatus(); + ApplyWindow(true); + } + } + + private void ApplyWindow(bool broadcastEvents) + { + if (!_changed && (DateTime.UtcNow - _lastWindowApplied) < TimeSpan.FromSeconds(1)) + return; + + if (Period != null) + { + var compareDate = DateTime.UtcNow.Add(-Period.Value); + for (var i = 0; i < _data.Count; i++) + { + var item = _data.ElementAt(0); + if (item.Key >= compareDate) + break; + + _data.Remove(item.Key); + if (broadcastEvents) + OnRemoved?.Invoke(item.Value); + } + } + + if (Limit != null && _data.Count > Limit.Value) + { + var toRemove = Math.Max(0, _data.Count - Limit.Value); + for (var i = 0; i < toRemove; i++) + { + var item = _data.ElementAt(0); + _data.Remove(item.Key); + if (broadcastEvents) + OnRemoved?.Invoke(item.Value); + } + } + + _lastWindowApplied = DateTime.UtcNow; + _changed = false; + } + + private void HandleConnectionLost() + { + _logger.KlineTrackerConnectionLost(SymbolName); + if (Status != SyncStatus.Disconnected) + { + Status = SyncStatus.Syncing; + _snapshotSet = false; + _firstTimestamp = null; + _preSnapshotQueue.Clear(); + } + } + + private void HandleConnectionClosed() + { + _logger.KlineTrackerConnectionClosed(SymbolName); + Status = SyncStatus.Disconnected; + _ = StopAsync(); + } + + private async void HandleConnectionRestored(TimeSpan _) + { + Status = SyncStatus.Syncing; + var success = false; + while (!success) + { + if (Status != SyncStatus.Syncing) + return; + + var resyncResult = await DoStartAsync().ConfigureAwait(false); + success = resyncResult; + } + + _logger.KlineTrackerConnectionRestored(SymbolName); + SetSyncStatus(); + } + + private void SetSyncStatus() + { + if (Status == SyncStatus.Synced) + return; + + if (Period != null) + { + if (_firstTimestamp <= DateTime.UtcNow - Period.Value) + Status = SyncStatus.Synced; + else + Status = SyncStatus.PartiallySynced; + } + + if (Limit != null) + { + if (_data.Count == Limit.Value) + Status = SyncStatus.Synced; + else + Status = SyncStatus.PartiallySynced; + } + + if (Period == null && Limit == null) + Status = SyncStatus.Synced; + } + } +} diff --git a/CryptoExchange.Net/Trackers/Klines/KlinesCompare.cs b/CryptoExchange.Net/Trackers/Klines/KlinesCompare.cs new file mode 100644 index 0000000..6e1deea --- /dev/null +++ b/CryptoExchange.Net/Trackers/Klines/KlinesCompare.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers.Klines +{ + /// + /// Klines statistics comparison + /// + public record KlinesCompare + { + /// + /// Number of trades + /// + public CompareValue? LowPriceDif { get; set; } + /// + /// Number of trades + /// + public CompareValue? HighPriceDif { get; set; } + /// + /// Number of trades + /// + public CompareValue? VolumeDif { get; set; } + /// + /// Number of trades + /// + public CompareValue? AverageVolumeDif { get; set; } + + } +} diff --git a/CryptoExchange.Net/Trackers/Klines/KlinesStats.cs b/CryptoExchange.Net/Trackers/Klines/KlinesStats.cs new file mode 100644 index 0000000..2a832f1 --- /dev/null +++ b/CryptoExchange.Net/Trackers/Klines/KlinesStats.cs @@ -0,0 +1,59 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers.Klines +{ + /// + /// Klines statistics + /// + public record KlinesStats + { + /// + /// Number of klines + /// + public int KlineCount { get; set; } + /// + /// The kline open time of the first entry + /// + public DateTime? FirstOpenTime { get; set; } + /// + /// The kline open time of the last entry + /// + public DateTime? LastOpenTime { get; set; } + /// + /// Lowest trade price + /// + public decimal? LowPrice { get; set; } + /// + /// Highest trade price + /// + public decimal? HighPrice { get; set; } + /// + /// Trade volume + /// + public decimal Volume { get; set; } + /// + /// Average volume per kline + /// + public decimal? AverageVolume { get; set; } + /// + /// Whether the data is complete + /// + public bool Complete { get; set; } + + /// + /// Compare 2 stat snapshots to eachother + /// + public KlinesCompare CompareTo(KlinesStats otherStats) + { + return new KlinesCompare + { + LowPriceDif = new CompareValue(LowPrice, otherStats.LowPrice), + HighPriceDif = new CompareValue(HighPrice, otherStats.HighPrice), + VolumeDif = new CompareValue(Volume, otherStats.Volume), + AverageVolumeDif = new CompareValue(AverageVolume, otherStats.AverageVolume), + }; + } + } +} diff --git a/CryptoExchange.Net/Trackers/Trades/ITradeTracker.cs b/CryptoExchange.Net/Trackers/Trades/ITradeTracker.cs new file mode 100644 index 0000000..589d5bd --- /dev/null +++ b/CryptoExchange.Net/Trackers/Trades/ITradeTracker.cs @@ -0,0 +1,100 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.SharedApis; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.Trades +{ + /// + /// A tracker for trades on a symbol + /// + public interface ITradeTracker + { + /// + /// The total number of trades + /// + int Count { get; } + + /// + /// Exchange name + /// + string Exchange { get; } + + /// + /// Symbol name + /// + string SymbolName { get; } + + /// + /// Symbol + /// + SharedSymbol Symbol { get; } + + /// + /// The max number of trades tracked + /// + int? Limit { get; } + + /// + /// The max age of the data tracked + /// + TimeSpan? Period { get; } + + /// + /// From which timestamp the trades are registered + /// + DateTime? SyncedFrom { get; } + + /// + /// The current synchronization status + /// + SyncStatus Status { get; } + + /// + /// Get the last trade + /// + SharedTrade? Last { get; } + + /// + /// Event for when a new trade is added + /// + event Func? OnAdded; + /// + /// Event for when a trade is removed because it's no longer within the period/limit window + /// + event Func? OnRemoved; + /// + /// Event for when the sync status changes + /// + event Func? OnStatusChanged; + + /// + /// Start synchronization + /// + /// + Task StartAsync(bool startWithSnapshot = true); + + /// + /// Stop synchronization + /// + /// + Task StopAsync(); + + /// + /// Get the data tracked + /// + /// Start timestamp to get the data from, defaults to tracked data start time + /// End timestamp to get the data until, defaults to current time + /// + IEnumerable GetData(DateTime? fromTimestamp = null, DateTime? toTimestamp = null); + + /// + /// Get statitistics on the trades + /// + /// Start timestamp to get the data from, defaults to tracked data start time + /// End timestamp to get the data until, defaults to current time + /// + TradesStats GetStats(DateTime? fromTimestamp = null, DateTime? toTimestamp = null); + } +} \ No newline at end of file diff --git a/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs b/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs new file mode 100644 index 0000000..8962566 --- /dev/null +++ b/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs @@ -0,0 +1,495 @@ +using CryptoExchange.Net.Logging.Extensions; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.Trades +{ + /// + public class TradeTracker : ITradeTracker + { + private readonly ITradeSocketClient _socketClient; + private readonly IRecentTradeRestClient? _recentRestClient; + private readonly ITradeHistoryRestClient? _historyRestClient; + private SyncStatus _status; + private long _snapshotId; + private bool _startWithSnapshot; + + /// + /// The internal data structure + /// + protected readonly List _data = new List(); + /// + /// The pre-snapshot queue buffering updates received before the snapshot is set and which will be applied after the snapshot was set + /// + protected readonly List _preSnapshotQueue = new List(); + + /// + /// The last time the window was applied + /// + protected DateTime _lastWindowApplied = DateTime.MinValue; + /// + /// Whether or not the data has changed since last window was applied + /// + protected bool _changed = false; + /// + /// Lock for accessing _data + /// + protected readonly object _lock = new object(); + /// + /// Whether the snapshot has been set + /// + protected bool _snapshotSet; + /// + /// Logger + /// + protected readonly ILogger _logger; + /// + /// Update subscription + /// + protected UpdateSubscription? _updateSubscription; + + /// + /// The timestamp of the first item + /// + protected DateTime? _firstTimestamp; + + /// + public string Exchange { get; } + + /// + public string SymbolName { get; } + + /// + public SharedSymbol Symbol { get; } + + /// + public int? Limit { get; } + /// + public TimeSpan? Period { get; } + + /// + public SyncStatus Status + { + get => _status; + set + { + if (value == _status) + return; + + var old = _status; + _status = value; + _logger.TradeTrackerStatusChanged(SymbolName, old, value); + OnStatusChanged?.Invoke(old, _status); + } + } + + /// + public int Count + { + get + { + lock (_lock) + { + ApplyWindow(true); + return _data.Count; + } + } + } + + /// + public DateTime? SyncedFrom + { + get + { + if (Period == null) + return _firstTimestamp; + + var max = DateTime.UtcNow - Period.Value; + if (_firstTimestamp > max) + return _firstTimestamp; + + return max; + } + } + + /// + public SharedTrade? Last + { + get + { + lock (_lock) + { + ApplyWindow(true); + return _data.LastOrDefault(); + } + } + } + + /// + public event Func? OnAdded; + /// + public event Func? OnRemoved; + /// + public event Func? OnStatusChanged; + + /// + /// ctor + /// + public TradeTracker( + ILogger? logger, + IRecentTradeRestClient? recentRestClient, + ITradeHistoryRestClient? historyRestClient, + ITradeSocketClient socketClient, + SharedSymbol symbol, + int? limit = null, + TimeSpan? period = null) + { + _logger = logger ?? new NullLogger(); + _recentRestClient = recentRestClient; + _historyRestClient = historyRestClient; + _socketClient = socketClient; + Exchange = socketClient.Exchange; + Symbol = symbol; + SymbolName = socketClient.FormatSymbol(symbol.BaseAsset, symbol.QuoteAsset, symbol.TradingMode, symbol.DeliverTime); + Limit = limit; + Period = period; + } + + private TradesStats GetStats(IEnumerable trades) + { + if (!trades.Any()) + return new TradesStats(); + + return new TradesStats + { + TradeCount = trades.Count(), + FirstTradeTime = trades.First().Timestamp, + LastTradeTime = trades.Last().Timestamp, + AveragePrice = Math.Round(trades.Select(d => d.Price).DefaultIfEmpty().Average(), 8), + VolumeWeightedAveragePrice = trades.Any() ? Math.Round(trades.Select(d => d.Price * d.Quantity).DefaultIfEmpty().Sum() / trades.Select(d => d.Quantity).DefaultIfEmpty().Sum(), 8) : null, + Volume = Math.Round(trades.Sum(d => d.Quantity), 8), + QuoteVolume = Math.Round(trades.Sum(d => d.Quantity * d.Price), 8), + BuySellRatio = Math.Round(trades.Where(x => x.Side == SharedOrderSide.Buy).Sum(x => x.Quantity) / trades.Sum(x => x.Quantity), 8) + }; + } + + /// + public TradesStats GetStats(DateTime? fromTimestamp = null, DateTime? toTimestamp = null) + { + var compareTime = SyncedFrom?.AddSeconds(-2); + var stats = GetStats(GetData(fromTimestamp, toTimestamp)); + stats.Complete = (fromTimestamp == null || fromTimestamp >= compareTime) && (toTimestamp == null || toTimestamp >= compareTime); + return stats; + } + + /// + public async Task StartAsync(bool startWithSnapshot = true) + { + if (Status != SyncStatus.Disconnected) + throw new InvalidOperationException($"Can't start syncing unless state is {SyncStatus.Disconnected}. Current state: {Status}"); + + _startWithSnapshot = startWithSnapshot; + Status = SyncStatus.Syncing; + _logger.TradeTrackerStarting(SymbolName); + var subResult = await DoStartAsync().ConfigureAwait(false); + if (!subResult) + { + _logger.TradeTrackerStartFailed(SymbolName, subResult.Error!.ToString()); + Status = SyncStatus.Disconnected; + return subResult; + } + + _updateSubscription = subResult.Data; + _updateSubscription.ConnectionLost += HandleConnectionLost; + _updateSubscription.ConnectionClosed += HandleConnectionClosed; + _updateSubscription.ConnectionRestored += HandleConnectionRestored; + SetSyncStatus(); + _logger.TradeTrackerStarted(SymbolName); + return new CallResult(null); + } + + /// + public async Task StopAsync() + { + _logger.TradeTrackerStopping(SymbolName); + Status = SyncStatus.Disconnected; + await DoStopAsync().ConfigureAwait(false); + _data.Clear(); + _preSnapshotQueue.Clear(); + _logger.TradeTrackerStopped(SymbolName); + } + + /// + /// The start procedure needed for trade syncing, generally subscribing to an update stream and requesting the snapshot + /// + /// + protected virtual async Task> DoStartAsync() + { + var subResult = await _socketClient.SubscribeToTradeUpdatesAsync(new SubscribeTradeRequest(Symbol), + update => + { + AddData(update.Data); + }).ConfigureAwait(false); + + if (!subResult) + { + Status = SyncStatus.Disconnected; + return subResult; + } + + if (!_startWithSnapshot) + return subResult; + + if (_historyRestClient != null) + { + var startTime = Period == null ? DateTime.UtcNow.AddMinutes(-5) : DateTime.UtcNow.Add(-Period.Value); + var request = new GetTradeHistoryRequest(Symbol, startTime, DateTime.UtcNow); + var data = new List(); + await foreach(var result in ExchangeHelpers.ExecutePages(_historyRestClient.GetTradeHistoryAsync, request).ConfigureAwait(false)) + { + if (!result) + { + _ = subResult.Data.CloseAsync(); + Status = SyncStatus.Disconnected; + return subResult.AsError(result.Error!); + } + + if (Limit != null && data.Count > Limit) + break; + + data.AddRange(result.Data); + } + + SetInitialData(data); + } + else if (_recentRestClient != null) + { + int? limit = null; + if (Limit.HasValue) + limit = Math.Min(_recentRestClient.GetRecentTradesOptions.MaxLimit, Limit.Value); + + var snapshot = await _recentRestClient.GetRecentTradesAsync(new GetRecentTradesRequest(Symbol, limit)).ConfigureAwait(false); + if (!snapshot) + { + _ = subResult.Data.CloseAsync(); + Status = SyncStatus.Disconnected; + return subResult.AsError(snapshot.Error!); + } + + SetInitialData(snapshot.Data); + } + + return subResult; + } + + /// + /// The stop procedure needed, generally stopping the update stream + /// + /// + protected virtual Task DoStopAsync() => _updateSubscription?.CloseAsync() ?? Task.CompletedTask; + + /// + public IEnumerable GetData(DateTime? since = null, DateTime? until = null) + { + lock (_lock) + { + ApplyWindow(true); + + IEnumerable result = _data; + if (since != null) + result = result.Where(d => d.Timestamp >= since); + if (until != null) + result = result.Where(d => d.Timestamp <= until); + + return result.ToList(); + } + } + + /// + /// Set the initial trade data snapshot + /// + /// + protected void SetInitialData(IEnumerable data) + { + lock (_lock) + { + _data.Clear(); + + IEnumerable items = data.OrderByDescending(d => d.Timestamp); + if (Limit != null) + items = items.Take(Limit.Value); + if (Period != null) + items = items.Where(e => e.Timestamp >= DateTime.UtcNow.Add(-Period.Value)); + + _snapshotId = data.Max(d => d.Timestamp.Ticks); + foreach (var item in items.OrderBy(d => d.Timestamp)) + _data.Add(item); + + _snapshotSet = true; + _changed = true; + + _logger.TradeTrackerInitialDataSet(SymbolName, _data.Count, _snapshotId); + + foreach (var item in _preSnapshotQueue) + { + if (_snapshotId >= item.Timestamp.Ticks) + { + _logger.TradeTrackerPreSnapshotSkip(SymbolName, item.Timestamp.Ticks); + continue; + } + + _logger.TradeTrackerPreSnapshotApplied(SymbolName, item.Timestamp.Ticks); + _data.Add(item); + } + + _firstTimestamp = _data.Min(v => v.Timestamp); + + ApplyWindow(false); + } + } + + /// + /// Add a trade + /// + /// + protected void AddData(SharedTrade item) => AddData(new[] { item }); + + /// + /// Add a list of trades + /// + /// + protected void AddData(IEnumerable items) + { + lock (_lock) + { + if ((_recentRestClient != null || _historyRestClient != null) && _startWithSnapshot && !_snapshotSet) + { + _preSnapshotQueue.AddRange(items); + return; + } + + foreach (var item in items) + { + _logger.TradeTrackerTradeAdded(SymbolName, item.Timestamp.Ticks); + _data.Add(item); + OnAdded?.Invoke(item); + } + + _firstTimestamp = _data.Min(x => x.Timestamp); + _changed = true; + SetSyncStatus(); + ApplyWindow(true); + } + } + + private void ApplyWindow(bool broadcastEvents) + { + if (!_changed && (DateTime.UtcNow - _lastWindowApplied) < TimeSpan.FromSeconds(1)) + return; + + if (Period != null) + { + var compareDate = DateTime.UtcNow.Add(-Period.Value); + for(var i = 0; i < _data.Count; i++) + { + var item = _data[0]; + if (item.Timestamp >= compareDate) + break; + + _data.Remove(item); + if (broadcastEvents) + OnRemoved?.Invoke(item); + } + } + + if (Limit != null && _data.Count > Limit.Value) + { + var toRemove = _data.Count - Limit.Value; + for (var i = 0; i < toRemove; i++) + { + var item = _data[0]; + _data.Remove(item); + if (broadcastEvents) + OnRemoved?.Invoke(item); + } + } + + _lastWindowApplied = DateTime.UtcNow; + _changed = false; + + if (Status == SyncStatus.PartiallySynced) + // Need to check if sync status should be changed even if there may not be any new data + SetSyncStatus(); + } + + + private void HandleConnectionLost() + { + _logger.TradeTrackerConnectionLost(SymbolName); + if (Status != SyncStatus.Disconnected) + { + Status = SyncStatus.Syncing; + _snapshotSet = false; + _firstTimestamp = null; + _preSnapshotQueue.Clear(); + } + } + + private void HandleConnectionClosed() + { + _logger.TradeTrackerConnectionClosed(SymbolName); + Status = SyncStatus.Disconnected; + _ = StopAsync(); + } + + private async void HandleConnectionRestored(TimeSpan _) + { + Status = SyncStatus.Syncing; + var success = false; + while (!success) + { + if (Status != SyncStatus.Syncing) + return; + + var resyncResult = await DoStartAsync().ConfigureAwait(false); + success = resyncResult; + } + + _logger.TradeTrackerConnectionRestored(SymbolName); + SetSyncStatus(); + } + + private void SetSyncStatus() + { + if (Status == SyncStatus.Synced) + return; + + if (Period != null) + { + if (_firstTimestamp <= DateTime.UtcNow - Period.Value) + Status = SyncStatus.Synced; + else + Status = SyncStatus.PartiallySynced; + } + + if (Limit != null) + { + if (_data.Count == Limit.Value) + Status = SyncStatus.Synced; + else + Status = SyncStatus.PartiallySynced; + } + + if (Period == null && Limit == null) + Status = SyncStatus.Synced; + } + } +} diff --git a/CryptoExchange.Net/Trackers/Trades/TradesCompare.cs b/CryptoExchange.Net/Trackers/Trades/TradesCompare.cs new file mode 100644 index 0000000..1d02593 --- /dev/null +++ b/CryptoExchange.Net/Trackers/Trades/TradesCompare.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers.Trades +{ + /// + /// Trades statistics comparison + /// + public record TradesCompare + { + /// + /// Number of trades + /// + public CompareValue TradeCountDif { get; set; } = new CompareValue(null, null); + /// + /// Average trade price + /// + public CompareValue? AveragePriceDif { get; set; } + /// + /// Volume weighted average trade price + /// + public CompareValue? VolumeWeightedAveragePriceDif { get; set; } + /// + /// Volume of the trades + /// + public CompareValue VolumeDif { get; set; } = new CompareValue(null, null); + /// + /// Volume of the trades in quote asset + /// + public CompareValue QuoteVolumeDif { get; set; } = new CompareValue(null, null); + /// + /// The volume weighted Buy/Sell ratio. A 0.7 ratio means 70% of the trade volume was a buy. + /// + public CompareValue? BuySellRatioDif { get; set; } + } +} diff --git a/CryptoExchange.Net/Trackers/Trades/TradesStats.cs b/CryptoExchange.Net/Trackers/Trades/TradesStats.cs new file mode 100644 index 0000000..2a0b241 --- /dev/null +++ b/CryptoExchange.Net/Trackers/Trades/TradesStats.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers.Trades +{ + /// + /// Trades statistics + /// + public record TradesStats + { + /// + /// Number of trades + /// + public int TradeCount { get; set; } + /// + /// Timestamp of the last trade + /// + public DateTime? FirstTradeTime { get; set; } + /// + /// Timestamp of the first trade + /// + public DateTime? LastTradeTime { get; set; } + /// + /// Average trade price + /// + public decimal? AveragePrice { get; set; } + /// + /// Volume weighted average trade price + /// + public decimal? VolumeWeightedAveragePrice { get; set; } + /// + /// Volume of the trades + /// + public decimal Volume { get; set; } + /// + /// Volume of the trades in quote asset + /// + public decimal QuoteVolume { get; set; } + /// + /// The volume weighted Buy/Sell ratio. A 0.7 ratio means 70% of the trade volume was a buy. + /// + public decimal? BuySellRatio { get; set; } + /// + /// Whether the data is complete + /// + public bool Complete { get; set; } + + /// + /// Compare 2 stat snapshots to eachother + /// + public TradesCompare CompareTo(TradesStats otherStats) + { + return new TradesCompare + { + TradeCountDif = new CompareValue(TradeCount, otherStats.TradeCount), + AveragePriceDif = new CompareValue(AveragePrice, otherStats.AveragePrice), + VolumeWeightedAveragePriceDif = new CompareValue(VolumeWeightedAveragePrice, otherStats.VolumeWeightedAveragePrice), + VolumeDif = new CompareValue(Volume, otherStats.Volume), + QuoteVolumeDif = new CompareValue(QuoteVolume, otherStats.QuoteVolume), + BuySellRatioDif = new CompareValue(BuySellRatio, otherStats.BuySellRatio), + }; + } + } +} diff --git a/docs/index.html b/docs/index.html index 73148e9..ecda4d6 100644 --- a/docs/index.html +++ b/docs/index.html @@ -106,6 +106,7 @@