1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 14:13:46 +00:00
This commit is contained in:
Jkorf 2026-02-02 16:36:00 +01:00
parent c9d10d51ae
commit a9ac1a3b75
9 changed files with 1505 additions and 591 deletions

View File

@ -119,6 +119,11 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public SocketStatus Status => _connection.Status; public SocketStatus Status => _connection.Status;
/// <summary>
/// The current subscription status
/// </summary>
public SubscriptionStatus SubscriptionStatus => _subscription.Status;
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>

View File

@ -20,6 +20,10 @@ namespace CryptoExchange.Net.SharedApis
/// </summary> /// </summary>
public SharedPositionSide PositionSide { get; set; } public SharedPositionSide PositionSide { get; set; }
/// <summary> /// <summary>
/// Whether the position is one way mode
/// </summary>
public SharedPositionMode PositionMode { get; set; }
/// <summary>
/// Average open price /// Average open price
/// </summary> /// </summary>
public decimal? AverageOpenPrice { get; set; } public decimal? AverageOpenPrice { get; set; }

View File

@ -1,54 +0,0 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.SharedApis;
using System;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// User data tracker
/// </summary>
public interface IUserDataTracker
{
/// <summary>
/// User identifier
/// </summary>
public string? UserIdentifier { get; }
/// <summary>
/// Current balances
/// </summary>
SharedBalance[] Balances { get; }
/// <summary>
/// Currently tracked orders
/// </summary>
SharedSpotOrder[] Orders { get; }
/// <summary>
/// Currently tracked trades
/// </summary>
SharedUserTrade[] Trades { get; }
/// <summary>
/// On balance update
/// </summary>
event Action<UserDataUpdate<SharedBalance[]>>? OnBalanceUpdate;
/// <summary>
/// On order update
/// </summary>
event Action<UserDataUpdate<SharedSpotOrder[]>>? OnOrderUpdate;
/// <summary>
/// On user trade update
/// </summary>
event Action<UserDataUpdate<SharedUserTrade[]>>? OnTradeUpdate;
/// <summary>
/// Start tracking user data
/// </summary>
Task<CallResult> StartAsync();
/// <summary>
/// Stop tracking data
/// </summary>
/// <returns></returns>
Task StopAsync();
}
}

View File

@ -0,0 +1,79 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.SharedApis;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// Futures user data tracker
/// </summary>
public interface IUserFuturesDataTracker
{
/// <summary>
/// User identifier
/// </summary>
string? UserIdentifier { get; }
/// <summary>
/// Whether the tracker is currently fully connected
/// </summary>
bool Connected { get; }
/// <summary>
/// Currently tracked symbols. Data for these symbols will be requested when polling.
/// Websocket updates will be available for all symbols regardless.
/// When new data is received for a symbol which is not yet being tracked it will be added to this list and polled in the future unless the `OnlyTrackProvidedSymbols` option is set in the configuration.
/// </summary>
IEnumerable<SharedSymbol> TrackedSymbols { get; }
/// <summary>
/// Current balances
/// </summary>
SharedBalance[] Balances { get; }
/// <summary>
/// Currently tracked orders
/// </summary>
SharedFuturesOrder[] Orders { get; }
/// <summary>
/// Currently tracked positions
/// </summary>
SharedPosition[] Positions { get; }
/// <summary>
/// Currently tracked trades
/// </summary>
SharedUserTrade[] Trades { get; }
/// <summary>
/// On connection status change. Might trigger multiple times with the same status depending on the underlying subscriptions.
/// </summary>
event Action<bool>? OnConnectedStatusChange;
/// <summary>
/// On balance update
/// </summary>
event Func<UserDataUpdate<SharedBalance[]>, Task>? OnBalanceUpdate;
/// <summary>
/// On order update
/// </summary>
event Func<UserDataUpdate<SharedFuturesOrder[]>, Task>? OnOrderUpdate;
/// <summary>
/// On position order update
/// </summary>
event Func<UserDataUpdate<SharedPosition[]>, Task>? OnPositionUpdate;
/// <summary>
/// On user trade update
/// </summary>
event Func<UserDataUpdate<SharedUserTrade[]>, Task>? OnTradeUpdate;
/// <summary>
/// Start tracking user data
/// </summary>
Task<CallResult> StartAsync();
/// <summary>
/// Stop tracking data
/// </summary>
/// <returns></returns>
Task StopAsync();
}
}

View File

@ -0,0 +1,72 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.SharedApis;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// User data tracker
/// </summary>
public interface IUserSpotDataTracker
{
/// <summary>
/// User identifier
/// </summary>
string? UserIdentifier { get; }
/// <summary>
/// Whether the tracker is currently fully connected
/// </summary>
bool Connected { get; }
/// <summary>
/// Currently tracked symbols. Data for these symbols will be requested when polling.
/// Websocket updates will be available for all symbols regardless.
/// When new data is received for a symbol which is not yet being tracked it will be added to this list and polled in the future unless the `OnlyTrackProvidedSymbols` option is set in the configuration.
/// </summary>
IEnumerable<SharedSymbol> TrackedSymbols { get; }
/// <summary>
/// Current balances
/// </summary>
SharedBalance[] Balances { get; }
/// <summary>
/// Currently tracked orders
/// </summary>
SharedSpotOrder[] Orders { get; }
/// <summary>
/// Currently tracked trades
/// </summary>
SharedUserTrade[] Trades { get; }
/// <summary>
/// On connection status change. Might trigger multiple times with the same status depending on the underlying subscriptions.
/// </summary>
event Action<bool>? OnConnectedStatusChange;
/// <summary>
/// On balance update
/// </summary>
event Func<UserDataUpdate<SharedBalance[]>, Task>? OnBalanceUpdate;
/// <summary>
/// On order update
/// </summary>
event Func<UserDataUpdate<SharedSpotOrder[]>, Task>? OnOrderUpdate;
/// <summary>
/// On user trade update
/// </summary>
event Func<UserDataUpdate<SharedUserTrade[]>, Task>? OnTradeUpdate;
/// <summary>
/// Start tracking user data
/// </summary>
Task<CallResult> StartAsync();
/// <summary>
/// Stop tracking data
/// </summary>
/// <returns></returns>
Task StopAsync();
}
}

View File

@ -1,9 +1,7 @@
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis; using CryptoExchange.Net.SharedApis;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
@ -12,99 +10,74 @@ using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData namespace CryptoExchange.Net.Trackers.UserData
{ {
/// <summary> public abstract class UserDataTracker
/// User data tracker
/// </summary>
public abstract class UserDataTracker : IUserDataTracker
{ {
// Cached data protected readonly ILogger _logger;
private ConcurrentDictionary<string, SharedBalance> _balanceStore = new ConcurrentDictionary<string, SharedBalance>();
private ConcurrentDictionary<string, SharedSpotOrder> _orderStore = new ConcurrentDictionary<string, SharedSpotOrder>();
private ConcurrentDictionary<string, SharedUserTrade> _tradeStore = new ConcurrentDictionary<string, SharedUserTrade>();
// Typed clients
private readonly IListenKeyRestClient? _listenKeyRestClient;
private readonly ISpotSymbolRestClient _spotSymbolRestClient;
private readonly IBalanceRestClient _balanceRestClient;
private readonly IBalanceSocketClient _balanceSocketClient;
private readonly ISpotOrderRestClient _spotOrderRestClient;
private readonly ISpotOrderSocketClient _spotOrderSocketClient;
private readonly IUserTradeSocketClient? _userTradeSocketClient;
private readonly ILogger _logger;
// State management // State management
private DateTime? _startTime = null; protected DateTime? _startTime = null;
private DateTime? _lastPollAttempt = null; protected DateTime? _lastPollAttempt = null;
private bool _lastPollSuccessful = false; protected bool _lastPollSuccessful = false;
private DateTime? _lastPollTimeOrders = null; protected DateTime? _lastPollTimeOrders = null;
private DateTime? _lastPollTimeTrades = null; protected DateTime? _lastPollTimeTrades = null;
private DateTime? _lastDataTimeOrdersBeforeDisconnect = null; protected DateTime? _lastDataTimeOrdersBeforeDisconnect = null;
private DateTime? _lastDataTimeTradesBeforeDisconnect = null; protected DateTime? _lastDataTimeTradesBeforeDisconnect = null;
private bool _firstPollDone = false; protected bool _firstPollDone = false;
protected bool _wasDisconnected = false;
// Config // Config
private List<SharedSymbol> _symbols = new List<SharedSymbol>(); protected List<SharedSymbol> _symbols = new List<SharedSymbol>();
private TimeSpan _pollIntervalConnected; protected TimeSpan _pollIntervalConnected;
private TimeSpan _pollIntervalDisconnected; protected TimeSpan _pollIntervalDisconnected;
private bool _pollAtStart; protected bool _pollAtStart;
private bool _onlyTrackProvidedSymbols; protected bool _onlyTrackProvidedSymbols;
private bool _trackTrades = true; protected bool _trackTrades = true;
// Subscriptions
private UpdateSubscription? _balanceSubscription;
private UpdateSubscription? _orderSubscription;
private UpdateSubscription? _tradeSubscription;
private AsyncResetEvent _pollWaitEvent = new AsyncResetEvent(false, true); protected AsyncResetEvent _pollWaitEvent = new AsyncResetEvent(false, true);
private Task? _pollTask; protected Task? _pollTask;
private CancellationTokenSource? _cts; protected CancellationTokenSource? _cts;
private object _symbolLock = new object(); protected object _symbolLock = new object();
private bool Connected =>
_balanceSubscription?.Status == Sockets.Default.SocketStatus.Connected
&& _orderSubscription?.Status == Sockets.Default.SocketStatus.Connected
&& _tradeSubscription == null || _tradeSubscription?.Status == Sockets.Default.SocketStatus.Connected;
/// <inheritdoc />
public event Action<UserDataUpdate<SharedBalance[]>>? OnBalanceUpdate;
/// <inheritdoc />
public event Action<UserDataUpdate<SharedSpotOrder[]>>? OnOrderUpdate;
/// <inheritdoc />
public event Action<UserDataUpdate<SharedUserTrade[]>>? OnTradeUpdate;
/// <inheritdoc /> /// <inheritdoc />
public string? UserIdentifier { get; } public string? UserIdentifier { get; }
/// <inheritdoc /> /// <inheritdoc />
public SharedBalance[] Balances => _balanceStore.Values.ToArray(); public IEnumerable<SharedSymbol> TrackedSymbols => _symbols.AsEnumerable();
/// <inheritdoc />
public SharedSpotOrder[] Orders => _orderStore.Values.ToArray();
/// <inheritdoc />
public SharedUserTrade[] Trades => _tradeStore.Values.ToArray();
/// <summary> private bool _connected;
/// ctor /// <inheritdoc />
/// </summary> public bool Connected
protected UserDataTracker(
ILogger logger,
ISharedClient restClient,
ISharedClient socketClient,
string? userIdentifier,
UserDataTrackerConfig config
)
{ {
get => _connected;
protected set
{
if (_connected == value)
return;
_connected = value;
if (!_connected)
_wasDisconnected = true;
else
_pollWaitEvent.Set();
InvokeConnectedStatusChanged();
}
}
/// <inheritdoc />
public event Action<bool>? OnConnectedStatusChange;
public UserDataTracker(ILogger logger, UserDataTrackerConfig config, string? userIdentifier)
{
if (config.OnlyTrackProvidedSymbols && !config.TrackedSymbols.Any())
throw new ArgumentException(nameof(config.TrackedSymbols), "Conflicting options; `OnlyTrackProvidedSymbols` but no symbols specific in `TrackedSymbols`");
_logger = logger; _logger = logger;
_spotSymbolRestClient = (ISpotSymbolRestClient)restClient;
_balanceRestClient = (IBalanceRestClient)restClient;
_balanceSocketClient = (IBalanceSocketClient)socketClient;
_spotOrderRestClient = (ISpotOrderRestClient)restClient;
_spotOrderSocketClient = (ISpotOrderSocketClient)socketClient;
_listenKeyRestClient = restClient as IListenKeyRestClient;
_userTradeSocketClient = socketClient as IUserTradeSocketClient;
_pollIntervalConnected = config.PollIntervalConnected; _pollIntervalConnected = config.PollIntervalConnected;
_pollIntervalDisconnected = config.PollIntervalDisconnected; _pollIntervalDisconnected = config.PollIntervalDisconnected;
_symbols = config.Symbols?.ToList() ?? []; _symbols = config.TrackedSymbols?.ToList() ?? [];
_onlyTrackProvidedSymbols = config.OnlyTrackProvidedSymbols; _onlyTrackProvidedSymbols = config.OnlyTrackProvidedSymbols;
_pollAtStart = config.PollAtStart; _pollAtStart = config.PollAtStart;
_trackTrades = config.TrackTrades; _trackTrades = config.TrackTrades;
@ -112,388 +85,38 @@ namespace CryptoExchange.Net.Trackers.UserData
UserIdentifier = userIdentifier; UserIdentifier = userIdentifier;
} }
/// <inheritdoc /> protected void InvokeConnectedStatusChanged()
{
OnConnectedStatusChange?.Invoke(Connected);
}
public async Task<CallResult> StartAsync() public async Task<CallResult> StartAsync()
{ {
_startTime = DateTime.UtcNow; _startTime = DateTime.UtcNow;
_logger.LogDebug("Starting UserDataTracker");
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
// Request symbols so SharedSymbol property can be filled on updates var start = await DoStartAsync().ConfigureAwait(false);
var symbolResult = await _spotSymbolRestClient.GetSpotSymbolsAsync(new GetSymbolsRequest()).ConfigureAwait(false); if (!start)
if (!symbolResult) return start;
{
_logger.LogWarning("Failed to start UserDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
return symbolResult;
}
string? listenKey = null; Connected = true;
if (_listenKeyRestClient != null)
{
var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest()).ConfigureAwait(false);
if (!lkResult)
{
_logger.LogWarning("Failed to start UserDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
return lkResult;
}
listenKey = lkResult.Data;
}
var subBalanceResult = await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey), x => HandleBalanceUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
if (!subBalanceResult)
{
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message);
return subBalanceResult;
}
_balanceSubscription = subBalanceResult.Data;
subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged;
var subOrderResult = await _spotOrderSocketClient.SubscribeToSpotOrderUpdatesAsync(new SubscribeSpotOrderRequest(listenKey), x => HandleOrderUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
if (!subOrderResult)
{
_cts.Cancel();
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message);
return subOrderResult;
}
_orderSubscription = subOrderResult.Data;
subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged;
if (_userTradeSocketClient != null && _trackTrades)
{
var subTradeResult = await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey), x => HandleTradeUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
if (!subOrderResult)
{
_cts.Cancel();
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message);
return subOrderResult;
}
_tradeSubscription = subTradeResult.Data;
subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged;
}
_pollTask = PollAsync(); _pollTask = PollAsync();
_logger.LogDebug("Started UserDataTracker");
return CallResult.SuccessResult; return CallResult.SuccessResult;
} }
private void UpdateSymbolsList(IEnumerable<SharedSymbol> symbols) protected abstract Task<CallResult> DoStartAsync();
/// <inheritdoc />
public async Task StopAsync()
{ {
lock (_symbolLock) _logger.LogDebug("Stopping UserDataTracker");
{ _cts?.Cancel();
foreach (var symbol in symbols.Distinct())
{
if (!_symbols.Any(x => x.TradingMode == symbol.TradingMode && x.BaseAsset == symbol.BaseAsset && x.QuoteAsset == symbol.QuoteAsset))
{
_symbols.Add(symbol);
_logger.LogDebug("Adding {BaseAsset}/{QuoteAsset} to symbol tracking list", symbol.BaseAsset, symbol.QuoteAsset);
}
}
}
}
private void HandleTradeUpdate(UpdateSource source, SharedUserTrade[] @event) if (_pollTask != null)
{ await _pollTask.ConfigureAwait(false);
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
if (unknownSymbols.Any())
{
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
@event = @event.Except(unknownSymbols).ToArray();
}
if (!_onlyTrackProvidedSymbols) _logger.LogDebug("Stopped UserDataTracker");
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
else
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
// Update local store
var updatedIds = @event.Select(x => x.Id).ToList();
foreach (var item in @event)
{
if (_tradeStore.TryAdd(item.Id, item))
_logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id);
else
updatedIds.Remove(item.Id);
}
if (updatedIds.Count > 0)
{
OnTradeUpdate?.Invoke(
new UserDataUpdate<SharedUserTrade[]>
{
Source = source,
Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
});
}
}
private void HandleOrderUpdate(UpdateSource source, SharedSpotOrder[] @event)
{
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
if (unknownSymbols.Any())
{
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
@event = @event.Except(unknownSymbols).ToArray();
}
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
else
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
// Update local store
var updatedIds = @event.Select(x => x.OrderId).ToList();
foreach (var item in @event)
{
bool orderExisted = false;
_orderStore.AddOrUpdate(item.OrderId, item, (id, existing) =>
{
orderExisted = true;
var updated = UpdateSpotOrder(existing, item);
if (!updated)
updatedIds.Remove(id);
else
_logger.LogDebug("Updated spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
return existing;
});
if (!orderExisted)
_logger.LogDebug("Added spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
}
if (updatedIds.Count > 0)
{
OnOrderUpdate?.Invoke(
new UserDataUpdate<SharedSpotOrder[]>
{
Source = source,
Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
});
}
var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
if (trades.Length != 0)
HandleTradeUpdate(source, trades);
}
private void HandleBalanceUpdate(UpdateSource source, SharedBalance[] @event)
{
// Update local store
var updatedAssets = @event.Select(x => x.Asset).ToList();
foreach (var item in @event)
{
bool balanceExisted = false;
_balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) =>
{
balanceExisted = true;
var updated = UpdateBalance(existing, item);
if (!updated)
updatedAssets.Remove(asset);
else
_logger.LogDebug("Updated balance for {Asset}", item.Asset);
return existing;
});
if (!balanceExisted)
_logger.LogDebug("Added balance for {Asset}", item.Asset);
}
if (updatedAssets.Count > 0)
{
OnBalanceUpdate?.Invoke(
new UserDataUpdate<SharedBalance[]>
{
Source = source,
Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray()
});
}
}
private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Balance stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Subscribed)
// Trigger REST polling since we weren't connected
_pollWaitEvent.Set();
}
private void OrderSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Order stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeOrdersBeforeDisconnect == null)
_lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime;
}
if (newState == SubscriptionStatus.Subscribed)
// Trigger REST polling since we weren't connected
_pollWaitEvent.Set();
}
private void TradeSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Trade stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeTradesBeforeDisconnect == null)
_lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime;
}
if (newState == SubscriptionStatus.Subscribed)
// Trigger REST polling since we weren't connected
_pollWaitEvent.Set();
}
private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance)
{
// Some other way to way to determine sequence? Maybe timestamp?
var changed = false;
if (existingBalance.Total != newBalance.Total)
{
existingBalance.Total = newBalance.Total;
changed = true;
}
if (existingBalance.Available != newBalance.Available)
{
existingBalance.Available = newBalance.Available;
changed = true;
}
return changed;
}
private bool UpdateSpotOrder(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
{
if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false)
// Update is older than the existing data, ignore
return false;
var changed = false;
if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice)
{
existingOrder.AveragePrice = newOrder.AveragePrice;
changed = true;
}
if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice)
{
existingOrder.OrderPrice = newOrder.OrderPrice;
changed = true;
}
if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee)
{
existingOrder.Fee = newOrder.Fee;
changed = true;
}
if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset)
{
existingOrder.FeeAsset = newOrder.FeeAsset;
changed = true;
}
if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity)
{
existingOrder.OrderQuantity = newOrder.OrderQuantity;
changed = true;
}
if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled)
{
existingOrder.QuantityFilled = newOrder.QuantityFilled;
changed = true;
}
if (newOrder.Status != existingOrder.Status)
{
existingOrder.Status = newOrder.Status;
changed = true;
}
if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime)
{
existingOrder.UpdateTime = newOrder.UpdateTime;
changed = true;
}
return changed;
}
private bool? CheckIfOrderUpdateIsNewer(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
{
if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open)
// status changed from open to not open
return true;
if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open)
// status changed from not open to open; stale
return false;
if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null)
{
// If both have an update time base of that
if (existingOrder.UpdateTime < newOrder.UpdateTime)
return true;
if (existingOrder.UpdateTime > newOrder.UpdateTime)
return false;
}
if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null)
{
if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null)
{
// If base quantity is not null we can base it on that
if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset)
return true;
else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset)
return false;
}
if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null)
{
// If quote quantity is not null we can base it on that
if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset)
return true;
else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset)
return false;
}
}
if (existingOrder.Fee != null && newOrder.Fee != null)
{
// Higher fee means later processing
if (existingOrder.Fee < newOrder.Fee)
return true;
if (existingOrder.Fee > newOrder.Fee)
return false;
}
return null;
} }
private TimeSpan? GetNextPollDelay() private TimeSpan? GetNextPollDelay()
@ -519,11 +142,11 @@ namespace CryptoExchange.Net.Trackers.UserData
return _pollIntervalConnected; return _pollIntervalConnected;
} }
private async Task PollAsync() public async Task PollAsync()
{ {
while (!_cts!.IsCancellationRequested) while (!_cts!.IsCancellationRequested)
{ {
var delayForNextPoll = GetNextPollDelay(); var delayForNextPoll = GetNextPollDelay();
if (delayForNextPoll != TimeSpan.Zero) if (delayForNextPoll != TimeSpan.Zero)
{ {
try try
@ -540,109 +163,49 @@ namespace CryptoExchange.Net.Trackers.UserData
if (_cts.IsCancellationRequested) if (_cts.IsCancellationRequested)
break; break;
if (_lastPollAttempt != null && (DateTime.UtcNow - _lastPollAttempt.Value) < TimeSpan.FromSeconds(2)) if (_lastPollAttempt != null
&& (DateTime.UtcNow - _lastPollAttempt.Value) < TimeSpan.FromSeconds(2)
&& !(Connected && _wasDisconnected))
{ {
if (_lastPollSuccessful) if (_lastPollSuccessful)
// If last poll was less than 2 seconds ago and it was successful don't bother immediately polling again // If last poll was less than 2 seconds ago and it was successful don't bother immediately polling again
continue; continue;
} }
_logger.LogDebug("Starting user data requesting"); if (Connected)
_lastPollAttempt = DateTime.UtcNow; _wasDisconnected = false;
_lastPollSuccessful = false; _lastPollSuccessful = false;
var anyError = false; try
var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest()).ConfigureAwait(false);
if (!balancesResult.Success)
{ {
// .. ? var anyError = await DoPollAsync().ConfigureAwait(false);
var transientError = balancesResult.Error!.IsTransient;
// If transient we can retry
// Should communicate errors, also for websocket disconnecting
anyError = true; _lastPollAttempt = DateTime.UtcNow;
_lastPollSuccessful = !anyError;
} }
else catch (Exception ex)
{ {
HandleBalanceUpdate(UpdateSource.Poll, balancesResult.Data); _logger.LogError(ex, "UserDataTracker polling exception");
} }
var openOrdersResult = await _spotOrderRestClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest()).ConfigureAwait(false);
if (!openOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
HandleOrderUpdate(UpdateSource.Poll, openOrdersResult.Data);
}
foreach (var symbol in _symbols)
{
var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime;
var updatedPollTime = DateTime.UtcNow;
var closedOrdersResult = await _spotOrderRestClient.GetClosedSpotOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders)).ConfigureAwait(false);
if (!closedOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeOrdersBeforeDisconnect = null;
_lastPollTimeOrders = updatedPollTime;
// Filter orders to only include where close time is after the start time
var relevantOrders = closedOrdersResult.Data.Where(x =>
x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
|| x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
|| x.CreateTime == null && x.UpdateTime == null // Unknown time
).ToArray();
if (relevantOrders.Length > 0)
HandleOrderUpdate(UpdateSource.Poll, relevantOrders);
}
if (_trackTrades)
{
var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime;
var tradesResult = await _spotOrderRestClient.GetSpotUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades)).ConfigureAwait(false);
if (!tradesResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeTradesBeforeDisconnect = null;
_lastPollTimeTrades = updatedPollTime;
// Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray();
if (relevantTrades.Length > 0)
HandleTradeUpdate(UpdateSource.Poll, tradesResult.Data);
}
}
}
_lastPollSuccessful = !anyError;
_logger.LogDebug("User data requesting completed");
} }
} }
/// <inheritdoc /> protected abstract Task<bool> DoPollAsync();
public async Task StopAsync()
protected void UpdateSymbolsList(IEnumerable<SharedSymbol> symbols)
{ {
_logger.LogDebug("Stopping UserDataTracker"); lock (_symbolLock)
_cts?.Cancel(); {
foreach (var symbol in symbols.Distinct())
if (_pollTask != null) {
await _pollTask.ConfigureAwait(false); if (!_symbols.Any(x => x.TradingMode == symbol.TradingMode && x.BaseAsset == symbol.BaseAsset && x.QuoteAsset == symbol.QuoteAsset))
{
_logger.LogDebug("Stopped UserDataTracker"); _symbols.Add(symbol);
_logger.LogDebug("Adding {BaseAsset}/{QuoteAsset} to symbol tracking list", symbol.BaseAsset, symbol.QuoteAsset);
}
}
}
} }
} }
} }

View File

@ -11,9 +11,9 @@ namespace CryptoExchange.Net.Trackers.UserData
public record UserDataTrackerConfig public record UserDataTrackerConfig
{ {
/// <summary> /// <summary>
/// Symbols to initially track, used when polling data. Other symbols will get tracked when updates are received for orders or trades on a new symbol and when there are open orders on a new symbol. To only track the symbols specified here set `OnlyTrackProvidedSymbols` to true. /// Symbols to initially track, used when polling data. Other symbols will get tracked when updates are received for orders or trades on a new symbol and when there are open orders or positions on a new symbol. To only track the symbols specified here set `OnlyTrackProvidedSymbols` to true.
/// </summary> /// </summary>
public IEnumerable<SharedSymbol> Symbols { get; set; } = []; public IEnumerable<SharedSymbol> TrackedSymbols { get; set; } = [];
/// <summary> /// <summary>
/// If true only orders and trades in the `Symbols` options will get tracked, data on other symbols will be ignored. /// If true only orders and trades in the `Symbols` options will get tracked, data on other symbols will be ignored.
/// </summary> /// </summary>
@ -25,13 +25,13 @@ namespace CryptoExchange.Net.Trackers.UserData
/// <summary> /// <summary>
/// Interval to poll data at while the websocket is disconnected. /// Interval to poll data at while the websocket is disconnected.
/// </summary> /// </summary>
public TimeSpan PollIntervalDisconnected { get; set; } = TimeSpan.Zero; public TimeSpan PollIntervalDisconnected { get; set; } = TimeSpan.FromSeconds(30);
/// <summary> /// <summary>
/// Whether to poll for data initially when starting the tracker. /// Whether to poll for data initially when starting the tracker.
/// </summary> /// </summary>
public bool PollAtStart { get; set; } = true; public bool PollAtStart { get; set; } = true;
/// <summary> /// <summary>
/// Whether to track order trades, can lead to increased requests when polling /// Whether to track order trades, can lead to increased requests when polling since they're requested per symbol.
/// </summary> /// </summary>
public bool TrackTrades { get; set; } = true; public bool TrackTrades { get; set; } = true;
} }

View File

@ -0,0 +1,712 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// User data tracker
/// </summary>
public abstract class UserFuturesDataTracker : UserDataTracker, IUserFuturesDataTracker
{
// Cached data
private ConcurrentDictionary<string, SharedBalance> _balanceStore = new ConcurrentDictionary<string, SharedBalance>();
private ConcurrentDictionary<string, SharedPosition> _positionStore = new ConcurrentDictionary<string, SharedPosition>();
private ConcurrentDictionary<string, SharedFuturesOrder> _orderStore = new ConcurrentDictionary<string, SharedFuturesOrder>();
private ConcurrentDictionary<string, SharedUserTrade> _tradeStore = new ConcurrentDictionary<string, SharedUserTrade>();
// Typed clients
private readonly IListenKeyRestClient? _listenKeyRestClient;
private readonly IFuturesSymbolRestClient _futuresSymbolRestClient;
private readonly IBalanceRestClient _balanceRestClient;
private readonly IBalanceSocketClient _balanceSocketClient;
private readonly IFuturesOrderRestClient _futuresOrderRestClient;
private readonly IFuturesOrderSocketClient _futuresOrderSocketClient;
private readonly IPositionSocketClient _positionSocketClient;
private readonly IUserTradeSocketClient? _userTradeSocketClient;
// Subscriptions
private UpdateSubscription? _balanceSubscription;
private UpdateSubscription? _orderSubscription;
private UpdateSubscription? _positionSubscription;
private UpdateSubscription? _tradeSubscription;
private ExchangeParameters? _exchangeParameters;
/// <summary>
/// Whether updates received from the websocket for positions are full snapshots, or individual updates for positions.
/// This is used to determine if positions should be removed if no longer present in the update data.
/// </summary>
protected abstract bool WebsocketPositionUpdatesAreFullSnapshots { get; }
/// <inheritdoc />
public event Func<UserDataUpdate<SharedBalance[]>, Task>? OnBalanceUpdate;
/// <inheritdoc />
public event Func<UserDataUpdate<SharedFuturesOrder[]>, Task>? OnOrderUpdate;
/// <inheritdoc />
public event Func<UserDataUpdate<SharedPosition[]>, Task>? OnPositionUpdate;
/// <inheritdoc />
public event Func<UserDataUpdate<SharedUserTrade[]>, Task>? OnTradeUpdate;
/// <inheritdoc />
public SharedBalance[] Balances => _balanceStore.Values.ToArray();
/// <inheritdoc />
public SharedFuturesOrder[] Orders => _orderStore.Values.ToArray();
/// <inheritdoc />
public SharedPosition[] Positions => _positionStore.Values.ToArray();
/// <inheritdoc />
public SharedUserTrade[] Trades => _tradeStore.Values.ToArray();
/// <summary>
/// ctor
/// </summary>
protected UserFuturesDataTracker(
ILogger logger,
ISharedClient restClient,
ISharedClient socketClient,
string? userIdentifier,
UserDataTrackerConfig config,
ExchangeParameters? exchangeParameters = null) : base(logger, config, userIdentifier)
{
_exchangeParameters = exchangeParameters;
_futuresSymbolRestClient = (IFuturesSymbolRestClient)restClient;
_balanceRestClient = (IBalanceRestClient)restClient;
_balanceSocketClient = (IBalanceSocketClient)socketClient;
_futuresOrderRestClient = (IFuturesOrderRestClient)restClient;
_futuresOrderSocketClient = (IFuturesOrderSocketClient)socketClient;
_positionSocketClient = (IPositionSocketClient)socketClient;
_listenKeyRestClient = restClient as IListenKeyRestClient;
_userTradeSocketClient = socketClient as IUserTradeSocketClient;
}
/// <inheritdoc />
protected override async Task<CallResult> DoStartAsync()
{
_logger.LogDebug("Starting UserDataTracker");
// Request symbols so SharedSymbol property can be filled on updates
var symbolResult = await _futuresSymbolRestClient.GetFuturesSymbolsAsync(new GetSymbolsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!symbolResult)
{
_logger.LogWarning("Failed to start UserFuturesDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
return symbolResult;
}
string? listenKey = null;
if (_listenKeyRestClient != null)
{
var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!lkResult)
{
_logger.LogWarning("Failed to start UserFuturesDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
return lkResult;
}
listenKey = lkResult.Data;
}
var subBalanceResult = await ExchangeHelpers.ProcessQueuedAsync<SharedBalance[]>(
async handler => await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleBalanceUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subBalanceResult)
{
_logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message);
return subBalanceResult;
}
_balanceSubscription = subBalanceResult.Data;
subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged;
var subOrderResult = await ExchangeHelpers.ProcessQueuedAsync<SharedFuturesOrder[]>(
async handler => await _futuresOrderSocketClient.SubscribeToFuturesOrderUpdatesAsync(new SubscribeFuturesOrderRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleOrderUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subOrderResult)
{
_cts!.Cancel();
_logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message);
return subOrderResult;
}
_orderSubscription = subOrderResult.Data;
subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged;
if (_positionSocketClient != null)
{
var subPositionResult = await ExchangeHelpers.ProcessQueuedAsync<SharedPosition[]>(
async handler => await _positionSocketClient.SubscribeToPositionUpdatesAsync(new SubscribePositionRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandlePositionUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subPositionResult)
{
_cts!.Cancel();
_logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to trade stream: {Error}", subPositionResult.Error!.Message);
return subPositionResult;
}
_positionSubscription = subPositionResult.Data;
subPositionResult.Data.SubscriptionStatusChanged += PositionSubscriptionStatusChanged;
}
if (_userTradeSocketClient != null && _trackTrades)
{
var subTradeResult = await ExchangeHelpers.ProcessQueuedAsync<SharedUserTrade[]>(
async handler => await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleTradeUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subOrderResult)
{
_cts.Cancel();
_logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message);
return subOrderResult;
}
_tradeSubscription = subTradeResult.Data;
subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged;
}
_logger.LogDebug("Started UserFuturesDataTracker");
return CallResult.SuccessResult;
}
private async Task HandleTradeUpdateAsync(UpdateSource source, SharedUserTrade[] @event)
{
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
if (unknownSymbols.Any())
{
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
@event = @event.Except(unknownSymbols).ToArray();
}
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
else
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
// Update local store
var updatedIds = @event.Select(x => x.Id).ToList();
foreach (var item in @event)
{
if (_tradeStore.TryAdd(item.Id, item))
_logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id);
else
updatedIds.Remove(item.Id);
}
if (updatedIds.Count > 0 && OnTradeUpdate != null)
{
await OnTradeUpdate.Invoke(
new UserDataUpdate<SharedUserTrade[]>
{
Source = source,
Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
private async Task HandleOrderUpdateAsync(UpdateSource source, SharedFuturesOrder[] @event)
{
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
if (unknownSymbols.Any())
{
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
@event = @event.Except(unknownSymbols).ToArray();
}
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
else
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
// Update local store
var updatedIds = @event.Select(x => x.OrderId).ToList();
foreach (var item in @event)
{
bool orderExisted = false;
_orderStore.AddOrUpdate(item.OrderId, item, (id, existing) =>
{
orderExisted = true;
var updated = UpdateFuturesOrder(existing, item);
if (!updated)
updatedIds.Remove(id);
else
_logger.LogDebug("Updated futures order {Symbol}.{Id}", item.Symbol, item.OrderId);
return existing;
});
if (!orderExisted)
_logger.LogDebug("Added futures order {Symbol}.{Id}", item.Symbol, item.OrderId);
}
if (updatedIds.Count > 0 && OnOrderUpdate != null)
{
await OnOrderUpdate.Invoke(
new UserDataUpdate<SharedFuturesOrder[]>
{
Source = source,
Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
if (trades.Length != 0)
await HandleTradeUpdateAsync(source, trades).ConfigureAwait(false);
}
private string PositionIdentifier(SharedPosition position) =>
position.Id ?? position.Symbol + position.PositionMode + (position.PositionMode != SharedPositionMode.OneWay ? position.PositionSide.ToString() : "");
private async Task HandlePositionUpdateAsync(UpdateSource source, SharedPosition[] @event)
{
// Update local store
var updatedPositions = @event.Select(PositionIdentifier).ToList();
if (WebsocketPositionUpdatesAreFullSnapshots)
{
// Reset any tracking position to zero/null values when it's no longer in the snapshot as it means there is no open position any more
var notInSnapshot = _positionStore.Where(x => !updatedPositions.Contains(x.Key) && x.Value.PositionSize != 0).ToList();
foreach (var position in notInSnapshot)
{
position.Value.UpdateTime = DateTime.UtcNow;
position.Value.AverageOpenPrice = null;
position.Value.LiquidationPrice = null;
position.Value.PositionSize = 0;
position.Value.StopLossPrice = null;
position.Value.TakeProfitPrice = null;
position.Value.UnrealizedPnl = null;
updatedPositions.Add(position.Key);
}
}
foreach (var item in @event)
{
bool positionExisted = false;
_positionStore.AddOrUpdate(PositionIdentifier(item), item, (key, existing) =>
{
positionExisted = true;
var updated = UpdatePosition(existing, item);
if (!updated)
updatedPositions.Remove(key);
else
_logger.LogDebug("Updated position for {Symbol}", item.Symbol);
return existing;
});
if (!positionExisted)
_logger.LogDebug("Added position for {Symbol}", item.Symbol);
}
if (updatedPositions.Count > 0 && OnPositionUpdate != null)
{
await OnPositionUpdate.Invoke(
new UserDataUpdate<SharedPosition[]>
{
Source = source,
Data = _positionStore.Where(x => updatedPositions.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
private async Task HandleBalanceUpdateAsync(UpdateSource source, SharedBalance[] @event)
{
// Update local store
var updatedAssets = @event.Select(x => x.Asset).ToList();
foreach (var item in @event)
{
bool balanceExisted = false;
_balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) =>
{
balanceExisted = true;
var updated = UpdateBalance(existing, item);
if (!updated)
updatedAssets.Remove(asset);
else
_logger.LogDebug("Updated balance for {Asset}", item.Asset);
return existing;
});
if (!balanceExisted)
_logger.LogDebug("Added balance for {Asset}", item.Asset);
}
if (updatedAssets.Count > 0 && OnBalanceUpdate != null)
{
await OnBalanceUpdate.Invoke(
new UserDataUpdate<SharedBalance[]>
{
Source = source,
Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
private void CheckConnectedChanged()
{
Connected = _balanceSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
&& _orderSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
&& (_tradeSubscription == null || _tradeSubscription.SubscriptionStatus == SubscriptionStatus.Subscribed)
&& (_positionSubscription == null || _positionSubscription.SubscriptionStatus == SubscriptionStatus.Subscribed);
}
private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Balance stream status changed: {NewState}", newState);
CheckConnectedChanged();
}
private void PositionSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Position stream status changed: {NewState}", newState);
CheckConnectedChanged();
}
private void OrderSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Order stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeOrdersBeforeDisconnect == null)
_lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime;
}
CheckConnectedChanged();
}
private void TradeSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Trade stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeTradesBeforeDisconnect == null)
_lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime;
}
CheckConnectedChanged();
}
private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance)
{
// Some other way to way to determine sequence? Maybe timestamp?
var changed = false;
if (existingBalance.Total != newBalance.Total)
{
existingBalance.Total = newBalance.Total;
changed = true;
}
if (existingBalance.Available != newBalance.Available)
{
existingBalance.Available = newBalance.Available;
changed = true;
}
return changed;
}
private bool UpdatePosition(SharedPosition existingPosition, SharedPosition newPosition)
{
// Some other way to way to determine sequence? Maybe timestamp?
var changed = false;
if (existingPosition.AverageOpenPrice != newPosition.AverageOpenPrice)
{
existingPosition.AverageOpenPrice = newPosition.AverageOpenPrice;
changed = true;
}
if (existingPosition.Leverage != newPosition.Leverage)
{
existingPosition.Leverage = newPosition.Leverage;
changed = true;
}
if (existingPosition.LiquidationPrice != newPosition.LiquidationPrice)
{
existingPosition.LiquidationPrice = newPosition.LiquidationPrice;
changed = true;
}
if (existingPosition.PositionSize != newPosition.PositionSize)
{
existingPosition.PositionSize = newPosition.PositionSize;
changed = true;
}
if (existingPosition.StopLossPrice != newPosition.StopLossPrice)
{
existingPosition.StopLossPrice = newPosition.StopLossPrice;
changed = true;
}
if (existingPosition.TakeProfitPrice != newPosition.TakeProfitPrice)
{
existingPosition.TakeProfitPrice = newPosition.TakeProfitPrice;
changed = true;
}
if (newPosition.UnrealizedPnl != null && existingPosition.UnrealizedPnl != newPosition.UnrealizedPnl)
{
existingPosition.UnrealizedPnl = newPosition.UnrealizedPnl;
changed = true;
}
if (newPosition.UpdateTime != null && existingPosition.UpdateTime != newPosition.UpdateTime)
{
existingPosition.UpdateTime = newPosition.UpdateTime;
// If update time is the only changed prop don't mark it as changed
}
return changed;
}
private bool UpdateFuturesOrder(SharedFuturesOrder existingOrder, SharedFuturesOrder newOrder)
{
if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false)
// Update is older than the existing data, ignore
return false;
var changed = false;
if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice)
{
existingOrder.AveragePrice = newOrder.AveragePrice;
changed = true;
}
if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice)
{
existingOrder.OrderPrice = newOrder.OrderPrice;
changed = true;
}
if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee)
{
existingOrder.Fee = newOrder.Fee;
changed = true;
}
if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset)
{
existingOrder.FeeAsset = newOrder.FeeAsset;
changed = true;
}
if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity)
{
existingOrder.OrderQuantity = newOrder.OrderQuantity;
changed = true;
}
if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled)
{
existingOrder.QuantityFilled = newOrder.QuantityFilled;
changed = true;
}
if (newOrder.Status != existingOrder.Status)
{
existingOrder.Status = newOrder.Status;
changed = true;
}
if (newOrder.StopLossPrice != existingOrder.StopLossPrice)
{
existingOrder.StopLossPrice = newOrder.StopLossPrice;
changed = true;
}
if (newOrder.TakeProfitPrice != existingOrder.TakeProfitPrice)
{
existingOrder.TakeProfitPrice = newOrder.TakeProfitPrice;
changed = true;
}
if (newOrder.TriggerPrice != existingOrder.TriggerPrice)
{
existingOrder.TriggerPrice = newOrder.TriggerPrice;
changed = true;
}
if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime)
{
existingOrder.UpdateTime = newOrder.UpdateTime;
changed = true;
}
return changed;
}
private bool? CheckIfOrderUpdateIsNewer(SharedFuturesOrder existingOrder, SharedFuturesOrder newOrder)
{
if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open)
// status changed from open to not open
return true;
if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open)
// status changed from not open to open; stale
return false;
if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null)
{
// If both have an update time base of that
if (existingOrder.UpdateTime < newOrder.UpdateTime)
return true;
if (existingOrder.UpdateTime > newOrder.UpdateTime)
return false;
}
if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null)
{
if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null)
{
// If base quantity is not null we can base it on that
if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset)
return true;
else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset)
return false;
}
if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null)
{
// If quote quantity is not null we can base it on that
if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset)
return true;
else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset)
return false;
}
}
if (existingOrder.Fee != null && newOrder.Fee != null)
{
// Higher fee means later processing
if (existingOrder.Fee < newOrder.Fee)
return true;
if (existingOrder.Fee > newOrder.Fee)
return false;
}
return null;
}
/// <inheritdoc />
protected override async Task<bool> DoPollAsync()
{
_logger.LogDebug("Starting user data requesting");
var anyError = false;
var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!balancesResult.Success)
{
// .. ?
var transientError = balancesResult.Error!.IsTransient;
// If transient we can retry
// Should communicate errors, also for websocket disconnecting
anyError = true;
}
else
{
await HandleBalanceUpdateAsync(UpdateSource.Poll, balancesResult.Data).ConfigureAwait(false);
}
var openOrdersResult = await _futuresOrderRestClient.GetOpenFuturesOrdersAsync(new GetOpenOrdersRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!openOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
await HandleOrderUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false);
}
var positionResult = await _futuresOrderRestClient.GetPositionsAsync(new GetPositionsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!positionResult.Success)
{
// .. ?
anyError = true;
}
else
{
await HandlePositionUpdateAsync(UpdateSource.Poll, positionResult.Data).ConfigureAwait(false);
}
foreach (var symbol in _symbols)
{
var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime;
var updatedPollTime = DateTime.UtcNow;
var closedOrdersResult = await _futuresOrderRestClient.GetClosedFuturesOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!closedOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeOrdersBeforeDisconnect = null;
_lastPollTimeOrders = updatedPollTime;
// Filter orders to only include where close time is after the start time
var relevantOrders = closedOrdersResult.Data.Where(x =>
x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
|| x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
|| x.CreateTime == null && x.UpdateTime == null // Unknown time
).ToArray();
if (relevantOrders.Length > 0)
await HandleOrderUpdateAsync(UpdateSource.Poll, relevantOrders).ConfigureAwait(false);
}
if (_trackTrades)
{
var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime;
var tradesResult = await _futuresOrderRestClient.GetFuturesUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!tradesResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeTradesBeforeDisconnect = null;
_lastPollTimeTrades = updatedPollTime;
// Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray();
if (relevantTrades.Length > 0)
await HandleTradeUpdateAsync(UpdateSource.Poll, tradesResult.Data).ConfigureAwait(false);
}
}
}
_logger.LogDebug("User data requesting completed");
return anyError;
}
}
}

View File

@ -0,0 +1,533 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// User data tracker
/// </summary>
public abstract class UserSpotDataTracker : UserDataTracker, IUserSpotDataTracker
{
// Cached data
private ConcurrentDictionary<string, SharedBalance> _balanceStore = new ConcurrentDictionary<string, SharedBalance>();
private ConcurrentDictionary<string, SharedSpotOrder> _orderStore = new ConcurrentDictionary<string, SharedSpotOrder>();
private ConcurrentDictionary<string, SharedUserTrade> _tradeStore = new ConcurrentDictionary<string, SharedUserTrade>();
// Typed clients
private readonly IListenKeyRestClient? _listenKeyRestClient;
private readonly ISpotSymbolRestClient _spotSymbolRestClient;
private readonly IBalanceRestClient _balanceRestClient;
private readonly IBalanceSocketClient _balanceSocketClient;
private readonly ISpotOrderRestClient _spotOrderRestClient;
private readonly ISpotOrderSocketClient _spotOrderSocketClient;
private readonly IUserTradeSocketClient? _userTradeSocketClient;
// Subscriptions
private UpdateSubscription? _balanceSubscription;
private UpdateSubscription? _orderSubscription;
private UpdateSubscription? _tradeSubscription;
private ExchangeParameters? _exchangeParameters;
/// <inheritdoc />
public event Func<UserDataUpdate<SharedBalance[]>, Task>? OnBalanceUpdate;
/// <inheritdoc />
public event Func<UserDataUpdate<SharedSpotOrder[]>, Task>? OnOrderUpdate;
/// <inheritdoc />
public event Func<UserDataUpdate<SharedUserTrade[]>, Task>? OnTradeUpdate;
/// <inheritdoc />
public SharedBalance[] Balances => _balanceStore.Values.ToArray();
/// <inheritdoc />
public SharedSpotOrder[] Orders => _orderStore.Values.ToArray();
/// <inheritdoc />
public SharedUserTrade[] Trades => _tradeStore.Values.ToArray();
/// <summary>
/// ctor
/// </summary>
protected UserSpotDataTracker(
ILogger logger,
ISharedClient restClient,
ISharedClient socketClient,
string? userIdentifier,
UserDataTrackerConfig config,
ExchangeParameters? exchangeParameters = null
) : base(logger, config, userIdentifier)
{
_exchangeParameters = exchangeParameters;
_spotSymbolRestClient = (ISpotSymbolRestClient)restClient;
_balanceRestClient = (IBalanceRestClient)restClient;
_balanceSocketClient = (IBalanceSocketClient)socketClient;
_spotOrderRestClient = (ISpotOrderRestClient)restClient;
_spotOrderSocketClient = (ISpotOrderSocketClient)socketClient;
_listenKeyRestClient = restClient as IListenKeyRestClient;
_userTradeSocketClient = socketClient as IUserTradeSocketClient;
}
/// <inheritdoc />
protected override async Task<CallResult> DoStartAsync()
{
_logger.LogDebug("Starting UserDataTracker");
// Request symbols so SharedSymbol property can be filled on updates
var symbolResult = await _spotSymbolRestClient.GetSpotSymbolsAsync(new GetSymbolsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!symbolResult)
{
_logger.LogWarning("Failed to start UserDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
return symbolResult;
}
string? listenKey = null;
if (_listenKeyRestClient != null)
{
var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!lkResult)
{
_logger.LogWarning("Failed to start UserDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
return lkResult;
}
listenKey = lkResult.Data;
}
var subBalanceResult = await ExchangeHelpers.ProcessQueuedAsync<SharedBalance[]>(
async handler => await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleBalanceUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subBalanceResult)
{
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message);
return subBalanceResult;
}
_balanceSubscription = subBalanceResult.Data;
subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged;
var subOrderResult = await ExchangeHelpers.ProcessQueuedAsync<SharedSpotOrder[]>(
async handler => await _spotOrderSocketClient.SubscribeToSpotOrderUpdatesAsync(new SubscribeSpotOrderRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleOrderUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subOrderResult)
{
_cts!.Cancel();
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message);
return subOrderResult;
}
_orderSubscription = subOrderResult.Data;
subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged;
if (_userTradeSocketClient != null && _trackTrades)
{
var subTradeResult = await ExchangeHelpers.ProcessQueuedAsync<SharedUserTrade[]>(
async handler => await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleTradeUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subOrderResult)
{
_cts!.Cancel();
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message);
return subOrderResult;
}
_tradeSubscription = subTradeResult.Data;
subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged;
}
_logger.LogDebug("Started UserDataTracker");
return CallResult.SuccessResult;
}
private async Task HandleTradeUpdateAsync(UpdateSource source, SharedUserTrade[] @event)
{
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
if (unknownSymbols.Any())
{
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
@event = @event.Except(unknownSymbols).ToArray();
}
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
else
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
// Update local store
var updatedIds = @event.Select(x => x.Id).ToList();
foreach (var item in @event)
{
if (_tradeStore.TryAdd(item.Id, item))
_logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id);
else
updatedIds.Remove(item.Id);
}
if (updatedIds.Count > 0 && OnTradeUpdate != null)
{
await OnTradeUpdate.Invoke(
new UserDataUpdate<SharedUserTrade[]>
{
Source = source,
Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
private async Task HandleOrderUpdateAsync(UpdateSource source, SharedSpotOrder[] @event)
{
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
if (unknownSymbols.Any())
{
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
@event = @event.Except(unknownSymbols).ToArray();
}
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
else
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
// Update local store
var updatedIds = @event.Select(x => x.OrderId).ToList();
foreach (var item in @event)
{
bool orderExisted = false;
_orderStore.AddOrUpdate(item.OrderId, item, (id, existing) =>
{
orderExisted = true;
var updated = UpdateSpotOrder(existing, item);
if (!updated)
updatedIds.Remove(id);
else
_logger.LogDebug("Updated spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
return existing;
});
if (!orderExisted)
_logger.LogDebug("Added spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
}
if (updatedIds.Count > 0 && OnOrderUpdate != null)
{
await OnOrderUpdate.Invoke(
new UserDataUpdate<SharedSpotOrder[]>
{
Source = source,
Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
if (trades.Length != 0)
await HandleTradeUpdateAsync(source, trades).ConfigureAwait(false);
}
private async Task HandleBalanceUpdateAsync(UpdateSource source, SharedBalance[] @event)
{
// Update local store
var updatedAssets = @event.Select(x => x.Asset).ToList();
foreach (var item in @event)
{
bool balanceExisted = false;
_balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) =>
{
balanceExisted = true;
var updated = UpdateBalance(existing, item);
if (!updated)
updatedAssets.Remove(asset);
else
_logger.LogDebug("Updated balance for {Asset}", item.Asset);
return existing;
});
if (!balanceExisted)
_logger.LogDebug("Added balance for {Asset}", item.Asset);
}
if (updatedAssets.Count > 0 && OnBalanceUpdate != null)
{
await OnBalanceUpdate.Invoke(
new UserDataUpdate<SharedBalance[]>
{
Source = source,
Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
private void CheckConnectedChanged()
{
Connected = _balanceSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
&& _orderSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
&& (_tradeSubscription == null || _tradeSubscription.SubscriptionStatus == SubscriptionStatus.Subscribed);
}
private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Balance stream status changed: {NewState}", newState);
CheckConnectedChanged();
}
private void OrderSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Order stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeOrdersBeforeDisconnect == null)
_lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime;
}
CheckConnectedChanged();
}
private void TradeSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Trade stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeTradesBeforeDisconnect == null)
_lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime;
}
CheckConnectedChanged();
}
private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance)
{
// Some other way to way to determine sequence? Maybe timestamp?
var changed = false;
if (existingBalance.Total != newBalance.Total)
{
existingBalance.Total = newBalance.Total;
changed = true;
}
if (existingBalance.Available != newBalance.Available)
{
existingBalance.Available = newBalance.Available;
changed = true;
}
return changed;
}
private bool UpdateSpotOrder(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
{
if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false)
// Update is older than the existing data, ignore
return false;
var changed = false;
if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice)
{
existingOrder.AveragePrice = newOrder.AveragePrice;
changed = true;
}
if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice)
{
existingOrder.OrderPrice = newOrder.OrderPrice;
changed = true;
}
if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee)
{
existingOrder.Fee = newOrder.Fee;
changed = true;
}
if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset)
{
existingOrder.FeeAsset = newOrder.FeeAsset;
changed = true;
}
if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity)
{
existingOrder.OrderQuantity = newOrder.OrderQuantity;
changed = true;
}
if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled)
{
existingOrder.QuantityFilled = newOrder.QuantityFilled;
changed = true;
}
if (newOrder.Status != existingOrder.Status)
{
existingOrder.Status = newOrder.Status;
changed = true;
}
if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime)
{
existingOrder.UpdateTime = newOrder.UpdateTime;
changed = true;
}
return changed;
}
private bool? CheckIfOrderUpdateIsNewer(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
{
if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open)
// status changed from open to not open
return true;
if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open)
// status changed from not open to open; stale
return false;
if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null)
{
// If both have an update time base of that
if (existingOrder.UpdateTime < newOrder.UpdateTime)
return true;
if (existingOrder.UpdateTime > newOrder.UpdateTime)
return false;
}
if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null)
{
if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null)
{
// If base quantity is not null we can base it on that
if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset)
return true;
else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset)
return false;
}
if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null)
{
// If quote quantity is not null we can base it on that
if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset)
return true;
else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset)
return false;
}
}
if (existingOrder.Fee != null && newOrder.Fee != null)
{
// Higher fee means later processing
if (existingOrder.Fee < newOrder.Fee)
return true;
if (existingOrder.Fee > newOrder.Fee)
return false;
}
return null;
}
/// <inheritdoc />
protected override async Task<bool> DoPollAsync()
{
_logger.LogDebug("Starting user data requesting");
var anyError = false;
var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!balancesResult.Success)
{
// .. ?
var transientError = balancesResult.Error!.IsTransient;
// If transient we can retry
// Should communicate errors, also for websocket disconnecting
anyError = true;
}
else
{
await HandleBalanceUpdateAsync(UpdateSource.Poll, balancesResult.Data).ConfigureAwait(false);
}
var openOrdersResult = await _spotOrderRestClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!openOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
await HandleOrderUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false);
}
foreach (var symbol in _symbols)
{
var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime;
var updatedPollTime = DateTime.UtcNow;
var closedOrdersResult = await _spotOrderRestClient.GetClosedSpotOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!closedOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeOrdersBeforeDisconnect = null;
_lastPollTimeOrders = updatedPollTime;
// Filter orders to only include where close time is after the start time
var relevantOrders = closedOrdersResult.Data.Where(x =>
x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
|| x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
|| x.CreateTime == null && x.UpdateTime == null // Unknown time
).ToArray();
if (relevantOrders.Length > 0)
await HandleOrderUpdateAsync(UpdateSource.Poll, relevantOrders).ConfigureAwait(false);
}
if (_trackTrades)
{
var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime;
var tradesResult = await _spotOrderRestClient.GetSpotUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!tradesResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeTradesBeforeDisconnect = null;
_lastPollTimeTrades = updatedPollTime;
// Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray();
if (relevantTrades.Length > 0)
await HandleTradeUpdateAsync(UpdateSource.Poll, tradesResult.Data).ConfigureAwait(false);
}
}
}
_logger.LogDebug("User data requesting completed");
return anyError;
}
}
}