diff --git a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs
index cce55b6..a1e5811 100644
--- a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs
+++ b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs
@@ -119,6 +119,11 @@ namespace CryptoExchange.Net.Objects.Sockets
///
public SocketStatus Status => _connection.Status;
+ ///
+ /// The current subscription status
+ ///
+ public SubscriptionStatus SubscriptionStatus => _subscription.Status;
+
///
/// ctor
///
diff --git a/CryptoExchange.Net/SharedApis/ResponseModels/SharedPosition.cs b/CryptoExchange.Net/SharedApis/ResponseModels/SharedPosition.cs
index b251f71..de8c5fd 100644
--- a/CryptoExchange.Net/SharedApis/ResponseModels/SharedPosition.cs
+++ b/CryptoExchange.Net/SharedApis/ResponseModels/SharedPosition.cs
@@ -20,6 +20,10 @@ namespace CryptoExchange.Net.SharedApis
///
public SharedPositionSide PositionSide { get; set; }
///
+ /// Whether the position is one way mode
+ ///
+ public SharedPositionMode PositionMode { get; set; }
+ ///
/// Average open price
///
public decimal? AverageOpenPrice { get; set; }
diff --git a/CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs
deleted file mode 100644
index 3d6c139..0000000
--- a/CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs
+++ /dev/null
@@ -1,54 +0,0 @@
-using CryptoExchange.Net.Objects;
-using CryptoExchange.Net.SharedApis;
-using System;
-using System.Threading.Tasks;
-
-namespace CryptoExchange.Net.Trackers.UserData
-{
- ///
- /// User data tracker
- ///
- public interface IUserDataTracker
- {
- ///
- /// User identifier
- ///
- public string? UserIdentifier { get; }
-
- ///
- /// Current balances
- ///
- SharedBalance[] Balances { get; }
- ///
- /// Currently tracked orders
- ///
- SharedSpotOrder[] Orders { get; }
- ///
- /// Currently tracked trades
- ///
- SharedUserTrade[] Trades { get; }
-
- ///
- /// On balance update
- ///
- event Action>? OnBalanceUpdate;
- ///
- /// On order update
- ///
- event Action>? OnOrderUpdate;
- ///
- /// On user trade update
- ///
- event Action>? OnTradeUpdate;
-
- ///
- /// Start tracking user data
- ///
- Task StartAsync();
- ///
- /// Stop tracking data
- ///
- ///
- Task StopAsync();
- }
-}
\ No newline at end of file
diff --git a/CryptoExchange.Net/Trackers/UserData/IUserFuturesDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/IUserFuturesDataTracker.cs
new file mode 100644
index 0000000..30ae518
--- /dev/null
+++ b/CryptoExchange.Net/Trackers/UserData/IUserFuturesDataTracker.cs
@@ -0,0 +1,79 @@
+using CryptoExchange.Net.Objects;
+using CryptoExchange.Net.SharedApis;
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace CryptoExchange.Net.Trackers.UserData
+{
+ ///
+ /// Futures user data tracker
+ ///
+ public interface IUserFuturesDataTracker
+ {
+ ///
+ /// User identifier
+ ///
+ string? UserIdentifier { get; }
+
+ ///
+ /// Whether the tracker is currently fully connected
+ ///
+ bool Connected { get; }
+
+ ///
+ /// Currently tracked symbols. Data for these symbols will be requested when polling.
+ /// Websocket updates will be available for all symbols regardless.
+ /// When new data is received for a symbol which is not yet being tracked it will be added to this list and polled in the future unless the `OnlyTrackProvidedSymbols` option is set in the configuration.
+ ///
+ IEnumerable TrackedSymbols { get; }
+
+ ///
+ /// Current balances
+ ///
+ SharedBalance[] Balances { get; }
+ ///
+ /// Currently tracked orders
+ ///
+ SharedFuturesOrder[] Orders { get; }
+ ///
+ /// Currently tracked positions
+ ///
+ SharedPosition[] Positions { get; }
+ ///
+ /// Currently tracked trades
+ ///
+ SharedUserTrade[] Trades { get; }
+
+ ///
+ /// On connection status change. Might trigger multiple times with the same status depending on the underlying subscriptions.
+ ///
+ event Action? OnConnectedStatusChange;
+ ///
+ /// On balance update
+ ///
+ event Func, Task>? OnBalanceUpdate;
+ ///
+ /// On order update
+ ///
+ event Func, Task>? OnOrderUpdate;
+ ///
+ /// On position order update
+ ///
+ event Func, Task>? OnPositionUpdate;
+ ///
+ /// On user trade update
+ ///
+ event Func, Task>? OnTradeUpdate;
+
+ ///
+ /// Start tracking user data
+ ///
+ Task StartAsync();
+ ///
+ /// Stop tracking data
+ ///
+ ///
+ Task StopAsync();
+ }
+}
\ No newline at end of file
diff --git a/CryptoExchange.Net/Trackers/UserData/IUserSpotDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/IUserSpotDataTracker.cs
new file mode 100644
index 0000000..6abc556
--- /dev/null
+++ b/CryptoExchange.Net/Trackers/UserData/IUserSpotDataTracker.cs
@@ -0,0 +1,72 @@
+using CryptoExchange.Net.Objects;
+using CryptoExchange.Net.SharedApis;
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace CryptoExchange.Net.Trackers.UserData
+{
+ ///
+ /// User data tracker
+ ///
+ public interface IUserSpotDataTracker
+ {
+ ///
+ /// User identifier
+ ///
+ string? UserIdentifier { get; }
+
+ ///
+ /// Whether the tracker is currently fully connected
+ ///
+ bool Connected { get; }
+
+ ///
+ /// Currently tracked symbols. Data for these symbols will be requested when polling.
+ /// Websocket updates will be available for all symbols regardless.
+ /// When new data is received for a symbol which is not yet being tracked it will be added to this list and polled in the future unless the `OnlyTrackProvidedSymbols` option is set in the configuration.
+ ///
+ IEnumerable TrackedSymbols { get; }
+
+ ///
+ /// Current balances
+ ///
+ SharedBalance[] Balances { get; }
+ ///
+ /// Currently tracked orders
+ ///
+ SharedSpotOrder[] Orders { get; }
+ ///
+ /// Currently tracked trades
+ ///
+ SharedUserTrade[] Trades { get; }
+
+ ///
+ /// On connection status change. Might trigger multiple times with the same status depending on the underlying subscriptions.
+ ///
+ event Action? OnConnectedStatusChange;
+ ///
+ /// On balance update
+ ///
+ event Func, Task>? OnBalanceUpdate;
+ ///
+ /// On order update
+ ///
+ event Func, Task>? OnOrderUpdate;
+ ///
+ /// On user trade update
+ ///
+ event Func, Task>? OnTradeUpdate;
+
+ ///
+ /// Start tracking user data
+ ///
+ Task StartAsync();
+ ///
+ /// Stop tracking data
+ ///
+ ///
+ Task StopAsync();
+ }
+}
\ No newline at end of file
diff --git a/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs
index 676d0da..2481bd3 100644
--- a/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs
+++ b/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs
@@ -1,9 +1,7 @@
using CryptoExchange.Net.Objects;
-using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using Microsoft.Extensions.Logging;
using System;
-using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -12,99 +10,74 @@ using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData
{
- ///
- /// User data tracker
- ///
- public abstract class UserDataTracker : IUserDataTracker
+ public abstract class UserDataTracker
{
- // Cached data
- private ConcurrentDictionary _balanceStore = new ConcurrentDictionary();
- private ConcurrentDictionary _orderStore = new ConcurrentDictionary();
- private ConcurrentDictionary _tradeStore = new ConcurrentDictionary();
-
- // Typed clients
- private readonly IListenKeyRestClient? _listenKeyRestClient;
- private readonly ISpotSymbolRestClient _spotSymbolRestClient;
- private readonly IBalanceRestClient _balanceRestClient;
- private readonly IBalanceSocketClient _balanceSocketClient;
- private readonly ISpotOrderRestClient _spotOrderRestClient;
- private readonly ISpotOrderSocketClient _spotOrderSocketClient;
- private readonly IUserTradeSocketClient? _userTradeSocketClient;
- private readonly ILogger _logger;
+ protected readonly ILogger _logger;
// State management
- private DateTime? _startTime = null;
- private DateTime? _lastPollAttempt = null;
- private bool _lastPollSuccessful = false;
- private DateTime? _lastPollTimeOrders = null;
- private DateTime? _lastPollTimeTrades = null;
- private DateTime? _lastDataTimeOrdersBeforeDisconnect = null;
- private DateTime? _lastDataTimeTradesBeforeDisconnect = null;
- private bool _firstPollDone = false;
+ protected DateTime? _startTime = null;
+ protected DateTime? _lastPollAttempt = null;
+ protected bool _lastPollSuccessful = false;
+ protected DateTime? _lastPollTimeOrders = null;
+ protected DateTime? _lastPollTimeTrades = null;
+ protected DateTime? _lastDataTimeOrdersBeforeDisconnect = null;
+ protected DateTime? _lastDataTimeTradesBeforeDisconnect = null;
+ protected bool _firstPollDone = false;
+ protected bool _wasDisconnected = false;
// Config
- private List _symbols = new List();
- private TimeSpan _pollIntervalConnected;
- private TimeSpan _pollIntervalDisconnected;
- private bool _pollAtStart;
- private bool _onlyTrackProvidedSymbols;
- private bool _trackTrades = true;
-
- // Subscriptions
- private UpdateSubscription? _balanceSubscription;
- private UpdateSubscription? _orderSubscription;
- private UpdateSubscription? _tradeSubscription;
+ protected List _symbols = new List();
+ protected TimeSpan _pollIntervalConnected;
+ protected TimeSpan _pollIntervalDisconnected;
+ protected bool _pollAtStart;
+ protected bool _onlyTrackProvidedSymbols;
+ protected bool _trackTrades = true;
- private AsyncResetEvent _pollWaitEvent = new AsyncResetEvent(false, true);
- private Task? _pollTask;
- private CancellationTokenSource? _cts;
- private object _symbolLock = new object();
+ protected AsyncResetEvent _pollWaitEvent = new AsyncResetEvent(false, true);
+ protected Task? _pollTask;
+ protected CancellationTokenSource? _cts;
+ protected object _symbolLock = new object();
- private bool Connected =>
- _balanceSubscription?.Status == Sockets.Default.SocketStatus.Connected
- && _orderSubscription?.Status == Sockets.Default.SocketStatus.Connected
- && _tradeSubscription == null || _tradeSubscription?.Status == Sockets.Default.SocketStatus.Connected;
-
- ///
- public event Action>? OnBalanceUpdate;
- ///
- public event Action>? OnOrderUpdate;
- ///
- public event Action>? OnTradeUpdate;
///
public string? UserIdentifier { get; }
///
- public SharedBalance[] Balances => _balanceStore.Values.ToArray();
- ///
- public SharedSpotOrder[] Orders => _orderStore.Values.ToArray();
- ///
- public SharedUserTrade[] Trades => _tradeStore.Values.ToArray();
+ public IEnumerable TrackedSymbols => _symbols.AsEnumerable();
- ///
- /// ctor
- ///
- protected UserDataTracker(
- ILogger logger,
- ISharedClient restClient,
- ISharedClient socketClient,
- string? userIdentifier,
- UserDataTrackerConfig config
- )
+ private bool _connected;
+ ///
+ public bool Connected
{
+ get => _connected;
+ protected set
+ {
+ if (_connected == value)
+ return;
+
+ _connected = value;
+ if (!_connected)
+ _wasDisconnected = true;
+ else
+ _pollWaitEvent.Set();
+
+ InvokeConnectedStatusChanged();
+ }
+ }
+
+ ///
+ public event Action? OnConnectedStatusChange;
+
+ public UserDataTracker(ILogger logger, UserDataTrackerConfig config, string? userIdentifier)
+ {
+ if (config.OnlyTrackProvidedSymbols && !config.TrackedSymbols.Any())
+ throw new ArgumentException(nameof(config.TrackedSymbols), "Conflicting options; `OnlyTrackProvidedSymbols` but no symbols specific in `TrackedSymbols`");
+
_logger = logger;
- _spotSymbolRestClient = (ISpotSymbolRestClient)restClient;
- _balanceRestClient = (IBalanceRestClient)restClient;
- _balanceSocketClient = (IBalanceSocketClient)socketClient;
- _spotOrderRestClient = (ISpotOrderRestClient)restClient;
- _spotOrderSocketClient = (ISpotOrderSocketClient)socketClient;
- _listenKeyRestClient = restClient as IListenKeyRestClient;
- _userTradeSocketClient = socketClient as IUserTradeSocketClient;
_pollIntervalConnected = config.PollIntervalConnected;
_pollIntervalDisconnected = config.PollIntervalDisconnected;
- _symbols = config.Symbols?.ToList() ?? [];
+ _symbols = config.TrackedSymbols?.ToList() ?? [];
_onlyTrackProvidedSymbols = config.OnlyTrackProvidedSymbols;
_pollAtStart = config.PollAtStart;
_trackTrades = config.TrackTrades;
@@ -112,388 +85,38 @@ namespace CryptoExchange.Net.Trackers.UserData
UserIdentifier = userIdentifier;
}
- ///
+ protected void InvokeConnectedStatusChanged()
+ {
+ OnConnectedStatusChange?.Invoke(Connected);
+ }
+
public async Task StartAsync()
{
_startTime = DateTime.UtcNow;
-
- _logger.LogDebug("Starting UserDataTracker");
_cts = new CancellationTokenSource();
- // Request symbols so SharedSymbol property can be filled on updates
- var symbolResult = await _spotSymbolRestClient.GetSpotSymbolsAsync(new GetSymbolsRequest()).ConfigureAwait(false);
- if (!symbolResult)
- {
- _logger.LogWarning("Failed to start UserDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
- return symbolResult;
- }
+ var start = await DoStartAsync().ConfigureAwait(false);
+ if (!start)
+ return start;
- string? listenKey = null;
- if (_listenKeyRestClient != null)
- {
- var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest()).ConfigureAwait(false);
- if (!lkResult)
- {
- _logger.LogWarning("Failed to start UserDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
- return lkResult;
- }
-
- listenKey = lkResult.Data;
- }
-
- var subBalanceResult = await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey), x => HandleBalanceUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
- if (!subBalanceResult)
- {
- _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message);
- return subBalanceResult;
- }
-
- _balanceSubscription = subBalanceResult.Data;
- subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged;
-
- var subOrderResult = await _spotOrderSocketClient.SubscribeToSpotOrderUpdatesAsync(new SubscribeSpotOrderRequest(listenKey), x => HandleOrderUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
- if (!subOrderResult)
- {
- _cts.Cancel();
- _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message);
- return subOrderResult;
- }
-
- _orderSubscription = subOrderResult.Data;
- subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged;
-
- if (_userTradeSocketClient != null && _trackTrades)
- {
- var subTradeResult = await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey), x => HandleTradeUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
- if (!subOrderResult)
- {
- _cts.Cancel();
- _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message);
- return subOrderResult;
- }
-
- _tradeSubscription = subTradeResult.Data;
- subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged;
- }
+ Connected = true;
_pollTask = PollAsync();
- _logger.LogDebug("Started UserDataTracker");
return CallResult.SuccessResult;
}
- private void UpdateSymbolsList(IEnumerable symbols)
+ protected abstract Task DoStartAsync();
+
+ ///
+ public async Task StopAsync()
{
- lock (_symbolLock)
- {
- foreach (var symbol in symbols.Distinct())
- {
- if (!_symbols.Any(x => x.TradingMode == symbol.TradingMode && x.BaseAsset == symbol.BaseAsset && x.QuoteAsset == symbol.QuoteAsset))
- {
- _symbols.Add(symbol);
- _logger.LogDebug("Adding {BaseAsset}/{QuoteAsset} to symbol tracking list", symbol.BaseAsset, symbol.QuoteAsset);
- }
- }
- }
- }
+ _logger.LogDebug("Stopping UserDataTracker");
+ _cts?.Cancel();
- private void HandleTradeUpdate(UpdateSource source, SharedUserTrade[] @event)
- {
- var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
- if (unknownSymbols.Any())
- {
- _logger.LogWarning("Received order without SharedSymbol set, ignoring");
- @event = @event.Except(unknownSymbols).ToArray();
- }
+ if (_pollTask != null)
+ await _pollTask.ConfigureAwait(false);
- if (!_onlyTrackProvidedSymbols)
- UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
- else
- @event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
-
- // Update local store
- var updatedIds = @event.Select(x => x.Id).ToList();
- foreach (var item in @event)
- {
- if (_tradeStore.TryAdd(item.Id, item))
- _logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id);
- else
- updatedIds.Remove(item.Id);
- }
-
- if (updatedIds.Count > 0)
- {
- OnTradeUpdate?.Invoke(
- new UserDataUpdate
- {
- Source = source,
- Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
- });
- }
- }
-
- private void HandleOrderUpdate(UpdateSource source, SharedSpotOrder[] @event)
- {
- var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
- if (unknownSymbols.Any())
- {
- _logger.LogWarning("Received order without SharedSymbol set, ignoring");
- @event = @event.Except(unknownSymbols).ToArray();
- }
-
- if (!_onlyTrackProvidedSymbols)
- UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
- else
- @event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
-
- // Update local store
- var updatedIds = @event.Select(x => x.OrderId).ToList();
-
- foreach (var item in @event)
- {
- bool orderExisted = false;
- _orderStore.AddOrUpdate(item.OrderId, item, (id, existing) =>
- {
- orderExisted = true;
- var updated = UpdateSpotOrder(existing, item);
- if (!updated)
- updatedIds.Remove(id);
- else
- _logger.LogDebug("Updated spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
-
- return existing;
- });
-
- if (!orderExisted)
- _logger.LogDebug("Added spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
- }
-
- if (updatedIds.Count > 0)
- {
- OnOrderUpdate?.Invoke(
- new UserDataUpdate
- {
- Source = source,
- Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
- });
- }
-
- var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
- if (trades.Length != 0)
- HandleTradeUpdate(source, trades);
- }
-
- private void HandleBalanceUpdate(UpdateSource source, SharedBalance[] @event)
- {
- // Update local store
- var updatedAssets = @event.Select(x => x.Asset).ToList();
-
- foreach (var item in @event)
- {
- bool balanceExisted = false;
- _balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) =>
- {
- balanceExisted = true;
- var updated = UpdateBalance(existing, item);
- if (!updated)
- updatedAssets.Remove(asset);
- else
- _logger.LogDebug("Updated balance for {Asset}", item.Asset);
-
- return existing;
- });
-
- if (!balanceExisted)
- _logger.LogDebug("Added balance for {Asset}", item.Asset);
- }
-
- if (updatedAssets.Count > 0)
- {
- OnBalanceUpdate?.Invoke(
- new UserDataUpdate
- {
- Source = source,
- Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray()
- });
- }
- }
-
- private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState)
- {
- _logger.LogDebug("Balance stream status changed: {NewState}", newState);
- if (newState == SubscriptionStatus.Subscribed)
- // Trigger REST polling since we weren't connected
- _pollWaitEvent.Set();
- }
-
- private void OrderSubscriptionStatusChanged(SubscriptionStatus newState)
- {
- _logger.LogDebug("Order stream status changed: {NewState}", newState);
-
- if (newState == SubscriptionStatus.Pending)
- {
- // Record last data receive time since we need to request data from that timestamp on when polling
- // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
- // managing to do a poll we don't want to override the time since we still need to request that earlier data
-
- if (_lastDataTimeOrdersBeforeDisconnect == null)
- _lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime;
- }
-
- if (newState == SubscriptionStatus.Subscribed)
- // Trigger REST polling since we weren't connected
- _pollWaitEvent.Set();
- }
-
- private void TradeSubscriptionStatusChanged(SubscriptionStatus newState)
- {
- _logger.LogDebug("Trade stream status changed: {NewState}", newState);
-
- if (newState == SubscriptionStatus.Pending)
- {
- // Record last data receive time since we need to request data from that timestamp on when polling
- // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
- // managing to do a poll we don't want to override the time since we still need to request that earlier data
-
- if (_lastDataTimeTradesBeforeDisconnect == null)
- _lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime;
- }
-
- if (newState == SubscriptionStatus.Subscribed)
- // Trigger REST polling since we weren't connected
- _pollWaitEvent.Set();
- }
-
- private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance)
- {
- // Some other way to way to determine sequence? Maybe timestamp?
- var changed = false;
- if (existingBalance.Total != newBalance.Total)
- {
- existingBalance.Total = newBalance.Total;
- changed = true;
- }
-
- if (existingBalance.Available != newBalance.Available)
- {
- existingBalance.Available = newBalance.Available;
- changed = true;
- }
-
- return changed;
- }
-
- private bool UpdateSpotOrder(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
- {
- if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false)
- // Update is older than the existing data, ignore
- return false;
-
- var changed = false;
- if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice)
- {
- existingOrder.AveragePrice = newOrder.AveragePrice;
- changed = true;
- }
-
- if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice)
- {
- existingOrder.OrderPrice = newOrder.OrderPrice;
- changed = true;
- }
-
- if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee)
- {
- existingOrder.Fee = newOrder.Fee;
- changed = true;
- }
-
- if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset)
- {
- existingOrder.FeeAsset = newOrder.FeeAsset;
- changed = true;
- }
-
- if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity)
- {
- existingOrder.OrderQuantity = newOrder.OrderQuantity;
- changed = true;
- }
-
- if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled)
- {
- existingOrder.QuantityFilled = newOrder.QuantityFilled;
- changed = true;
- }
-
- if (newOrder.Status != existingOrder.Status)
- {
- existingOrder.Status = newOrder.Status;
- changed = true;
- }
-
- if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime)
- {
- existingOrder.UpdateTime = newOrder.UpdateTime;
- changed = true;
- }
-
- return changed;
- }
-
- private bool? CheckIfOrderUpdateIsNewer(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
- {
- if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open)
- // status changed from open to not open
- return true;
-
- if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open)
- // status changed from not open to open; stale
- return false;
-
- if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null)
- {
- // If both have an update time base of that
- if (existingOrder.UpdateTime < newOrder.UpdateTime)
- return true;
-
- if (existingOrder.UpdateTime > newOrder.UpdateTime)
- return false;
- }
-
- if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null)
- {
- if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null)
- {
- // If base quantity is not null we can base it on that
- if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset)
- return true;
-
- else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset)
- return false;
- }
-
- if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null)
- {
- // If quote quantity is not null we can base it on that
- if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset)
- return true;
-
- else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset)
- return false;
- }
- }
-
- if (existingOrder.Fee != null && newOrder.Fee != null)
- {
- // Higher fee means later processing
- if (existingOrder.Fee < newOrder.Fee)
- return true;
-
- if (existingOrder.Fee > newOrder.Fee)
- return false;
- }
-
- return null;
+ _logger.LogDebug("Stopped UserDataTracker");
}
private TimeSpan? GetNextPollDelay()
@@ -519,11 +142,11 @@ namespace CryptoExchange.Net.Trackers.UserData
return _pollIntervalConnected;
}
- private async Task PollAsync()
+ public async Task PollAsync()
{
while (!_cts!.IsCancellationRequested)
{
- var delayForNextPoll = GetNextPollDelay();
+ var delayForNextPoll = GetNextPollDelay();
if (delayForNextPoll != TimeSpan.Zero)
{
try
@@ -540,109 +163,49 @@ namespace CryptoExchange.Net.Trackers.UserData
if (_cts.IsCancellationRequested)
break;
- if (_lastPollAttempt != null && (DateTime.UtcNow - _lastPollAttempt.Value) < TimeSpan.FromSeconds(2))
+ if (_lastPollAttempt != null
+ && (DateTime.UtcNow - _lastPollAttempt.Value) < TimeSpan.FromSeconds(2)
+ && !(Connected && _wasDisconnected))
{
if (_lastPollSuccessful)
// If last poll was less than 2 seconds ago and it was successful don't bother immediately polling again
continue;
}
- _logger.LogDebug("Starting user data requesting");
- _lastPollAttempt = DateTime.UtcNow;
+ if (Connected)
+ _wasDisconnected = false;
+
_lastPollSuccessful = false;
- var anyError = false;
- var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest()).ConfigureAwait(false);
- if (!balancesResult.Success)
+ try
{
- // .. ?
- var transientError = balancesResult.Error!.IsTransient;
- // If transient we can retry
- // Should communicate errors, also for websocket disconnecting
+ var anyError = await DoPollAsync().ConfigureAwait(false);
- anyError = true;
+ _lastPollAttempt = DateTime.UtcNow;
+ _lastPollSuccessful = !anyError;
}
- else
+ catch (Exception ex)
{
- HandleBalanceUpdate(UpdateSource.Poll, balancesResult.Data);
+ _logger.LogError(ex, "UserDataTracker polling exception");
}
-
- var openOrdersResult = await _spotOrderRestClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest()).ConfigureAwait(false);
- if (!openOrdersResult.Success)
- {
- // .. ?
-
- anyError = true;
- }
- else
- {
- HandleOrderUpdate(UpdateSource.Poll, openOrdersResult.Data);
- }
-
- foreach (var symbol in _symbols)
- {
- var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime;
- var updatedPollTime = DateTime.UtcNow;
- var closedOrdersResult = await _spotOrderRestClient.GetClosedSpotOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders)).ConfigureAwait(false);
- if (!closedOrdersResult.Success)
- {
- // .. ?
-
- anyError = true;
- }
- else
- {
- _lastDataTimeOrdersBeforeDisconnect = null;
- _lastPollTimeOrders = updatedPollTime;
-
- // Filter orders to only include where close time is after the start time
- var relevantOrders = closedOrdersResult.Data.Where(x =>
- x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
- || x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
- || x.CreateTime == null && x.UpdateTime == null // Unknown time
- ).ToArray();
-
- if (relevantOrders.Length > 0)
- HandleOrderUpdate(UpdateSource.Poll, relevantOrders);
- }
-
- if (_trackTrades)
- {
- var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime;
- var tradesResult = await _spotOrderRestClient.GetSpotUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades)).ConfigureAwait(false);
- if (!tradesResult.Success)
- {
- // .. ?
- anyError = true;
- }
- else
- {
- _lastDataTimeTradesBeforeDisconnect = null;
- _lastPollTimeTrades = updatedPollTime;
-
- // Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
- var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray();
- if (relevantTrades.Length > 0)
- HandleTradeUpdate(UpdateSource.Poll, tradesResult.Data);
- }
- }
- }
-
- _lastPollSuccessful = !anyError;
- _logger.LogDebug("User data requesting completed");
}
}
- ///
- public async Task StopAsync()
+ protected abstract Task DoPollAsync();
+
+ protected void UpdateSymbolsList(IEnumerable symbols)
{
- _logger.LogDebug("Stopping UserDataTracker");
- _cts?.Cancel();
-
- if (_pollTask != null)
- await _pollTask.ConfigureAwait(false);
-
- _logger.LogDebug("Stopped UserDataTracker");
+ lock (_symbolLock)
+ {
+ foreach (var symbol in symbols.Distinct())
+ {
+ if (!_symbols.Any(x => x.TradingMode == symbol.TradingMode && x.BaseAsset == symbol.BaseAsset && x.QuoteAsset == symbol.QuoteAsset))
+ {
+ _symbols.Add(symbol);
+ _logger.LogDebug("Adding {BaseAsset}/{QuoteAsset} to symbol tracking list", symbol.BaseAsset, symbol.QuoteAsset);
+ }
+ }
+ }
}
}
}
diff --git a/CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs b/CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs
index 5b998a0..8e440d2 100644
--- a/CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs
+++ b/CryptoExchange.Net/Trackers/UserData/UserDataTrackerConfig.cs
@@ -11,9 +11,9 @@ namespace CryptoExchange.Net.Trackers.UserData
public record UserDataTrackerConfig
{
///
- /// Symbols to initially track, used when polling data. Other symbols will get tracked when updates are received for orders or trades on a new symbol and when there are open orders on a new symbol. To only track the symbols specified here set `OnlyTrackProvidedSymbols` to true.
+ /// Symbols to initially track, used when polling data. Other symbols will get tracked when updates are received for orders or trades on a new symbol and when there are open orders or positions on a new symbol. To only track the symbols specified here set `OnlyTrackProvidedSymbols` to true.
///
- public IEnumerable Symbols { get; set; } = [];
+ public IEnumerable TrackedSymbols { get; set; } = [];
///
/// If true only orders and trades in the `Symbols` options will get tracked, data on other symbols will be ignored.
///
@@ -25,13 +25,13 @@ namespace CryptoExchange.Net.Trackers.UserData
///
/// Interval to poll data at while the websocket is disconnected.
///
- public TimeSpan PollIntervalDisconnected { get; set; } = TimeSpan.Zero;
+ public TimeSpan PollIntervalDisconnected { get; set; } = TimeSpan.FromSeconds(30);
///
/// Whether to poll for data initially when starting the tracker.
///
public bool PollAtStart { get; set; } = true;
///
- /// Whether to track order trades, can lead to increased requests when polling
+ /// Whether to track order trades, can lead to increased requests when polling since they're requested per symbol.
///
public bool TrackTrades { get; set; } = true;
}
diff --git a/CryptoExchange.Net/Trackers/UserData/UserFuturesDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/UserFuturesDataTracker.cs
new file mode 100644
index 0000000..8b6e93b
--- /dev/null
+++ b/CryptoExchange.Net/Trackers/UserData/UserFuturesDataTracker.cs
@@ -0,0 +1,712 @@
+using CryptoExchange.Net.Objects;
+using CryptoExchange.Net.Objects.Sockets;
+using CryptoExchange.Net.SharedApis;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace CryptoExchange.Net.Trackers.UserData
+{
+ ///
+ /// User data tracker
+ ///
+ public abstract class UserFuturesDataTracker : UserDataTracker, IUserFuturesDataTracker
+ {
+ // Cached data
+ private ConcurrentDictionary _balanceStore = new ConcurrentDictionary();
+ private ConcurrentDictionary _positionStore = new ConcurrentDictionary();
+ private ConcurrentDictionary _orderStore = new ConcurrentDictionary();
+ private ConcurrentDictionary _tradeStore = new ConcurrentDictionary();
+
+ // Typed clients
+ private readonly IListenKeyRestClient? _listenKeyRestClient;
+ private readonly IFuturesSymbolRestClient _futuresSymbolRestClient;
+ private readonly IBalanceRestClient _balanceRestClient;
+ private readonly IBalanceSocketClient _balanceSocketClient;
+ private readonly IFuturesOrderRestClient _futuresOrderRestClient;
+ private readonly IFuturesOrderSocketClient _futuresOrderSocketClient;
+ private readonly IPositionSocketClient _positionSocketClient;
+ private readonly IUserTradeSocketClient? _userTradeSocketClient;
+
+ // Subscriptions
+ private UpdateSubscription? _balanceSubscription;
+ private UpdateSubscription? _orderSubscription;
+ private UpdateSubscription? _positionSubscription;
+ private UpdateSubscription? _tradeSubscription;
+
+ private ExchangeParameters? _exchangeParameters;
+
+ ///
+ /// Whether updates received from the websocket for positions are full snapshots, or individual updates for positions.
+ /// This is used to determine if positions should be removed if no longer present in the update data.
+ ///
+ protected abstract bool WebsocketPositionUpdatesAreFullSnapshots { get; }
+
+ ///
+ public event Func, Task>? OnBalanceUpdate;
+ ///
+ public event Func, Task>? OnOrderUpdate;
+ ///
+ public event Func, Task>? OnPositionUpdate;
+ ///
+ public event Func, Task>? OnTradeUpdate;
+
+ ///
+ public SharedBalance[] Balances => _balanceStore.Values.ToArray();
+ ///
+ public SharedFuturesOrder[] Orders => _orderStore.Values.ToArray();
+ ///
+ public SharedPosition[] Positions => _positionStore.Values.ToArray();
+ ///
+ public SharedUserTrade[] Trades => _tradeStore.Values.ToArray();
+
+ ///
+ /// ctor
+ ///
+ protected UserFuturesDataTracker(
+ ILogger logger,
+ ISharedClient restClient,
+ ISharedClient socketClient,
+ string? userIdentifier,
+ UserDataTrackerConfig config,
+ ExchangeParameters? exchangeParameters = null) : base(logger, config, userIdentifier)
+ {
+ _exchangeParameters = exchangeParameters;
+ _futuresSymbolRestClient = (IFuturesSymbolRestClient)restClient;
+ _balanceRestClient = (IBalanceRestClient)restClient;
+ _balanceSocketClient = (IBalanceSocketClient)socketClient;
+ _futuresOrderRestClient = (IFuturesOrderRestClient)restClient;
+ _futuresOrderSocketClient = (IFuturesOrderSocketClient)socketClient;
+ _positionSocketClient = (IPositionSocketClient)socketClient;
+ _listenKeyRestClient = restClient as IListenKeyRestClient;
+ _userTradeSocketClient = socketClient as IUserTradeSocketClient;
+ }
+
+ ///
+ protected override async Task DoStartAsync()
+ {
+ _logger.LogDebug("Starting UserDataTracker");
+
+ // Request symbols so SharedSymbol property can be filled on updates
+ var symbolResult = await _futuresSymbolRestClient.GetFuturesSymbolsAsync(new GetSymbolsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!symbolResult)
+ {
+ _logger.LogWarning("Failed to start UserFuturesDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
+ return symbolResult;
+ }
+
+ string? listenKey = null;
+ if (_listenKeyRestClient != null)
+ {
+ var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!lkResult)
+ {
+ _logger.LogWarning("Failed to start UserFuturesDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
+ return lkResult;
+ }
+
+ listenKey = lkResult.Data;
+ }
+
+ var subBalanceResult = await ExchangeHelpers.ProcessQueuedAsync(
+ async handler => await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
+ x => HandleBalanceUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
+ if (!subBalanceResult)
+ {
+ _logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message);
+ return subBalanceResult;
+ }
+
+ _balanceSubscription = subBalanceResult.Data;
+ subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged;
+
+ var subOrderResult = await ExchangeHelpers.ProcessQueuedAsync(
+ async handler => await _futuresOrderSocketClient.SubscribeToFuturesOrderUpdatesAsync(new SubscribeFuturesOrderRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
+ x => HandleOrderUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
+ if (!subOrderResult)
+ {
+ _cts!.Cancel();
+ _logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message);
+ return subOrderResult;
+ }
+
+ _orderSubscription = subOrderResult.Data;
+ subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged;
+
+ if (_positionSocketClient != null)
+ {
+ var subPositionResult = await ExchangeHelpers.ProcessQueuedAsync(
+ async handler => await _positionSocketClient.SubscribeToPositionUpdatesAsync(new SubscribePositionRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
+ x => HandlePositionUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
+ if (!subPositionResult)
+ {
+ _cts!.Cancel();
+ _logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to trade stream: {Error}", subPositionResult.Error!.Message);
+ return subPositionResult;
+ }
+
+ _positionSubscription = subPositionResult.Data;
+ subPositionResult.Data.SubscriptionStatusChanged += PositionSubscriptionStatusChanged;
+ }
+
+ if (_userTradeSocketClient != null && _trackTrades)
+ {
+ var subTradeResult = await ExchangeHelpers.ProcessQueuedAsync(
+ async handler => await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
+ x => HandleTradeUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
+ if (!subOrderResult)
+ {
+ _cts.Cancel();
+ _logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message);
+ return subOrderResult;
+ }
+
+ _tradeSubscription = subTradeResult.Data;
+ subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged;
+ }
+
+ _logger.LogDebug("Started UserFuturesDataTracker");
+ return CallResult.SuccessResult;
+ }
+
+ private async Task HandleTradeUpdateAsync(UpdateSource source, SharedUserTrade[] @event)
+ {
+ var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
+ if (unknownSymbols.Any())
+ {
+ _logger.LogWarning("Received order without SharedSymbol set, ignoring");
+ @event = @event.Except(unknownSymbols).ToArray();
+ }
+
+ if (!_onlyTrackProvidedSymbols)
+ UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
+ else
+ @event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
+
+ // Update local store
+ var updatedIds = @event.Select(x => x.Id).ToList();
+ foreach (var item in @event)
+ {
+ if (_tradeStore.TryAdd(item.Id, item))
+ _logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id);
+ else
+ updatedIds.Remove(item.Id);
+ }
+
+ if (updatedIds.Count > 0 && OnTradeUpdate != null)
+ {
+ await OnTradeUpdate.Invoke(
+ new UserDataUpdate
+ {
+ Source = source,
+ Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
+ }).ConfigureAwait(false);
+ }
+ }
+
+ private async Task HandleOrderUpdateAsync(UpdateSource source, SharedFuturesOrder[] @event)
+ {
+ var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
+ if (unknownSymbols.Any())
+ {
+ _logger.LogWarning("Received order without SharedSymbol set, ignoring");
+ @event = @event.Except(unknownSymbols).ToArray();
+ }
+
+ if (!_onlyTrackProvidedSymbols)
+ UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
+ else
+ @event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
+
+ // Update local store
+ var updatedIds = @event.Select(x => x.OrderId).ToList();
+
+ foreach (var item in @event)
+ {
+ bool orderExisted = false;
+ _orderStore.AddOrUpdate(item.OrderId, item, (id, existing) =>
+ {
+ orderExisted = true;
+ var updated = UpdateFuturesOrder(existing, item);
+ if (!updated)
+ updatedIds.Remove(id);
+ else
+ _logger.LogDebug("Updated futures order {Symbol}.{Id}", item.Symbol, item.OrderId);
+
+ return existing;
+ });
+
+ if (!orderExisted)
+ _logger.LogDebug("Added futures order {Symbol}.{Id}", item.Symbol, item.OrderId);
+ }
+
+ if (updatedIds.Count > 0 && OnOrderUpdate != null)
+ {
+ await OnOrderUpdate.Invoke(
+ new UserDataUpdate
+ {
+ Source = source,
+ Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
+ }).ConfigureAwait(false);
+ }
+
+ var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
+ if (trades.Length != 0)
+ await HandleTradeUpdateAsync(source, trades).ConfigureAwait(false);
+ }
+
+ private string PositionIdentifier(SharedPosition position) =>
+ position.Id ?? position.Symbol + position.PositionMode + (position.PositionMode != SharedPositionMode.OneWay ? position.PositionSide.ToString() : "");
+
+ private async Task HandlePositionUpdateAsync(UpdateSource source, SharedPosition[] @event)
+ {
+ // Update local store
+ var updatedPositions = @event.Select(PositionIdentifier).ToList();
+
+ if (WebsocketPositionUpdatesAreFullSnapshots)
+ {
+ // Reset any tracking position to zero/null values when it's no longer in the snapshot as it means there is no open position any more
+ var notInSnapshot = _positionStore.Where(x => !updatedPositions.Contains(x.Key) && x.Value.PositionSize != 0).ToList();
+ foreach (var position in notInSnapshot)
+ {
+ position.Value.UpdateTime = DateTime.UtcNow;
+ position.Value.AverageOpenPrice = null;
+ position.Value.LiquidationPrice = null;
+ position.Value.PositionSize = 0;
+ position.Value.StopLossPrice = null;
+ position.Value.TakeProfitPrice = null;
+ position.Value.UnrealizedPnl = null;
+ updatedPositions.Add(position.Key);
+ }
+ }
+
+ foreach (var item in @event)
+ {
+ bool positionExisted = false;
+ _positionStore.AddOrUpdate(PositionIdentifier(item), item, (key, existing) =>
+ {
+ positionExisted = true;
+ var updated = UpdatePosition(existing, item);
+ if (!updated)
+ updatedPositions.Remove(key);
+ else
+ _logger.LogDebug("Updated position for {Symbol}", item.Symbol);
+
+ return existing;
+ });
+
+ if (!positionExisted)
+ _logger.LogDebug("Added position for {Symbol}", item.Symbol);
+ }
+
+ if (updatedPositions.Count > 0 && OnPositionUpdate != null)
+ {
+ await OnPositionUpdate.Invoke(
+ new UserDataUpdate
+ {
+ Source = source,
+ Data = _positionStore.Where(x => updatedPositions.Contains(x.Key)).Select(x => x.Value).ToArray()
+ }).ConfigureAwait(false);
+ }
+ }
+
+ private async Task HandleBalanceUpdateAsync(UpdateSource source, SharedBalance[] @event)
+ {
+ // Update local store
+ var updatedAssets = @event.Select(x => x.Asset).ToList();
+
+ foreach (var item in @event)
+ {
+ bool balanceExisted = false;
+ _balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) =>
+ {
+ balanceExisted = true;
+ var updated = UpdateBalance(existing, item);
+ if (!updated)
+ updatedAssets.Remove(asset);
+ else
+ _logger.LogDebug("Updated balance for {Asset}", item.Asset);
+
+ return existing;
+ });
+
+ if (!balanceExisted)
+ _logger.LogDebug("Added balance for {Asset}", item.Asset);
+ }
+
+ if (updatedAssets.Count > 0 && OnBalanceUpdate != null)
+ {
+ await OnBalanceUpdate.Invoke(
+ new UserDataUpdate
+ {
+ Source = source,
+ Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray()
+ }).ConfigureAwait(false);
+ }
+ }
+
+ private void CheckConnectedChanged()
+ {
+ Connected = _balanceSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
+ && _orderSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
+ && (_tradeSubscription == null || _tradeSubscription.SubscriptionStatus == SubscriptionStatus.Subscribed)
+ && (_positionSubscription == null || _positionSubscription.SubscriptionStatus == SubscriptionStatus.Subscribed);
+ }
+
+ private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState)
+ {
+ _logger.LogDebug("Balance stream status changed: {NewState}", newState);
+
+ CheckConnectedChanged();
+ }
+
+ private void PositionSubscriptionStatusChanged(SubscriptionStatus newState)
+ {
+ _logger.LogDebug("Position stream status changed: {NewState}", newState);
+
+ CheckConnectedChanged();
+ }
+
+ private void OrderSubscriptionStatusChanged(SubscriptionStatus newState)
+ {
+ _logger.LogDebug("Order stream status changed: {NewState}", newState);
+
+ if (newState == SubscriptionStatus.Pending)
+ {
+ // Record last data receive time since we need to request data from that timestamp on when polling
+ // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
+ // managing to do a poll we don't want to override the time since we still need to request that earlier data
+
+ if (_lastDataTimeOrdersBeforeDisconnect == null)
+ _lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime;
+ }
+
+ CheckConnectedChanged();
+ }
+
+ private void TradeSubscriptionStatusChanged(SubscriptionStatus newState)
+ {
+ _logger.LogDebug("Trade stream status changed: {NewState}", newState);
+
+ if (newState == SubscriptionStatus.Pending)
+ {
+ // Record last data receive time since we need to request data from that timestamp on when polling
+ // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
+ // managing to do a poll we don't want to override the time since we still need to request that earlier data
+
+ if (_lastDataTimeTradesBeforeDisconnect == null)
+ _lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime;
+ }
+
+ CheckConnectedChanged();
+ }
+
+ private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance)
+ {
+ // Some other way to way to determine sequence? Maybe timestamp?
+ var changed = false;
+ if (existingBalance.Total != newBalance.Total)
+ {
+ existingBalance.Total = newBalance.Total;
+ changed = true;
+ }
+
+ if (existingBalance.Available != newBalance.Available)
+ {
+ existingBalance.Available = newBalance.Available;
+ changed = true;
+ }
+
+ return changed;
+ }
+
+ private bool UpdatePosition(SharedPosition existingPosition, SharedPosition newPosition)
+ {
+ // Some other way to way to determine sequence? Maybe timestamp?
+ var changed = false;
+ if (existingPosition.AverageOpenPrice != newPosition.AverageOpenPrice)
+ {
+ existingPosition.AverageOpenPrice = newPosition.AverageOpenPrice;
+ changed = true;
+ }
+
+ if (existingPosition.Leverage != newPosition.Leverage)
+ {
+ existingPosition.Leverage = newPosition.Leverage;
+ changed = true;
+ }
+
+ if (existingPosition.LiquidationPrice != newPosition.LiquidationPrice)
+ {
+ existingPosition.LiquidationPrice = newPosition.LiquidationPrice;
+ changed = true;
+ }
+
+ if (existingPosition.PositionSize != newPosition.PositionSize)
+ {
+ existingPosition.PositionSize = newPosition.PositionSize;
+ changed = true;
+ }
+
+ if (existingPosition.StopLossPrice != newPosition.StopLossPrice)
+ {
+ existingPosition.StopLossPrice = newPosition.StopLossPrice;
+ changed = true;
+ }
+
+ if (existingPosition.TakeProfitPrice != newPosition.TakeProfitPrice)
+ {
+ existingPosition.TakeProfitPrice = newPosition.TakeProfitPrice;
+ changed = true;
+ }
+
+ if (newPosition.UnrealizedPnl != null && existingPosition.UnrealizedPnl != newPosition.UnrealizedPnl)
+ {
+ existingPosition.UnrealizedPnl = newPosition.UnrealizedPnl;
+ changed = true;
+ }
+
+ if (newPosition.UpdateTime != null && existingPosition.UpdateTime != newPosition.UpdateTime)
+ {
+ existingPosition.UpdateTime = newPosition.UpdateTime;
+ // If update time is the only changed prop don't mark it as changed
+ }
+
+ return changed;
+ }
+
+ private bool UpdateFuturesOrder(SharedFuturesOrder existingOrder, SharedFuturesOrder newOrder)
+ {
+ if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false)
+ // Update is older than the existing data, ignore
+ return false;
+
+ var changed = false;
+ if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice)
+ {
+ existingOrder.AveragePrice = newOrder.AveragePrice;
+ changed = true;
+ }
+
+ if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice)
+ {
+ existingOrder.OrderPrice = newOrder.OrderPrice;
+ changed = true;
+ }
+
+ if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee)
+ {
+ existingOrder.Fee = newOrder.Fee;
+ changed = true;
+ }
+
+ if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset)
+ {
+ existingOrder.FeeAsset = newOrder.FeeAsset;
+ changed = true;
+ }
+
+ if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity)
+ {
+ existingOrder.OrderQuantity = newOrder.OrderQuantity;
+ changed = true;
+ }
+
+ if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled)
+ {
+ existingOrder.QuantityFilled = newOrder.QuantityFilled;
+ changed = true;
+ }
+
+ if (newOrder.Status != existingOrder.Status)
+ {
+ existingOrder.Status = newOrder.Status;
+ changed = true;
+ }
+
+ if (newOrder.StopLossPrice != existingOrder.StopLossPrice)
+ {
+ existingOrder.StopLossPrice = newOrder.StopLossPrice;
+ changed = true;
+ }
+
+ if (newOrder.TakeProfitPrice != existingOrder.TakeProfitPrice)
+ {
+ existingOrder.TakeProfitPrice = newOrder.TakeProfitPrice;
+ changed = true;
+ }
+
+ if (newOrder.TriggerPrice != existingOrder.TriggerPrice)
+ {
+ existingOrder.TriggerPrice = newOrder.TriggerPrice;
+ changed = true;
+ }
+
+ if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime)
+ {
+ existingOrder.UpdateTime = newOrder.UpdateTime;
+ changed = true;
+ }
+
+ return changed;
+ }
+
+ private bool? CheckIfOrderUpdateIsNewer(SharedFuturesOrder existingOrder, SharedFuturesOrder newOrder)
+ {
+ if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open)
+ // status changed from open to not open
+ return true;
+
+ if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open)
+ // status changed from not open to open; stale
+ return false;
+
+ if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null)
+ {
+ // If both have an update time base of that
+ if (existingOrder.UpdateTime < newOrder.UpdateTime)
+ return true;
+
+ if (existingOrder.UpdateTime > newOrder.UpdateTime)
+ return false;
+ }
+
+ if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null)
+ {
+ if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null)
+ {
+ // If base quantity is not null we can base it on that
+ if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset)
+ return true;
+
+ else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset)
+ return false;
+ }
+
+ if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null)
+ {
+ // If quote quantity is not null we can base it on that
+ if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset)
+ return true;
+
+ else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset)
+ return false;
+ }
+ }
+
+ if (existingOrder.Fee != null && newOrder.Fee != null)
+ {
+ // Higher fee means later processing
+ if (existingOrder.Fee < newOrder.Fee)
+ return true;
+
+ if (existingOrder.Fee > newOrder.Fee)
+ return false;
+ }
+
+ return null;
+ }
+
+ ///
+ protected override async Task DoPollAsync()
+ {
+ _logger.LogDebug("Starting user data requesting");
+ var anyError = false;
+ var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!balancesResult.Success)
+ {
+ // .. ?
+ var transientError = balancesResult.Error!.IsTransient;
+ // If transient we can retry
+ // Should communicate errors, also for websocket disconnecting
+
+ anyError = true;
+ }
+ else
+ {
+ await HandleBalanceUpdateAsync(UpdateSource.Poll, balancesResult.Data).ConfigureAwait(false);
+ }
+
+ var openOrdersResult = await _futuresOrderRestClient.GetOpenFuturesOrdersAsync(new GetOpenOrdersRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!openOrdersResult.Success)
+ {
+ // .. ?
+
+ anyError = true;
+ }
+ else
+ {
+ await HandleOrderUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false);
+ }
+
+ var positionResult = await _futuresOrderRestClient.GetPositionsAsync(new GetPositionsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!positionResult.Success)
+ {
+ // .. ?
+
+ anyError = true;
+ }
+ else
+ {
+ await HandlePositionUpdateAsync(UpdateSource.Poll, positionResult.Data).ConfigureAwait(false);
+ }
+
+ foreach (var symbol in _symbols)
+ {
+ var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime;
+ var updatedPollTime = DateTime.UtcNow;
+ var closedOrdersResult = await _futuresOrderRestClient.GetClosedFuturesOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!closedOrdersResult.Success)
+ {
+ // .. ?
+
+ anyError = true;
+ }
+ else
+ {
+ _lastDataTimeOrdersBeforeDisconnect = null;
+ _lastPollTimeOrders = updatedPollTime;
+
+ // Filter orders to only include where close time is after the start time
+ var relevantOrders = closedOrdersResult.Data.Where(x =>
+ x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
+ || x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
+ || x.CreateTime == null && x.UpdateTime == null // Unknown time
+ ).ToArray();
+
+ if (relevantOrders.Length > 0)
+ await HandleOrderUpdateAsync(UpdateSource.Poll, relevantOrders).ConfigureAwait(false);
+ }
+
+ if (_trackTrades)
+ {
+ var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime;
+ var tradesResult = await _futuresOrderRestClient.GetFuturesUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!tradesResult.Success)
+ {
+ // .. ?
+ anyError = true;
+ }
+ else
+ {
+ _lastDataTimeTradesBeforeDisconnect = null;
+ _lastPollTimeTrades = updatedPollTime;
+
+ // Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
+ var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray();
+ if (relevantTrades.Length > 0)
+ await HandleTradeUpdateAsync(UpdateSource.Poll, tradesResult.Data).ConfigureAwait(false);
+ }
+ }
+ }
+
+ _logger.LogDebug("User data requesting completed");
+ return anyError;
+ }
+ }
+}
diff --git a/CryptoExchange.Net/Trackers/UserData/UserSpotDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/UserSpotDataTracker.cs
new file mode 100644
index 0000000..d33d833
--- /dev/null
+++ b/CryptoExchange.Net/Trackers/UserData/UserSpotDataTracker.cs
@@ -0,0 +1,533 @@
+using CryptoExchange.Net.Objects;
+using CryptoExchange.Net.Objects.Sockets;
+using CryptoExchange.Net.SharedApis;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace CryptoExchange.Net.Trackers.UserData
+{
+ ///
+ /// User data tracker
+ ///
+ public abstract class UserSpotDataTracker : UserDataTracker, IUserSpotDataTracker
+ {
+ // Cached data
+ private ConcurrentDictionary _balanceStore = new ConcurrentDictionary();
+ private ConcurrentDictionary _orderStore = new ConcurrentDictionary();
+ private ConcurrentDictionary _tradeStore = new ConcurrentDictionary();
+
+ // Typed clients
+ private readonly IListenKeyRestClient? _listenKeyRestClient;
+ private readonly ISpotSymbolRestClient _spotSymbolRestClient;
+ private readonly IBalanceRestClient _balanceRestClient;
+ private readonly IBalanceSocketClient _balanceSocketClient;
+ private readonly ISpotOrderRestClient _spotOrderRestClient;
+ private readonly ISpotOrderSocketClient _spotOrderSocketClient;
+ private readonly IUserTradeSocketClient? _userTradeSocketClient;
+
+ // Subscriptions
+ private UpdateSubscription? _balanceSubscription;
+ private UpdateSubscription? _orderSubscription;
+ private UpdateSubscription? _tradeSubscription;
+
+ private ExchangeParameters? _exchangeParameters;
+
+ ///
+ public event Func, Task>? OnBalanceUpdate;
+ ///
+ public event Func, Task>? OnOrderUpdate;
+ ///
+ public event Func, Task>? OnTradeUpdate;
+
+ ///
+ public SharedBalance[] Balances => _balanceStore.Values.ToArray();
+ ///
+ public SharedSpotOrder[] Orders => _orderStore.Values.ToArray();
+ ///
+ public SharedUserTrade[] Trades => _tradeStore.Values.ToArray();
+
+ ///
+ /// ctor
+ ///
+ protected UserSpotDataTracker(
+ ILogger logger,
+ ISharedClient restClient,
+ ISharedClient socketClient,
+ string? userIdentifier,
+ UserDataTrackerConfig config,
+ ExchangeParameters? exchangeParameters = null
+ ) : base(logger, config, userIdentifier)
+ {
+ _exchangeParameters = exchangeParameters;
+
+ _spotSymbolRestClient = (ISpotSymbolRestClient)restClient;
+ _balanceRestClient = (IBalanceRestClient)restClient;
+ _balanceSocketClient = (IBalanceSocketClient)socketClient;
+ _spotOrderRestClient = (ISpotOrderRestClient)restClient;
+ _spotOrderSocketClient = (ISpotOrderSocketClient)socketClient;
+ _listenKeyRestClient = restClient as IListenKeyRestClient;
+ _userTradeSocketClient = socketClient as IUserTradeSocketClient;
+ }
+
+ ///
+ protected override async Task DoStartAsync()
+ {
+ _logger.LogDebug("Starting UserDataTracker");
+ // Request symbols so SharedSymbol property can be filled on updates
+ var symbolResult = await _spotSymbolRestClient.GetSpotSymbolsAsync(new GetSymbolsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!symbolResult)
+ {
+ _logger.LogWarning("Failed to start UserDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
+ return symbolResult;
+ }
+
+ string? listenKey = null;
+ if (_listenKeyRestClient != null)
+ {
+ var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!lkResult)
+ {
+ _logger.LogWarning("Failed to start UserDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
+ return lkResult;
+ }
+
+ listenKey = lkResult.Data;
+ }
+
+ var subBalanceResult = await ExchangeHelpers.ProcessQueuedAsync(
+ async handler => await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
+ x => HandleBalanceUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
+ if (!subBalanceResult)
+ {
+ _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message);
+ return subBalanceResult;
+ }
+
+ _balanceSubscription = subBalanceResult.Data;
+ subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged;
+
+ var subOrderResult = await ExchangeHelpers.ProcessQueuedAsync(
+ async handler => await _spotOrderSocketClient.SubscribeToSpotOrderUpdatesAsync(new SubscribeSpotOrderRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
+ x => HandleOrderUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
+ if (!subOrderResult)
+ {
+ _cts!.Cancel();
+ _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message);
+ return subOrderResult;
+ }
+
+ _orderSubscription = subOrderResult.Data;
+ subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged;
+
+ if (_userTradeSocketClient != null && _trackTrades)
+ {
+ var subTradeResult = await ExchangeHelpers.ProcessQueuedAsync(
+ async handler => await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
+ x => HandleTradeUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
+ if (!subOrderResult)
+ {
+ _cts!.Cancel();
+ _logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message);
+ return subOrderResult;
+ }
+
+ _tradeSubscription = subTradeResult.Data;
+ subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged;
+ }
+
+ _logger.LogDebug("Started UserDataTracker");
+ return CallResult.SuccessResult;
+ }
+
+ private async Task HandleTradeUpdateAsync(UpdateSource source, SharedUserTrade[] @event)
+ {
+ var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
+ if (unknownSymbols.Any())
+ {
+ _logger.LogWarning("Received order without SharedSymbol set, ignoring");
+ @event = @event.Except(unknownSymbols).ToArray();
+ }
+
+ if (!_onlyTrackProvidedSymbols)
+ UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
+ else
+ @event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
+
+ // Update local store
+ var updatedIds = @event.Select(x => x.Id).ToList();
+ foreach (var item in @event)
+ {
+ if (_tradeStore.TryAdd(item.Id, item))
+ _logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id);
+ else
+ updatedIds.Remove(item.Id);
+ }
+
+ if (updatedIds.Count > 0 && OnTradeUpdate != null)
+ {
+ await OnTradeUpdate.Invoke(
+ new UserDataUpdate
+ {
+ Source = source,
+ Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
+ }).ConfigureAwait(false);
+ }
+ }
+
+ private async Task HandleOrderUpdateAsync(UpdateSource source, SharedSpotOrder[] @event)
+ {
+ var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
+ if (unknownSymbols.Any())
+ {
+ _logger.LogWarning("Received order without SharedSymbol set, ignoring");
+ @event = @event.Except(unknownSymbols).ToArray();
+ }
+
+ if (!_onlyTrackProvidedSymbols)
+ UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
+ else
+ @event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
+
+ // Update local store
+ var updatedIds = @event.Select(x => x.OrderId).ToList();
+
+ foreach (var item in @event)
+ {
+ bool orderExisted = false;
+ _orderStore.AddOrUpdate(item.OrderId, item, (id, existing) =>
+ {
+ orderExisted = true;
+ var updated = UpdateSpotOrder(existing, item);
+ if (!updated)
+ updatedIds.Remove(id);
+ else
+ _logger.LogDebug("Updated spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
+
+ return existing;
+ });
+
+ if (!orderExisted)
+ _logger.LogDebug("Added spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
+ }
+
+ if (updatedIds.Count > 0 && OnOrderUpdate != null)
+ {
+ await OnOrderUpdate.Invoke(
+ new UserDataUpdate
+ {
+ Source = source,
+ Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
+ }).ConfigureAwait(false);
+ }
+
+ var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
+ if (trades.Length != 0)
+ await HandleTradeUpdateAsync(source, trades).ConfigureAwait(false);
+ }
+
+ private async Task HandleBalanceUpdateAsync(UpdateSource source, SharedBalance[] @event)
+ {
+ // Update local store
+ var updatedAssets = @event.Select(x => x.Asset).ToList();
+
+ foreach (var item in @event)
+ {
+ bool balanceExisted = false;
+ _balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) =>
+ {
+ balanceExisted = true;
+ var updated = UpdateBalance(existing, item);
+ if (!updated)
+ updatedAssets.Remove(asset);
+ else
+ _logger.LogDebug("Updated balance for {Asset}", item.Asset);
+
+ return existing;
+ });
+
+ if (!balanceExisted)
+ _logger.LogDebug("Added balance for {Asset}", item.Asset);
+ }
+
+ if (updatedAssets.Count > 0 && OnBalanceUpdate != null)
+ {
+ await OnBalanceUpdate.Invoke(
+ new UserDataUpdate
+ {
+ Source = source,
+ Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray()
+ }).ConfigureAwait(false);
+ }
+ }
+
+ private void CheckConnectedChanged()
+ {
+ Connected = _balanceSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
+ && _orderSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
+ && (_tradeSubscription == null || _tradeSubscription.SubscriptionStatus == SubscriptionStatus.Subscribed);
+ }
+
+ private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState)
+ {
+ _logger.LogDebug("Balance stream status changed: {NewState}", newState);
+
+ CheckConnectedChanged();
+ }
+
+ private void OrderSubscriptionStatusChanged(SubscriptionStatus newState)
+ {
+ _logger.LogDebug("Order stream status changed: {NewState}", newState);
+
+ if (newState == SubscriptionStatus.Pending)
+ {
+ // Record last data receive time since we need to request data from that timestamp on when polling
+ // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
+ // managing to do a poll we don't want to override the time since we still need to request that earlier data
+
+ if (_lastDataTimeOrdersBeforeDisconnect == null)
+ _lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime;
+ }
+
+ CheckConnectedChanged();
+ }
+
+ private void TradeSubscriptionStatusChanged(SubscriptionStatus newState)
+ {
+ _logger.LogDebug("Trade stream status changed: {NewState}", newState);
+
+ if (newState == SubscriptionStatus.Pending)
+ {
+ // Record last data receive time since we need to request data from that timestamp on when polling
+ // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
+ // managing to do a poll we don't want to override the time since we still need to request that earlier data
+
+ if (_lastDataTimeTradesBeforeDisconnect == null)
+ _lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime;
+ }
+
+ CheckConnectedChanged();
+ }
+
+ private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance)
+ {
+ // Some other way to way to determine sequence? Maybe timestamp?
+ var changed = false;
+ if (existingBalance.Total != newBalance.Total)
+ {
+ existingBalance.Total = newBalance.Total;
+ changed = true;
+ }
+
+ if (existingBalance.Available != newBalance.Available)
+ {
+ existingBalance.Available = newBalance.Available;
+ changed = true;
+ }
+
+ return changed;
+ }
+
+ private bool UpdateSpotOrder(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
+ {
+ if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false)
+ // Update is older than the existing data, ignore
+ return false;
+
+ var changed = false;
+ if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice)
+ {
+ existingOrder.AveragePrice = newOrder.AveragePrice;
+ changed = true;
+ }
+
+ if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice)
+ {
+ existingOrder.OrderPrice = newOrder.OrderPrice;
+ changed = true;
+ }
+
+ if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee)
+ {
+ existingOrder.Fee = newOrder.Fee;
+ changed = true;
+ }
+
+ if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset)
+ {
+ existingOrder.FeeAsset = newOrder.FeeAsset;
+ changed = true;
+ }
+
+ if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity)
+ {
+ existingOrder.OrderQuantity = newOrder.OrderQuantity;
+ changed = true;
+ }
+
+ if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled)
+ {
+ existingOrder.QuantityFilled = newOrder.QuantityFilled;
+ changed = true;
+ }
+
+ if (newOrder.Status != existingOrder.Status)
+ {
+ existingOrder.Status = newOrder.Status;
+ changed = true;
+ }
+
+ if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime)
+ {
+ existingOrder.UpdateTime = newOrder.UpdateTime;
+ changed = true;
+ }
+
+ return changed;
+ }
+
+ private bool? CheckIfOrderUpdateIsNewer(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
+ {
+ if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open)
+ // status changed from open to not open
+ return true;
+
+ if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open)
+ // status changed from not open to open; stale
+ return false;
+
+ if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null)
+ {
+ // If both have an update time base of that
+ if (existingOrder.UpdateTime < newOrder.UpdateTime)
+ return true;
+
+ if (existingOrder.UpdateTime > newOrder.UpdateTime)
+ return false;
+ }
+
+ if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null)
+ {
+ if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null)
+ {
+ // If base quantity is not null we can base it on that
+ if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset)
+ return true;
+
+ else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset)
+ return false;
+ }
+
+ if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null)
+ {
+ // If quote quantity is not null we can base it on that
+ if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset)
+ return true;
+
+ else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset)
+ return false;
+ }
+ }
+
+ if (existingOrder.Fee != null && newOrder.Fee != null)
+ {
+ // Higher fee means later processing
+ if (existingOrder.Fee < newOrder.Fee)
+ return true;
+
+ if (existingOrder.Fee > newOrder.Fee)
+ return false;
+ }
+
+ return null;
+ }
+
+ ///
+ protected override async Task DoPollAsync()
+ {
+ _logger.LogDebug("Starting user data requesting");
+ var anyError = false;
+ var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!balancesResult.Success)
+ {
+ // .. ?
+ var transientError = balancesResult.Error!.IsTransient;
+ // If transient we can retry
+ // Should communicate errors, also for websocket disconnecting
+
+ anyError = true;
+ }
+ else
+ {
+ await HandleBalanceUpdateAsync(UpdateSource.Poll, balancesResult.Data).ConfigureAwait(false);
+ }
+
+ var openOrdersResult = await _spotOrderRestClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!openOrdersResult.Success)
+ {
+ // .. ?
+
+ anyError = true;
+ }
+ else
+ {
+ await HandleOrderUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false);
+ }
+
+ foreach (var symbol in _symbols)
+ {
+ var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime;
+ var updatedPollTime = DateTime.UtcNow;
+ var closedOrdersResult = await _spotOrderRestClient.GetClosedSpotOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!closedOrdersResult.Success)
+ {
+ // .. ?
+
+ anyError = true;
+ }
+ else
+ {
+ _lastDataTimeOrdersBeforeDisconnect = null;
+ _lastPollTimeOrders = updatedPollTime;
+
+ // Filter orders to only include where close time is after the start time
+ var relevantOrders = closedOrdersResult.Data.Where(x =>
+ x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
+ || x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
+ || x.CreateTime == null && x.UpdateTime == null // Unknown time
+ ).ToArray();
+
+ if (relevantOrders.Length > 0)
+ await HandleOrderUpdateAsync(UpdateSource.Poll, relevantOrders).ConfigureAwait(false);
+ }
+
+ if (_trackTrades)
+ {
+ var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime;
+ var tradesResult = await _spotOrderRestClient.GetSpotUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
+ if (!tradesResult.Success)
+ {
+ // .. ?
+ anyError = true;
+ }
+ else
+ {
+ _lastDataTimeTradesBeforeDisconnect = null;
+ _lastPollTimeTrades = updatedPollTime;
+
+ // Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
+ var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray();
+ if (relevantTrades.Length > 0)
+ await HandleTradeUpdateAsync(UpdateSource.Poll, tradesResult.Data).ConfigureAwait(false);
+ }
+ }
+ }
+
+ _logger.LogDebug("User data requesting completed");
+ return anyError;
+ }
+ }
+}