diff --git a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs
index f128c79..cce55b6 100644
--- a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs
+++ b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs
@@ -109,6 +109,16 @@ namespace CryptoExchange.Net.Objects.Sockets
///
public int Id => _subscription.Id;
+ ///
+ /// The last timestamp anything was received from the server
+ ///
+ public DateTime? LastReceiveTime => _connection.LastReceiveTime;
+
+ ///
+ /// The current websocket status
+ ///
+ public SocketStatus Status => _connection.Status;
+
///
/// ctor
///
diff --git a/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs
index 11a2f7b..b8713c7 100644
--- a/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs
+++ b/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs
@@ -49,7 +49,6 @@ namespace CryptoExchange.Net.Sockets.Default
private int _reconnectAttempt;
private readonly int _receiveBufferSize;
- private const int _defaultReceiveBufferSize = 1048576;
private const int _sendBufferSize = 4096;
private int _bytesReceived = 0;
@@ -71,7 +70,7 @@ namespace CryptoExchange.Net.Sockets.Default
///
/// The timestamp this socket has been active for the last time
///
- public DateTime LastActionTime { get; private set; }
+ public DateTime? LastReceiveTime { get; private set; }
///
public Uri Uri => Parameters.Uri;
@@ -623,6 +622,7 @@ namespace CryptoExchange.Net.Sockets.Default
break;
}
+ LastReceiveTime = DateTime.UtcNow;
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed
@@ -773,6 +773,7 @@ namespace CryptoExchange.Net.Sockets.Default
break;
}
+ LastReceiveTime = DateTime.UtcNow;
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed
@@ -881,7 +882,6 @@ namespace CryptoExchange.Net.Sockets.Default
///
protected void ProcessDataNew(WebSocketMessageType type, ReadOnlySpan data)
{
- LastActionTime = DateTime.UtcNow;
_connection.HandleStreamMessage2(type, data);
}
@@ -892,7 +892,7 @@ namespace CryptoExchange.Net.Sockets.Default
protected async Task CheckTimeoutAsync()
{
_logger.SocketStartingTaskForNoDataReceivedCheck(Id, Parameters.Timeout);
- LastActionTime = DateTime.UtcNow;
+ LastReceiveTime = DateTime.UtcNow;
try
{
while (true)
@@ -900,7 +900,7 @@ namespace CryptoExchange.Net.Sockets.Default
if (_ctsSource.IsCancellationRequested)
return;
- if (DateTime.UtcNow - LastActionTime > Parameters.Timeout)
+ if (DateTime.UtcNow - LastReceiveTime > Parameters.Timeout)
{
_logger.SocketNoDataReceiveTimoutReconnect(Id, Parameters.Timeout);
_ = ReconnectAsync().ConfigureAwait(false);
diff --git a/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs
index 22fe927..a2518bb 100644
--- a/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs
+++ b/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs
@@ -69,6 +69,10 @@ namespace CryptoExchange.Net.Sockets.Default.Interfaces
///
bool IsOpen { get; }
///
+ /// Last timestamp something was received from the server
+ ///
+ DateTime? LastReceiveTime { get; }
+ ///
/// Connect the socket
///
///
diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs
index 3023b37..5255762 100644
--- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs
+++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs
@@ -178,6 +178,11 @@ namespace CryptoExchange.Net.Sockets.Default
///
public DateTime? DisconnectTime { get; set; }
+ ///
+ /// Last timestamp something was received from the server
+ ///
+ public DateTime? LastReceiveTime => _socket.LastReceiveTime;
+
///
/// Tag for identification
///
diff --git a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs
index 584e275..ed7c4a2 100644
--- a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs
+++ b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs
@@ -39,6 +39,7 @@ namespace CryptoExchange.Net.Testing.Implementations
public Func>? GetReconnectionUrl { get; set; }
public static int lastId = 0;
+ public DateTime? LastReceiveTime { get; }
#if NET9_0_OR_GREATER
public static readonly Lock lastIdLock = new Lock();
#else
diff --git a/CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs
new file mode 100644
index 0000000..3d6c139
--- /dev/null
+++ b/CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs
@@ -0,0 +1,54 @@
+using CryptoExchange.Net.Objects;
+using CryptoExchange.Net.SharedApis;
+using System;
+using System.Threading.Tasks;
+
+namespace CryptoExchange.Net.Trackers.UserData
+{
+ ///
+ /// User data tracker
+ ///
+ public interface IUserDataTracker
+ {
+ ///
+ /// User identifier
+ ///
+ public string? UserIdentifier { get; }
+
+ ///
+ /// Current balances
+ ///
+ SharedBalance[] Balances { get; }
+ ///
+ /// Currently tracked orders
+ ///
+ SharedSpotOrder[] Orders { get; }
+ ///
+ /// Currently tracked trades
+ ///
+ SharedUserTrade[] Trades { get; }
+
+ ///
+ /// On balance update
+ ///
+ event Action>? OnBalanceUpdate;
+ ///
+ /// On order update
+ ///
+ event Action>? OnOrderUpdate;
+ ///
+ /// On user trade update
+ ///
+ event Action>? OnTradeUpdate;
+
+ ///
+ /// Start tracking user data
+ ///
+ Task StartAsync();
+ ///
+ /// Stop tracking data
+ ///
+ ///
+ Task StopAsync();
+ }
+}
\ No newline at end of file
diff --git a/CryptoExchange.Net/Trackers/UserData/UpdateSource.cs b/CryptoExchange.Net/Trackers/UserData/UpdateSource.cs
new file mode 100644
index 0000000..05c8457
--- /dev/null
+++ b/CryptoExchange.Net/Trackers/UserData/UpdateSource.cs
@@ -0,0 +1,17 @@
+namespace CryptoExchange.Net.Trackers.UserData
+{
+ ///
+ /// Update source
+ ///
+ public enum UpdateSource
+ {
+ ///
+ /// Polling result
+ ///
+ Poll,
+ ///
+ /// Websocket push
+ ///
+ Push
+ }
+}
diff --git a/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs
new file mode 100644
index 0000000..e17930b
--- /dev/null
+++ b/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs
@@ -0,0 +1,643 @@
+using CryptoExchange.Net.Objects;
+using CryptoExchange.Net.Objects.Sockets;
+using CryptoExchange.Net.SharedApis;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace CryptoExchange.Net.Trackers.UserData
+{
+ ///
+ /// User data tracker
+ ///
+ public abstract class UserDataTracker : IUserDataTracker
+ {
+ // Cached data
+ private ConcurrentDictionary _balanceStore = new ConcurrentDictionary();
+ private ConcurrentDictionary _orderStore = new ConcurrentDictionary();
+ private ConcurrentDictionary _tradeStore = new ConcurrentDictionary();
+
+ // Typed clients
+ private readonly IListenKeyRestClient? _listenKeyRestClient;
+ private readonly ISpotSymbolRestClient _spotSymbolRestClient;
+ private readonly IBalanceRestClient _balanceRestClient;
+ private readonly IBalanceSocketClient _balanceSocketClient;
+ private readonly ISpotOrderRestClient _spotOrderRestClient;
+ private readonly ISpotOrderSocketClient _spotOrderSocketClient;
+ private readonly IUserTradeSocketClient? _userTradeSocketClient;
+ private readonly ILogger _logger;
+
+ // State management
+ private DateTime? _startTime = null;
+ private DateTime? _lastPollAttempt = null;
+ private bool _lastPollSuccessful = false;
+ private DateTime? _lastPollTimeOrders = null;
+ private DateTime? _lastPollTimeTrades = null;
+ private DateTime? _lastDataTimeOrdersBeforeDisconnect = null;
+ private DateTime? _lastDataTimeTradesBeforeDisconnect = null;
+ private bool _firstPollDone = false;
+
+ // Config
+ private List _symbols = new List();
+ private TimeSpan _pollIntervalConnected;
+ private TimeSpan _pollIntervalDisconnected;
+ private bool _pollAtStart;
+ private bool _onlyTrackProvidedSymbols;
+
+ // Subscriptions
+ private UpdateSubscription? _balanceSubscription;
+ private UpdateSubscription? _orderSubscription;
+ private UpdateSubscription? _tradeSubscription;
+
+
+ private AsyncResetEvent _pollWaitEvent = new AsyncResetEvent(false, true);
+ private Task? _pollTask;
+ private CancellationTokenSource? _cts;
+ private object _symbolLock = new object();
+
+ private bool Connected =>
+ _balanceSubscription?.Status == Sockets.Default.SocketStatus.Connected
+ && _orderSubscription?.Status == Sockets.Default.SocketStatus.Connected
+ && _tradeSubscription == null || _tradeSubscription?.Status == Sockets.Default.SocketStatus.Connected;
+
+ ///
+ public event Action>? OnBalanceUpdate;
+ ///
+ public event Action>? OnOrderUpdate;
+ ///
+ public event Action>? OnTradeUpdate;
+
+ ///
+ public string? UserIdentifier { get; }
+ ///
+ public SharedBalance[] Balances => _balanceStore.Values.ToArray();
+ ///
+ public SharedSpotOrder[] Orders => _orderStore.Values.ToArray();
+ ///
+ public SharedUserTrade[] Trades => _tradeStore.Values.ToArray();
+
+ ///
+ /// ctor
+ ///
+ protected UserDataTracker(
+ ILogger logger,
+ ISharedClient restClient,
+ ISharedClient socketClient,
+ string? userIdentifier,
+ UserDataTrackerConfig config
+ )
+ {
+ _logger = logger;
+ _spotSymbolRestClient = (ISpotSymbolRestClient)restClient;
+ _balanceRestClient = (IBalanceRestClient)restClient;
+ _balanceSocketClient = (IBalanceSocketClient)socketClient;
+ _spotOrderRestClient = (ISpotOrderRestClient)restClient;
+ _spotOrderSocketClient = (ISpotOrderSocketClient)socketClient;
+ _listenKeyRestClient = restClient as IListenKeyRestClient;
+ _userTradeSocketClient = socketClient as IUserTradeSocketClient;
+
+ _pollIntervalConnected = config.PollIntervalConnected;
+ _pollIntervalDisconnected = config.PollIntervalDisconnected;
+ _symbols = config.Symbols?.ToList() ?? [];
+ _onlyTrackProvidedSymbols = config.OnlyTrackProvidedSymbols;
+ _pollAtStart = config.PollAtStart;
+
+ UserIdentifier = userIdentifier;
+ }
+
+ ///
+ public async Task StartAsync()
+ {
+ _startTime = DateTime.UtcNow;
+
+ _logger.LogDebug("Starting UserDataTracker");
+ _cts = new CancellationTokenSource();
+
+ // Request symbols so SharedSymbol property can be filled on updates
+ var symbolResult = await _spotSymbolRestClient.GetSpotSymbolsAsync(new GetSymbolsRequest()).ConfigureAwait(false);
+ if (!symbolResult)
+ {
+ _logger.LogWarning("Failed to start UserDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
+ return symbolResult;
+ }
+
+ string? listenKey = null;
+ if (_listenKeyRestClient != null)
+ {
+ var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest()).ConfigureAwait(false);
+ if (!lkResult)
+ {
+ _logger.LogWarning("Failed to start UserDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
+ return lkResult;
+ }
+
+ listenKey = lkResult.Data;
+ }
+
+ var subBalanceResult = await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey), x => HandleBalanceUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
+ if (!subBalanceResult)
+ {
+ _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message);
+ return subBalanceResult;
+ }
+
+ _balanceSubscription = subBalanceResult.Data;
+ subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged;
+
+ var subOrderResult = await _spotOrderSocketClient.SubscribeToSpotOrderUpdatesAsync(new SubscribeSpotOrderRequest(listenKey), x => HandleOrderUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
+ if (!subOrderResult)
+ {
+ _cts.Cancel();
+ _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message);
+ return subOrderResult;
+ }
+
+ _orderSubscription = subOrderResult.Data;
+ subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged;
+
+ if (_userTradeSocketClient != null)
+ {
+ var subTradeResult = await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey), x => HandleTradeUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
+ if (!subOrderResult)
+ {
+ _cts.Cancel();
+ _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message);
+ return subOrderResult;
+ }
+
+ _tradeSubscription = subTradeResult.Data;
+ subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged;
+ }
+
+ _pollTask = PollAsync();
+ _logger.LogDebug("Started UserDataTracker");
+ return CallResult.SuccessResult;
+ }
+
+ private void UpdateSymbolsList(IEnumerable symbols)
+ {
+ lock (_symbolLock)
+ {
+ foreach (var symbol in symbols.Distinct())
+ {
+ if (!_symbols.Any(x => x.TradingMode == symbol.TradingMode && x.BaseAsset == symbol.BaseAsset && x.QuoteAsset == symbol.QuoteAsset))
+ {
+ _symbols.Add(symbol);
+ _logger.LogDebug("Adding {BaseAsset}/{QuoteAsset} to symbol tracking list", symbol.BaseAsset, symbol.QuoteAsset);
+ }
+ }
+ }
+ }
+
+ private void HandleTradeUpdate(UpdateSource source, SharedUserTrade[] @event)
+ {
+ var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
+ if (unknownSymbols.Any())
+ {
+ _logger.LogWarning("Received order without SharedSymbol set, ignoring");
+ @event = @event.Except(unknownSymbols).ToArray();
+ }
+
+ if (!_onlyTrackProvidedSymbols)
+ UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
+ else
+ @event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
+
+ // Update local store
+ var updatedIds = @event.Select(x => x.Id).ToList();
+ foreach (var item in @event)
+ {
+ if (_tradeStore.TryAdd(item.Id, item))
+ _logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id);
+ else
+ updatedIds.Remove(item.Id);
+ }
+
+ if (updatedIds.Count > 0)
+ {
+ OnTradeUpdate?.Invoke(
+ new UserDataUpdate
+ {
+ Source = source,
+ Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
+ });
+ }
+ }
+
+ private void HandleOrderUpdate(UpdateSource source, SharedSpotOrder[] @event)
+ {
+ var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
+ if (unknownSymbols.Any())
+ {
+ _logger.LogWarning("Received order without SharedSymbol set, ignoring");
+ @event = @event.Except(unknownSymbols).ToArray();
+ }
+
+ if (!_onlyTrackProvidedSymbols)
+ UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
+ else
+ @event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
+
+ // Update local store
+ var updatedIds = @event.Select(x => x.OrderId).ToList();
+
+ foreach (var item in @event)
+ {
+ bool orderExisted = false;
+ _orderStore.AddOrUpdate(item.OrderId, item, (id, existing) =>
+ {
+ orderExisted = true;
+ var updated = UpdateSpotOrder(existing, item);
+ if (!updated)
+ updatedIds.Remove(id);
+ else
+ _logger.LogDebug("Updated spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
+
+ return existing;
+ });
+
+ if (!orderExisted)
+ _logger.LogDebug("Added spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
+ }
+
+ if (updatedIds.Count > 0)
+ {
+ OnOrderUpdate?.Invoke(
+ new UserDataUpdate
+ {
+ Source = source,
+ Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
+ });
+ }
+
+ var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
+ if (trades.Length != 0)
+ HandleTradeUpdate(source, trades);
+ }
+
+ private void HandleBalanceUpdate(UpdateSource source, SharedBalance[] @event)
+ {
+ // Update local store
+ var updatedAssets = @event.Select(x => x.Asset).ToList();
+
+ foreach (var item in @event)
+ {
+ bool balanceExisted = false;
+ _balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) =>
+ {
+ balanceExisted = true;
+ var updated = UpdateBalance(existing, item);
+ if (!updated)
+ updatedAssets.Remove(asset);
+ else
+ _logger.LogDebug("Updated balance for {Asset}", item.Asset);
+
+ return existing;
+ });
+
+ if (!balanceExisted)
+ _logger.LogDebug("Added balance for {Asset}", item.Asset);
+ }
+
+ if (updatedAssets.Count > 0)
+ {
+ OnBalanceUpdate?.Invoke(
+ new UserDataUpdate
+ {
+ Source = source,
+ Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray()
+ });
+ }
+ }
+
+ private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState)
+ {
+ _logger.LogDebug("Balance stream status changed: {NewState}", newState);
+ if (newState == SubscriptionStatus.Subscribed)
+ // Trigger REST polling since we weren't connected
+ _pollWaitEvent.Set();
+ }
+
+ private void OrderSubscriptionStatusChanged(SubscriptionStatus newState)
+ {
+ _logger.LogDebug("Order stream status changed: {NewState}", newState);
+
+ if (newState == SubscriptionStatus.Pending)
+ {
+ // Record last data receive time since we need to request data from that timestamp on when polling
+ // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
+ // managing to do a poll we don't want to override the time since we still need to request that earlier data
+
+ if (_lastDataTimeOrdersBeforeDisconnect == null)
+ _lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime;
+ }
+
+ if (newState == SubscriptionStatus.Subscribed)
+ // Trigger REST polling since we weren't connected
+ _pollWaitEvent.Set();
+ }
+
+ private void TradeSubscriptionStatusChanged(SubscriptionStatus newState)
+ {
+ _logger.LogDebug("Trade stream status changed: {NewState}", newState);
+
+ if (newState == SubscriptionStatus.Pending)
+ {
+ // Record last data receive time since we need to request data from that timestamp on when polling
+ // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
+ // managing to do a poll we don't want to override the time since we still need to request that earlier data
+
+ if (_lastDataTimeTradesBeforeDisconnect == null)
+ _lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime;
+ }
+
+ if (newState == SubscriptionStatus.Subscribed)
+ // Trigger REST polling since we weren't connected
+ _pollWaitEvent.Set();
+ }
+
+ private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance)
+ {
+ // Some other way to way to determine sequence? Maybe timestamp?
+ var changed = false;
+ if (existingBalance.Total != newBalance.Total)
+ {
+ existingBalance.Total = newBalance.Total;
+ changed = true;
+ }
+
+ if (existingBalance.Available != newBalance.Available)
+ {
+ existingBalance.Available = newBalance.Available;
+ changed = true;
+ }
+
+ return changed;
+ }
+
+ private bool UpdateSpotOrder(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
+ {
+ if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false)
+ // Update is older than the existing data, ignore
+ return false;
+
+ var changed = false;
+ if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice)
+ {
+ existingOrder.AveragePrice = newOrder.AveragePrice;
+ changed = true;
+ }
+
+ if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice)
+ {
+ existingOrder.OrderPrice = newOrder.OrderPrice;
+ changed = true;
+ }
+
+ if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee)
+ {
+ existingOrder.Fee = newOrder.Fee;
+ changed = true;
+ }
+
+ if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset)
+ {
+ existingOrder.FeeAsset = newOrder.FeeAsset;
+ changed = true;
+ }
+
+ if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity)
+ {
+ existingOrder.OrderQuantity = newOrder.OrderQuantity;
+ changed = true;
+ }
+
+ if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled)
+ {
+ existingOrder.QuantityFilled = newOrder.QuantityFilled;
+ changed = true;
+ }
+
+ if (newOrder.Status != existingOrder.Status)
+ {
+ existingOrder.Status = newOrder.Status;
+ changed = true;
+ }
+
+ if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime)
+ {
+ existingOrder.UpdateTime = newOrder.UpdateTime;
+ changed = true;
+ }
+
+ return changed;
+ }
+
+ private bool? CheckIfOrderUpdateIsNewer(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
+ {
+ if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open)
+ // status changed from open to not open
+ return true;
+
+ if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open)
+ // status changed from not open to open; stale
+ return false;
+
+ if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null)
+ {
+ // If both have an update time base of that
+ if (existingOrder.UpdateTime < newOrder.UpdateTime)
+ return true;
+
+ if (existingOrder.UpdateTime > newOrder.UpdateTime)
+ return false;
+ }
+
+ if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null)
+ {
+ if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null)
+ {
+ // If base quantity is not null we can base it on that
+ if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset)
+ return true;
+
+ else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset)
+ return false;
+ }
+
+ if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null)
+ {
+ // If quote quantity is not null we can base it on that
+ if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset)
+ return true;
+
+ else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset)
+ return false;
+ }
+ }
+
+ if (existingOrder.Fee != null && newOrder.Fee != null)
+ {
+ // Higher fee means later processing
+ if (existingOrder.Fee < newOrder.Fee)
+ return true;
+
+ if (existingOrder.Fee > newOrder.Fee)
+ return false;
+ }
+
+ return null;
+ }
+
+ private TimeSpan? GetNextPollDelay()
+ {
+ if (!_firstPollDone && _pollAtStart)
+ // First polling should be done immediately
+ return TimeSpan.Zero;
+
+ if (!Connected)
+ {
+ if (_pollIntervalDisconnected == TimeSpan.Zero)
+ // No polling interval
+ return null;
+
+ return _pollIntervalDisconnected;
+ }
+
+ if (_pollIntervalConnected == TimeSpan.Zero)
+ // No polling interval
+ return null;
+
+ // Wait for next poll
+ return _pollIntervalConnected;
+ }
+
+ private async Task PollAsync()
+ {
+ while (!_cts!.IsCancellationRequested)
+ {
+ var delayForNextPoll = GetNextPollDelay();
+ if (delayForNextPoll != TimeSpan.Zero)
+ {
+ try
+ {
+ if (delayForNextPoll != null)
+ _logger.LogTrace("Delay for next polling: {Delay}", delayForNextPoll);
+
+ await _pollWaitEvent.WaitAsync(delayForNextPoll, _cts.Token).ConfigureAwait(false);
+ }
+ catch { }
+ }
+
+ _firstPollDone = true;
+ if (_cts.IsCancellationRequested)
+ break;
+
+ if (_lastPollAttempt != null && (DateTime.UtcNow - _lastPollAttempt.Value) < TimeSpan.FromSeconds(2))
+ {
+ if (_lastPollSuccessful)
+ // If last poll was less than 2 seconds ago and it was successful don't bother immediately polling again
+ continue;
+ }
+
+ _logger.LogDebug("Starting user data requesting");
+ _lastPollAttempt = DateTime.UtcNow;
+ _lastPollSuccessful = false;
+
+ var anyError = false;
+ var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest()).ConfigureAwait(false);
+ if (!balancesResult.Success)
+ {
+ // .. ?
+ var transientError = balancesResult.Error!.IsTransient;
+ // If transient we can retry
+ // Should communicate errors, also for websocket disconnecting
+
+ anyError = true;
+ }
+ else
+ {
+ HandleBalanceUpdate(UpdateSource.Poll, balancesResult.Data);
+ }
+
+ var openOrdersResult = await _spotOrderRestClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest()).ConfigureAwait(false);
+ if (!openOrdersResult.Success)
+ {
+ // .. ?
+
+ anyError = true;
+ }
+ else
+ {
+ HandleOrderUpdate(UpdateSource.Poll, openOrdersResult.Data);
+ }
+
+ foreach (var symbol in _symbols)
+ {
+ var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime;
+ var updatedPollTime = DateTime.UtcNow;
+ var closedOrdersResult = await _spotOrderRestClient.GetClosedSpotOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders)).ConfigureAwait(false);
+ if (!closedOrdersResult.Success)
+ {
+ // .. ?
+
+ anyError = true;
+ }
+ else
+ {
+ _lastDataTimeOrdersBeforeDisconnect = null;
+ _lastPollTimeOrders = updatedPollTime;
+
+ // Filter orders to only include where close time is after the start time
+ var relevantOrders = closedOrdersResult.Data.Where(x =>
+ x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
+ || x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
+ || x.CreateTime == null && x.UpdateTime == null // Unknown time
+ ).ToArray();
+
+ if (relevantOrders.Length > 0)
+ HandleOrderUpdate(UpdateSource.Poll, relevantOrders);
+ }
+
+ var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime;
+ var tradesResult = await _spotOrderRestClient.GetSpotUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades)).ConfigureAwait(false);
+ if (!tradesResult.Success)
+ {
+ // .. ?
+ anyError = true;
+ }
+ else
+ {
+ _lastDataTimeTradesBeforeDisconnect = null;
+ _lastPollTimeTrades = updatedPollTime;
+
+ // Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
+ var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray();
+ if (relevantTrades.Length > 0)
+ HandleTradeUpdate(UpdateSource.Poll, tradesResult.Data);
+ }
+ }
+
+ _lastPollSuccessful = !anyError;
+ _logger.LogDebug("User data requesting completed");
+ }
+ }
+
+ ///
+ public async Task StopAsync()
+ {
+ _logger.LogDebug("Stopping UserDataTracker");
+ _cts?.Cancel();
+
+ if (_pollTask != null)
+ await _pollTask.ConfigureAwait(false);
+
+ _logger.LogDebug("Stopped UserDataTracker");
+ }
+ }
+}
diff --git a/CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs b/CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs
new file mode 100644
index 0000000..302f378
--- /dev/null
+++ b/CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs
@@ -0,0 +1,34 @@
+using CryptoExchange.Net.SharedApis;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace CryptoExchange.Net.Trackers.UserData
+{
+ ///
+ /// User data tracker configuration
+ ///
+ public record UserDataTrackerConfig
+ {
+ ///
+ /// Symbols to initially track, used when polling data. Other symbols will get tracked when updates are received for orders or trades on a new symbol and when there are open orders on a new symbol. To only track the symbols specified here set `OnlyTrackProvidedSymbols` to true.
+ ///
+ public IEnumerable Symbols { get; set; } = [];
+ ///
+ /// If true only orders and trades in the `Symbols` options will get tracked, data on other symbols will be ignored.
+ ///
+ public bool OnlyTrackProvidedSymbols { get; set; } = false;
+ ///
+ /// Interval to poll data at as backup, even when the websocket stream is still connected.
+ ///
+ public TimeSpan PollIntervalConnected { get; set; } = TimeSpan.Zero;
+ ///
+ /// Interval to poll data at while the websocket is disconnected.
+ ///
+ public TimeSpan PollIntervalDisconnected { get; set; } = TimeSpan.Zero;
+ ///
+ /// Whether to poll for data initially when starting the tracker.
+ ///
+ public bool PollAtStart { get; set; } = true;
+ }
+}
diff --git a/CryptoExchange.Net/Trackers/UserData/UserDataUpdate.cs b/CryptoExchange.Net/Trackers/UserData/UserDataUpdate.cs
new file mode 100644
index 0000000..bde0c32
--- /dev/null
+++ b/CryptoExchange.Net/Trackers/UserData/UserDataUpdate.cs
@@ -0,0 +1,18 @@
+namespace CryptoExchange.Net.Trackers.UserData
+{
+ ///
+ /// User data update
+ ///
+ /// Data type
+ public class UserDataUpdate
+ {
+ ///
+ /// Source
+ ///
+ public UpdateSource Source { get; set; }
+ ///
+ /// Data
+ ///
+ public T Data { get; set; } = default!;
+ }
+}