From 0ad94cce5aa5645287167692a6f7f8bc7bee12e3 Mon Sep 17 00:00:00 2001 From: Jkorf Date: Fri, 23 Jan 2026 15:19:51 +0100 Subject: [PATCH] wip --- .../Objects/Sockets/UpdateSubscription.cs | 10 + .../Default/CryptoExchangeWebSocketClient.cs | 10 +- .../Sockets/Default/Interfaces/IWebsocket.cs | 4 + .../Sockets/Default/SocketConnection.cs | 5 + .../Testing/Implementations/TestSocket.cs | 1 + .../Trackers/UserData/IUserDataTracker.cs | 54 ++ .../Trackers/UserData/UpdateSource.cs | 17 + .../Trackers/UserData/UserDataTracker.cs | 643 ++++++++++++++++++ .../UserData/UserDataTrackerConfig.cs | 34 + .../Trackers/UserData/UserDataUpdate.cs | 18 + 10 files changed, 791 insertions(+), 5 deletions(-) create mode 100644 CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs create mode 100644 CryptoExchange.Net/Trackers/UserData/UpdateSource.cs create mode 100644 CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs create mode 100644 CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs create mode 100644 CryptoExchange.Net/Trackers/UserData/UserDataUpdate.cs diff --git a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs index f128c79..cce55b6 100644 --- a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs @@ -109,6 +109,16 @@ namespace CryptoExchange.Net.Objects.Sockets /// public int Id => _subscription.Id; + /// + /// The last timestamp anything was received from the server + /// + public DateTime? LastReceiveTime => _connection.LastReceiveTime; + + /// + /// The current websocket status + /// + public SocketStatus Status => _connection.Status; + /// /// ctor /// diff --git a/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs index 11a2f7b..b8713c7 100644 --- a/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs @@ -49,7 +49,6 @@ namespace CryptoExchange.Net.Sockets.Default private int _reconnectAttempt; private readonly int _receiveBufferSize; - private const int _defaultReceiveBufferSize = 1048576; private const int _sendBufferSize = 4096; private int _bytesReceived = 0; @@ -71,7 +70,7 @@ namespace CryptoExchange.Net.Sockets.Default /// /// The timestamp this socket has been active for the last time /// - public DateTime LastActionTime { get; private set; } + public DateTime? LastReceiveTime { get; private set; } /// public Uri Uri => Parameters.Uri; @@ -623,6 +622,7 @@ namespace CryptoExchange.Net.Sockets.Default break; } + LastReceiveTime = DateTime.UtcNow; if (receiveResult.MessageType == WebSocketMessageType.Close) { // Connection closed @@ -773,6 +773,7 @@ namespace CryptoExchange.Net.Sockets.Default break; } + LastReceiveTime = DateTime.UtcNow; if (receiveResult.MessageType == WebSocketMessageType.Close) { // Connection closed @@ -881,7 +882,6 @@ namespace CryptoExchange.Net.Sockets.Default /// protected void ProcessDataNew(WebSocketMessageType type, ReadOnlySpan data) { - LastActionTime = DateTime.UtcNow; _connection.HandleStreamMessage2(type, data); } @@ -892,7 +892,7 @@ namespace CryptoExchange.Net.Sockets.Default protected async Task CheckTimeoutAsync() { _logger.SocketStartingTaskForNoDataReceivedCheck(Id, Parameters.Timeout); - LastActionTime = DateTime.UtcNow; + LastReceiveTime = DateTime.UtcNow; try { while (true) @@ -900,7 +900,7 @@ namespace CryptoExchange.Net.Sockets.Default if (_ctsSource.IsCancellationRequested) return; - if (DateTime.UtcNow - LastActionTime > Parameters.Timeout) + if (DateTime.UtcNow - LastReceiveTime > Parameters.Timeout) { _logger.SocketNoDataReceiveTimoutReconnect(Id, Parameters.Timeout); _ = ReconnectAsync().ConfigureAwait(false); diff --git a/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs index 22fe927..a2518bb 100644 --- a/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs @@ -69,6 +69,10 @@ namespace CryptoExchange.Net.Sockets.Default.Interfaces /// bool IsOpen { get; } /// + /// Last timestamp something was received from the server + /// + DateTime? LastReceiveTime { get; } + /// /// Connect the socket /// /// diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index 3023b37..5255762 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -178,6 +178,11 @@ namespace CryptoExchange.Net.Sockets.Default /// public DateTime? DisconnectTime { get; set; } + /// + /// Last timestamp something was received from the server + /// + public DateTime? LastReceiveTime => _socket.LastReceiveTime; + /// /// Tag for identification /// diff --git a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs index 584e275..ed7c4a2 100644 --- a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs +++ b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs @@ -39,6 +39,7 @@ namespace CryptoExchange.Net.Testing.Implementations public Func>? GetReconnectionUrl { get; set; } public static int lastId = 0; + public DateTime? LastReceiveTime { get; } #if NET9_0_OR_GREATER public static readonly Lock lastIdLock = new Lock(); #else diff --git a/CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs new file mode 100644 index 0000000..3d6c139 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs @@ -0,0 +1,54 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.SharedApis; +using System; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData +{ + /// + /// User data tracker + /// + public interface IUserDataTracker + { + /// + /// User identifier + /// + public string? UserIdentifier { get; } + + /// + /// Current balances + /// + SharedBalance[] Balances { get; } + /// + /// Currently tracked orders + /// + SharedSpotOrder[] Orders { get; } + /// + /// Currently tracked trades + /// + SharedUserTrade[] Trades { get; } + + /// + /// On balance update + /// + event Action>? OnBalanceUpdate; + /// + /// On order update + /// + event Action>? OnOrderUpdate; + /// + /// On user trade update + /// + event Action>? OnTradeUpdate; + + /// + /// Start tracking user data + /// + Task StartAsync(); + /// + /// Stop tracking data + /// + /// + Task StopAsync(); + } +} \ No newline at end of file diff --git a/CryptoExchange.Net/Trackers/UserData/UpdateSource.cs b/CryptoExchange.Net/Trackers/UserData/UpdateSource.cs new file mode 100644 index 0000000..05c8457 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/UpdateSource.cs @@ -0,0 +1,17 @@ +namespace CryptoExchange.Net.Trackers.UserData +{ + /// + /// Update source + /// + public enum UpdateSource + { + /// + /// Polling result + /// + Poll, + /// + /// Websocket push + /// + Push + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs new file mode 100644 index 0000000..e17930b --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs @@ -0,0 +1,643 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData +{ + /// + /// User data tracker + /// + public abstract class UserDataTracker : IUserDataTracker + { + // Cached data + private ConcurrentDictionary _balanceStore = new ConcurrentDictionary(); + private ConcurrentDictionary _orderStore = new ConcurrentDictionary(); + private ConcurrentDictionary _tradeStore = new ConcurrentDictionary(); + + // Typed clients + private readonly IListenKeyRestClient? _listenKeyRestClient; + private readonly ISpotSymbolRestClient _spotSymbolRestClient; + private readonly IBalanceRestClient _balanceRestClient; + private readonly IBalanceSocketClient _balanceSocketClient; + private readonly ISpotOrderRestClient _spotOrderRestClient; + private readonly ISpotOrderSocketClient _spotOrderSocketClient; + private readonly IUserTradeSocketClient? _userTradeSocketClient; + private readonly ILogger _logger; + + // State management + private DateTime? _startTime = null; + private DateTime? _lastPollAttempt = null; + private bool _lastPollSuccessful = false; + private DateTime? _lastPollTimeOrders = null; + private DateTime? _lastPollTimeTrades = null; + private DateTime? _lastDataTimeOrdersBeforeDisconnect = null; + private DateTime? _lastDataTimeTradesBeforeDisconnect = null; + private bool _firstPollDone = false; + + // Config + private List _symbols = new List(); + private TimeSpan _pollIntervalConnected; + private TimeSpan _pollIntervalDisconnected; + private bool _pollAtStart; + private bool _onlyTrackProvidedSymbols; + + // Subscriptions + private UpdateSubscription? _balanceSubscription; + private UpdateSubscription? _orderSubscription; + private UpdateSubscription? _tradeSubscription; + + + private AsyncResetEvent _pollWaitEvent = new AsyncResetEvent(false, true); + private Task? _pollTask; + private CancellationTokenSource? _cts; + private object _symbolLock = new object(); + + private bool Connected => + _balanceSubscription?.Status == Sockets.Default.SocketStatus.Connected + && _orderSubscription?.Status == Sockets.Default.SocketStatus.Connected + && _tradeSubscription == null || _tradeSubscription?.Status == Sockets.Default.SocketStatus.Connected; + + /// + public event Action>? OnBalanceUpdate; + /// + public event Action>? OnOrderUpdate; + /// + public event Action>? OnTradeUpdate; + + /// + public string? UserIdentifier { get; } + /// + public SharedBalance[] Balances => _balanceStore.Values.ToArray(); + /// + public SharedSpotOrder[] Orders => _orderStore.Values.ToArray(); + /// + public SharedUserTrade[] Trades => _tradeStore.Values.ToArray(); + + /// + /// ctor + /// + protected UserDataTracker( + ILogger logger, + ISharedClient restClient, + ISharedClient socketClient, + string? userIdentifier, + UserDataTrackerConfig config + ) + { + _logger = logger; + _spotSymbolRestClient = (ISpotSymbolRestClient)restClient; + _balanceRestClient = (IBalanceRestClient)restClient; + _balanceSocketClient = (IBalanceSocketClient)socketClient; + _spotOrderRestClient = (ISpotOrderRestClient)restClient; + _spotOrderSocketClient = (ISpotOrderSocketClient)socketClient; + _listenKeyRestClient = restClient as IListenKeyRestClient; + _userTradeSocketClient = socketClient as IUserTradeSocketClient; + + _pollIntervalConnected = config.PollIntervalConnected; + _pollIntervalDisconnected = config.PollIntervalDisconnected; + _symbols = config.Symbols?.ToList() ?? []; + _onlyTrackProvidedSymbols = config.OnlyTrackProvidedSymbols; + _pollAtStart = config.PollAtStart; + + UserIdentifier = userIdentifier; + } + + /// + public async Task StartAsync() + { + _startTime = DateTime.UtcNow; + + _logger.LogDebug("Starting UserDataTracker"); + _cts = new CancellationTokenSource(); + + // Request symbols so SharedSymbol property can be filled on updates + var symbolResult = await _spotSymbolRestClient.GetSpotSymbolsAsync(new GetSymbolsRequest()).ConfigureAwait(false); + if (!symbolResult) + { + _logger.LogWarning("Failed to start UserDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message); + return symbolResult; + } + + string? listenKey = null; + if (_listenKeyRestClient != null) + { + var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest()).ConfigureAwait(false); + if (!lkResult) + { + _logger.LogWarning("Failed to start UserDataTracker; listen key request failed: {Error}", lkResult.Error!.Message); + return lkResult; + } + + listenKey = lkResult.Data; + } + + var subBalanceResult = await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey), x => HandleBalanceUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false); + if (!subBalanceResult) + { + _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message); + return subBalanceResult; + } + + _balanceSubscription = subBalanceResult.Data; + subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged; + + var subOrderResult = await _spotOrderSocketClient.SubscribeToSpotOrderUpdatesAsync(new SubscribeSpotOrderRequest(listenKey), x => HandleOrderUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false); + if (!subOrderResult) + { + _cts.Cancel(); + _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message); + return subOrderResult; + } + + _orderSubscription = subOrderResult.Data; + subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged; + + if (_userTradeSocketClient != null) + { + var subTradeResult = await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey), x => HandleTradeUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false); + if (!subOrderResult) + { + _cts.Cancel(); + _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message); + return subOrderResult; + } + + _tradeSubscription = subTradeResult.Data; + subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged; + } + + _pollTask = PollAsync(); + _logger.LogDebug("Started UserDataTracker"); + return CallResult.SuccessResult; + } + + private void UpdateSymbolsList(IEnumerable symbols) + { + lock (_symbolLock) + { + foreach (var symbol in symbols.Distinct()) + { + if (!_symbols.Any(x => x.TradingMode == symbol.TradingMode && x.BaseAsset == symbol.BaseAsset && x.QuoteAsset == symbol.QuoteAsset)) + { + _symbols.Add(symbol); + _logger.LogDebug("Adding {BaseAsset}/{QuoteAsset} to symbol tracking list", symbol.BaseAsset, symbol.QuoteAsset); + } + } + } + } + + private void HandleTradeUpdate(UpdateSource source, SharedUserTrade[] @event) + { + var unknownSymbols = @event.Where(x => x.SharedSymbol == null); + if (unknownSymbols.Any()) + { + _logger.LogWarning("Received order without SharedSymbol set, ignoring"); + @event = @event.Except(unknownSymbols).ToArray(); + } + + if (!_onlyTrackProvidedSymbols) + UpdateSymbolsList(@event.Select(x => x.SharedSymbol!)); + else + @event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray(); + + // Update local store + var updatedIds = @event.Select(x => x.Id).ToList(); + foreach (var item in @event) + { + if (_tradeStore.TryAdd(item.Id, item)) + _logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id); + else + updatedIds.Remove(item.Id); + } + + if (updatedIds.Count > 0) + { + OnTradeUpdate?.Invoke( + new UserDataUpdate + { + Source = source, + Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray() + }); + } + } + + private void HandleOrderUpdate(UpdateSource source, SharedSpotOrder[] @event) + { + var unknownSymbols = @event.Where(x => x.SharedSymbol == null); + if (unknownSymbols.Any()) + { + _logger.LogWarning("Received order without SharedSymbol set, ignoring"); + @event = @event.Except(unknownSymbols).ToArray(); + } + + if (!_onlyTrackProvidedSymbols) + UpdateSymbolsList(@event.Select(x => x.SharedSymbol!)); + else + @event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray(); + + // Update local store + var updatedIds = @event.Select(x => x.OrderId).ToList(); + + foreach (var item in @event) + { + bool orderExisted = false; + _orderStore.AddOrUpdate(item.OrderId, item, (id, existing) => + { + orderExisted = true; + var updated = UpdateSpotOrder(existing, item); + if (!updated) + updatedIds.Remove(id); + else + _logger.LogDebug("Updated spot order {Symbol}.{Id}", item.Symbol, item.OrderId); + + return existing; + }); + + if (!orderExisted) + _logger.LogDebug("Added spot order {Symbol}.{Id}", item.Symbol, item.OrderId); + } + + if (updatedIds.Count > 0) + { + OnOrderUpdate?.Invoke( + new UserDataUpdate + { + Source = source, + Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray() + }); + } + + var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray(); + if (trades.Length != 0) + HandleTradeUpdate(source, trades); + } + + private void HandleBalanceUpdate(UpdateSource source, SharedBalance[] @event) + { + // Update local store + var updatedAssets = @event.Select(x => x.Asset).ToList(); + + foreach (var item in @event) + { + bool balanceExisted = false; + _balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) => + { + balanceExisted = true; + var updated = UpdateBalance(existing, item); + if (!updated) + updatedAssets.Remove(asset); + else + _logger.LogDebug("Updated balance for {Asset}", item.Asset); + + return existing; + }); + + if (!balanceExisted) + _logger.LogDebug("Added balance for {Asset}", item.Asset); + } + + if (updatedAssets.Count > 0) + { + OnBalanceUpdate?.Invoke( + new UserDataUpdate + { + Source = source, + Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray() + }); + } + } + + private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState) + { + _logger.LogDebug("Balance stream status changed: {NewState}", newState); + if (newState == SubscriptionStatus.Subscribed) + // Trigger REST polling since we weren't connected + _pollWaitEvent.Set(); + } + + private void OrderSubscriptionStatusChanged(SubscriptionStatus newState) + { + _logger.LogDebug("Order stream status changed: {NewState}", newState); + + if (newState == SubscriptionStatus.Pending) + { + // Record last data receive time since we need to request data from that timestamp on when polling + // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without + // managing to do a poll we don't want to override the time since we still need to request that earlier data + + if (_lastDataTimeOrdersBeforeDisconnect == null) + _lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime; + } + + if (newState == SubscriptionStatus.Subscribed) + // Trigger REST polling since we weren't connected + _pollWaitEvent.Set(); + } + + private void TradeSubscriptionStatusChanged(SubscriptionStatus newState) + { + _logger.LogDebug("Trade stream status changed: {NewState}", newState); + + if (newState == SubscriptionStatus.Pending) + { + // Record last data receive time since we need to request data from that timestamp on when polling + // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without + // managing to do a poll we don't want to override the time since we still need to request that earlier data + + if (_lastDataTimeTradesBeforeDisconnect == null) + _lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime; + } + + if (newState == SubscriptionStatus.Subscribed) + // Trigger REST polling since we weren't connected + _pollWaitEvent.Set(); + } + + private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance) + { + // Some other way to way to determine sequence? Maybe timestamp? + var changed = false; + if (existingBalance.Total != newBalance.Total) + { + existingBalance.Total = newBalance.Total; + changed = true; + } + + if (existingBalance.Available != newBalance.Available) + { + existingBalance.Available = newBalance.Available; + changed = true; + } + + return changed; + } + + private bool UpdateSpotOrder(SharedSpotOrder existingOrder, SharedSpotOrder newOrder) + { + if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false) + // Update is older than the existing data, ignore + return false; + + var changed = false; + if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice) + { + existingOrder.AveragePrice = newOrder.AveragePrice; + changed = true; + } + + if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice) + { + existingOrder.OrderPrice = newOrder.OrderPrice; + changed = true; + } + + if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee) + { + existingOrder.Fee = newOrder.Fee; + changed = true; + } + + if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset) + { + existingOrder.FeeAsset = newOrder.FeeAsset; + changed = true; + } + + if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity) + { + existingOrder.OrderQuantity = newOrder.OrderQuantity; + changed = true; + } + + if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled) + { + existingOrder.QuantityFilled = newOrder.QuantityFilled; + changed = true; + } + + if (newOrder.Status != existingOrder.Status) + { + existingOrder.Status = newOrder.Status; + changed = true; + } + + if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime) + { + existingOrder.UpdateTime = newOrder.UpdateTime; + changed = true; + } + + return changed; + } + + private bool? CheckIfOrderUpdateIsNewer(SharedSpotOrder existingOrder, SharedSpotOrder newOrder) + { + if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open) + // status changed from open to not open + return true; + + if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open) + // status changed from not open to open; stale + return false; + + if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null) + { + // If both have an update time base of that + if (existingOrder.UpdateTime < newOrder.UpdateTime) + return true; + + if (existingOrder.UpdateTime > newOrder.UpdateTime) + return false; + } + + if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null) + { + if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null) + { + // If base quantity is not null we can base it on that + if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset) + return true; + + else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset) + return false; + } + + if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null) + { + // If quote quantity is not null we can base it on that + if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset) + return true; + + else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset) + return false; + } + } + + if (existingOrder.Fee != null && newOrder.Fee != null) + { + // Higher fee means later processing + if (existingOrder.Fee < newOrder.Fee) + return true; + + if (existingOrder.Fee > newOrder.Fee) + return false; + } + + return null; + } + + private TimeSpan? GetNextPollDelay() + { + if (!_firstPollDone && _pollAtStart) + // First polling should be done immediately + return TimeSpan.Zero; + + if (!Connected) + { + if (_pollIntervalDisconnected == TimeSpan.Zero) + // No polling interval + return null; + + return _pollIntervalDisconnected; + } + + if (_pollIntervalConnected == TimeSpan.Zero) + // No polling interval + return null; + + // Wait for next poll + return _pollIntervalConnected; + } + + private async Task PollAsync() + { + while (!_cts!.IsCancellationRequested) + { + var delayForNextPoll = GetNextPollDelay(); + if (delayForNextPoll != TimeSpan.Zero) + { + try + { + if (delayForNextPoll != null) + _logger.LogTrace("Delay for next polling: {Delay}", delayForNextPoll); + + await _pollWaitEvent.WaitAsync(delayForNextPoll, _cts.Token).ConfigureAwait(false); + } + catch { } + } + + _firstPollDone = true; + if (_cts.IsCancellationRequested) + break; + + if (_lastPollAttempt != null && (DateTime.UtcNow - _lastPollAttempt.Value) < TimeSpan.FromSeconds(2)) + { + if (_lastPollSuccessful) + // If last poll was less than 2 seconds ago and it was successful don't bother immediately polling again + continue; + } + + _logger.LogDebug("Starting user data requesting"); + _lastPollAttempt = DateTime.UtcNow; + _lastPollSuccessful = false; + + var anyError = false; + var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest()).ConfigureAwait(false); + if (!balancesResult.Success) + { + // .. ? + var transientError = balancesResult.Error!.IsTransient; + // If transient we can retry + // Should communicate errors, also for websocket disconnecting + + anyError = true; + } + else + { + HandleBalanceUpdate(UpdateSource.Poll, balancesResult.Data); + } + + var openOrdersResult = await _spotOrderRestClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest()).ConfigureAwait(false); + if (!openOrdersResult.Success) + { + // .. ? + + anyError = true; + } + else + { + HandleOrderUpdate(UpdateSource.Poll, openOrdersResult.Data); + } + + foreach (var symbol in _symbols) + { + var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime; + var updatedPollTime = DateTime.UtcNow; + var closedOrdersResult = await _spotOrderRestClient.GetClosedSpotOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders)).ConfigureAwait(false); + if (!closedOrdersResult.Success) + { + // .. ? + + anyError = true; + } + else + { + _lastDataTimeOrdersBeforeDisconnect = null; + _lastPollTimeOrders = updatedPollTime; + + // Filter orders to only include where close time is after the start time + var relevantOrders = closedOrdersResult.Data.Where(x => + x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time + || x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time + || x.CreateTime == null && x.UpdateTime == null // Unknown time + ).ToArray(); + + if (relevantOrders.Length > 0) + HandleOrderUpdate(UpdateSource.Poll, relevantOrders); + } + + var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime; + var tradesResult = await _spotOrderRestClient.GetSpotUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades)).ConfigureAwait(false); + if (!tradesResult.Success) + { + // .. ? + anyError = true; + } + else + { + _lastDataTimeTradesBeforeDisconnect = null; + _lastPollTimeTrades = updatedPollTime; + + // Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking + var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray(); + if (relevantTrades.Length > 0) + HandleTradeUpdate(UpdateSource.Poll, tradesResult.Data); + } + } + + _lastPollSuccessful = !anyError; + _logger.LogDebug("User data requesting completed"); + } + } + + /// + public async Task StopAsync() + { + _logger.LogDebug("Stopping UserDataTracker"); + _cts?.Cancel(); + + if (_pollTask != null) + await _pollTask.ConfigureAwait(false); + + _logger.LogDebug("Stopped UserDataTracker"); + } + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs b/CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs new file mode 100644 index 0000000..302f378 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs @@ -0,0 +1,34 @@ +using CryptoExchange.Net.SharedApis; +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers.UserData +{ + /// + /// User data tracker configuration + /// + public record UserDataTrackerConfig + { + /// + /// Symbols to initially track, used when polling data. Other symbols will get tracked when updates are received for orders or trades on a new symbol and when there are open orders on a new symbol. To only track the symbols specified here set `OnlyTrackProvidedSymbols` to true. + /// + public IEnumerable Symbols { get; set; } = []; + /// + /// If true only orders and trades in the `Symbols` options will get tracked, data on other symbols will be ignored. + /// + public bool OnlyTrackProvidedSymbols { get; set; } = false; + /// + /// Interval to poll data at as backup, even when the websocket stream is still connected. + /// + public TimeSpan PollIntervalConnected { get; set; } = TimeSpan.Zero; + /// + /// Interval to poll data at while the websocket is disconnected. + /// + public TimeSpan PollIntervalDisconnected { get; set; } = TimeSpan.Zero; + /// + /// Whether to poll for data initially when starting the tracker. + /// + public bool PollAtStart { get; set; } = true; + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/UserDataUpdate.cs b/CryptoExchange.Net/Trackers/UserData/UserDataUpdate.cs new file mode 100644 index 0000000..bde0c32 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/UserDataUpdate.cs @@ -0,0 +1,18 @@ +namespace CryptoExchange.Net.Trackers.UserData +{ + /// + /// User data update + /// + /// Data type + public class UserDataUpdate + { + /// + /// Source + /// + public UpdateSource Source { get; set; } + /// + /// Data + /// + public T Data { get; set; } = default!; + } +}