1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 14:13:46 +00:00

Compare commits

...

2 Commits

Author SHA1 Message Date
Jkorf
fdc8b452b1 wip 2026-02-03 16:36:09 +01:00
Jkorf
2af66e5f0a wip 2026-02-03 10:28:47 +01:00
21 changed files with 1904 additions and 1504 deletions

View File

@ -1,79 +0,0 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.SharedApis;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// Futures user data tracker
/// </summary>
public interface IUserFuturesDataTracker
{
/// <summary>
/// User identifier
/// </summary>
string? UserIdentifier { get; }
/// <summary>
/// Whether the tracker is currently fully connected
/// </summary>
bool Connected { get; }
/// <summary>
/// Currently tracked symbols. Data for these symbols will be requested when polling.
/// Websocket updates will be available for all symbols regardless.
/// When new data is received for a symbol which is not yet being tracked it will be added to this list and polled in the future unless the `OnlyTrackProvidedSymbols` option is set in the configuration.
/// </summary>
IEnumerable<SharedSymbol> TrackedSymbols { get; }
/// <summary>
/// Current balances
/// </summary>
SharedBalance[] Balances { get; }
/// <summary>
/// Currently tracked orders
/// </summary>
SharedFuturesOrder[] Orders { get; }
/// <summary>
/// Currently tracked positions
/// </summary>
SharedPosition[] Positions { get; }
/// <summary>
/// Currently tracked trades
/// </summary>
SharedUserTrade[] Trades { get; }
/// <summary>
/// On connection status change. Might trigger multiple times with the same status depending on the underlying subscriptions.
/// </summary>
event Action<bool>? OnConnectedStatusChange;
/// <summary>
/// On balance update
/// </summary>
event Func<UserDataUpdate<SharedBalance[]>, Task>? OnBalanceUpdate;
/// <summary>
/// On order update
/// </summary>
event Func<UserDataUpdate<SharedFuturesOrder[]>, Task>? OnOrderUpdate;
/// <summary>
/// On position order update
/// </summary>
event Func<UserDataUpdate<SharedPosition[]>, Task>? OnPositionUpdate;
/// <summary>
/// On user trade update
/// </summary>
event Func<UserDataUpdate<SharedUserTrade[]>, Task>? OnTradeUpdate;
/// <summary>
/// Start tracking user data
/// </summary>
Task<CallResult> StartAsync();
/// <summary>
/// Stop tracking data
/// </summary>
/// <returns></returns>
Task StopAsync();
}
}

View File

@ -1,72 +0,0 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.SharedApis;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// User data tracker
/// </summary>
public interface IUserSpotDataTracker
{
/// <summary>
/// User identifier
/// </summary>
string? UserIdentifier { get; }
/// <summary>
/// Whether the tracker is currently fully connected
/// </summary>
bool Connected { get; }
/// <summary>
/// Currently tracked symbols. Data for these symbols will be requested when polling.
/// Websocket updates will be available for all symbols regardless.
/// When new data is received for a symbol which is not yet being tracked it will be added to this list and polled in the future unless the `OnlyTrackProvidedSymbols` option is set in the configuration.
/// </summary>
IEnumerable<SharedSymbol> TrackedSymbols { get; }
/// <summary>
/// Current balances
/// </summary>
SharedBalance[] Balances { get; }
/// <summary>
/// Currently tracked orders
/// </summary>
SharedSpotOrder[] Orders { get; }
/// <summary>
/// Currently tracked trades
/// </summary>
SharedUserTrade[] Trades { get; }
/// <summary>
/// On connection status change. Might trigger multiple times with the same status depending on the underlying subscriptions.
/// </summary>
event Action<bool>? OnConnectedStatusChange;
/// <summary>
/// On balance update
/// </summary>
event Func<UserDataUpdate<SharedBalance[]>, Task>? OnBalanceUpdate;
/// <summary>
/// On order update
/// </summary>
event Func<UserDataUpdate<SharedSpotOrder[]>, Task>? OnOrderUpdate;
/// <summary>
/// On user trade update
/// </summary>
event Func<UserDataUpdate<SharedUserTrade[]>, Task>? OnTradeUpdate;
/// <summary>
/// Start tracking user data
/// </summary>
Task<CallResult> StartAsync();
/// <summary>
/// Stop tracking data
/// </summary>
/// <returns></returns>
Task StopAsync();
}
}

View File

@ -0,0 +1,41 @@
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.Objects;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData.Interfaces
{
/// <summary>
/// Data tracker interface
/// </summary>
public interface IUserDataTracker<T>
{
/// <summary>
/// Whether the tracker is currently fully connected
/// </summary>
bool Connected { get; }
/// <summary>
/// Currently tracked symbols. Data for these symbols will be requested when polling.
/// Websocket updates will be available for all symbols regardless.
/// When new data is received for a symbol which is not yet being tracked it will be added to this list and polled in the future unless the `OnlyTrackProvidedSymbols` option is set in the configuration.
/// </summary>
IEnumerable<SharedSymbol> TrackedSymbols { get; }
/// <summary>
/// On connection status change. Might trigger multiple times with the same status depending on the underlying subscriptions.
/// </summary>
event Action<bool>? OnConnectedChange;
/// <summary>
/// Currently tracker values
/// </summary>
T[] Values { get; }
/// <summary>
/// On data update
/// </summary>
event Func<UserDataUpdate<T[]>, Task>? OnUpdate;
}
}

View File

@ -0,0 +1,56 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.Objects;
using System;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData.Interfaces
{
/// <summary>
/// Futures user data tracker
/// </summary>
public interface IUserFuturesDataTracker
{
/// <summary>
/// User identifier
/// </summary>
string? UserIdentifier { get; }
/// <summary>
/// Whether the tracker is currently fully connected
/// </summary>
bool Connected { get; }
/// <summary>
/// Balances tracker
/// </summary>
IUserDataTracker<SharedBalance> Balances { get; }
/// <summary>
/// Orders tracker
/// </summary>
IUserDataTracker<SharedFuturesOrder> Orders { get; }
/// <summary>
/// Positions tracker
/// </summary>
IUserDataTracker<SharedPosition> Positions { get; }
/// <summary>
/// Trades tracker
/// </summary>
IUserDataTracker<SharedUserTrade>? Trades { get; }
/// <summary>
/// On connection status change
/// </summary>
event Action<UserDataType, bool>? OnConnectedChange;
/// <summary>
/// Start tracking user data
/// </summary>
Task<CallResult> StartAsync();
/// <summary>
/// Stop tracking data
/// </summary>
/// <returns></returns>
Task StopAsync();
}
}

View File

@ -0,0 +1,52 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.Objects;
using System;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData.Interfaces
{
/// <summary>
/// User data tracker
/// </summary>
public interface IUserSpotDataTracker
{
/// <summary>
/// User identifier
/// </summary>
string? UserIdentifier { get; }
/// <summary>
/// Whether the tracker is currently fully connected
/// </summary>
bool Connected { get; }
/// <summary>
/// Balances tracker
/// </summary>
IUserDataTracker<SharedBalance> Balances { get; }
/// <summary>
/// Orders tracker
/// </summary>
IUserDataTracker<SharedSpotOrder> Orders { get; }
/// <summary>
/// Trades tracker
/// </summary>
IUserDataTracker<SharedUserTrade>? Trades { get; }
/// <summary>
/// On connection status change
/// </summary>
event Action<UserDataType, bool>? OnConnectedChange;
/// <summary>
/// Start tracking user data
/// </summary>
Task<CallResult> StartAsync();
/// <summary>
/// Stop tracking data
/// </summary>
/// <returns></returns>
Task StopAsync();
}
}

View File

@ -0,0 +1,81 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.Objects;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers
{
/// <summary>
/// Balance tracker implementation
/// </summary>
public class BalanceTracker : UserDataItemTracker<SharedBalance>
{
private readonly IBalanceRestClient _restClient;
private readonly IBalanceSocketClient? _socketClient;
private readonly ExchangeParameters? _exchangeParameters;
/// <summary>
/// ctor
/// </summary>
public BalanceTracker(
ILogger logger,
IBalanceRestClient restClient,
IBalanceSocketClient? socketClient,
TrackerItemConfig config,
ExchangeParameters? exchangeParameters = null
) : base(logger, UserDataType.Balances, config, false, null)
{
_restClient = restClient;
_socketClient = socketClient;
_exchangeParameters = exchangeParameters;
}
/// <inheritdoc />
protected override bool Update(SharedBalance existingItem, SharedBalance updateItem)
{
var changed = false;
if (existingItem.Total != updateItem.Total)
{
existingItem.Total = updateItem.Total;
changed = true;
}
if (existingItem.Available != updateItem.Available)
{
existingItem.Available = updateItem.Available;
changed = true;
}
return changed;
}
/// <inheritdoc />
protected override string GetKey(SharedBalance item) => item.Asset;
/// <inheritdoc />
protected override bool? CheckIfUpdateShouldBeApplied(SharedBalance existingItem, SharedBalance updateItem) => true;
/// <inheritdoc />
protected override Task<CallResult<UpdateSubscription?>> DoSubscribeAsync(string? listenKey)
{
if (_socketClient == null)
return Task.FromResult(new CallResult<UpdateSubscription?>(data: null));
return ExchangeHelpers.ProcessQueuedAsync<SharedBalance[]>(
async handler => await _socketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleUpdateAsync(UpdateSource.Push, x.Data))!;
}
/// <inheritdoc />
protected override async Task<bool> DoPollAsync()
{
var balances = await _restClient.GetBalancesAsync(new GetBalancesRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (balances.Success)
await HandleUpdateAsync(UpdateSource.Poll, balances.Data).ConfigureAwait(false);
return balances.Success;
}
}
}

View File

@ -0,0 +1,243 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.Objects;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers
{
/// <summary>
/// Futures order tracker
/// </summary>
public class FuturesOrderTracker : UserDataItemTracker<SharedFuturesOrder>
{
private readonly IFuturesOrderRestClient _restClient;
private readonly IFuturesOrderSocketClient? _socketClient;
private readonly ExchangeParameters? _exchangeParameters;
internal event Func<UpdateSource, SharedUserTrade[], Task>? OnTradeUpdate;
/// <summary>
/// ctor
/// </summary>
public FuturesOrderTracker(
ILogger logger,
IFuturesOrderRestClient restClient,
IFuturesOrderSocketClient? socketClient,
TrackerItemConfig config,
IEnumerable<SharedSymbol> symbols,
bool onlyTrackProvidedSymbols,
ExchangeParameters? exchangeParameters = null
) : base(logger, UserDataType.Orders, config, onlyTrackProvidedSymbols, symbols)
{
_restClient = restClient;
_socketClient = socketClient;
_exchangeParameters = exchangeParameters;
}
/// <inheritdoc />
protected override bool Update(SharedFuturesOrder existingItem, SharedFuturesOrder updateItem)
{
var changed = false;
if (updateItem.AveragePrice != null && updateItem.AveragePrice != existingItem.AveragePrice)
{
existingItem.AveragePrice = updateItem.AveragePrice;
changed = true;
}
if (updateItem.OrderPrice != null && updateItem.OrderPrice != existingItem.OrderPrice)
{
existingItem.OrderPrice = updateItem.OrderPrice;
changed = true;
}
if (updateItem.Fee != null && updateItem.Fee != existingItem.Fee)
{
existingItem.Fee = updateItem.Fee;
changed = true;
}
if (updateItem.FeeAsset != null && updateItem.FeeAsset != existingItem.FeeAsset)
{
existingItem.FeeAsset = updateItem.FeeAsset;
changed = true;
}
if (updateItem.OrderQuantity != null && updateItem.OrderQuantity != existingItem.OrderQuantity)
{
existingItem.OrderQuantity = updateItem.OrderQuantity;
changed = true;
}
if (updateItem.QuantityFilled != null && updateItem.QuantityFilled != existingItem.QuantityFilled)
{
existingItem.QuantityFilled = updateItem.QuantityFilled;
changed = true;
}
if (updateItem.Status != existingItem.Status)
{
existingItem.Status = updateItem.Status;
changed = true;
}
if (updateItem.StopLossPrice != existingItem.StopLossPrice)
{
existingItem.StopLossPrice = updateItem.StopLossPrice;
changed = true;
}
if (updateItem.TakeProfitPrice != existingItem.TakeProfitPrice)
{
existingItem.TakeProfitPrice = updateItem.TakeProfitPrice;
changed = true;
}
if (updateItem.TriggerPrice != existingItem.TriggerPrice)
{
existingItem.TriggerPrice = updateItem.TriggerPrice;
changed = true;
}
if (updateItem.UpdateTime != null && updateItem.UpdateTime != existingItem.UpdateTime)
{
existingItem.UpdateTime = updateItem.UpdateTime;
changed = true;
}
return changed;
}
/// <inheritdoc />
protected override string GetKey(SharedFuturesOrder item) => item.OrderId;
/// <inheritdoc />
protected override bool? CheckIfUpdateShouldBeApplied(SharedFuturesOrder existingItem, SharedFuturesOrder updateItem)
{
if (existingItem.Status == SharedOrderStatus.Open && updateItem.Status != SharedOrderStatus.Open)
// status changed from open to not open
return true;
if (existingItem.Status != SharedOrderStatus.Open && updateItem.Status == SharedOrderStatus.Open)
// status changed from not open to open; stale
return false;
if (existingItem.UpdateTime != null && updateItem.UpdateTime != null)
{
// If both have an update time base of that
if (existingItem.UpdateTime < updateItem.UpdateTime)
return true;
if (existingItem.UpdateTime > updateItem.UpdateTime)
return false;
}
if (existingItem.QuantityFilled != null && updateItem.QuantityFilled != null)
{
if (existingItem.QuantityFilled.QuantityInBaseAsset != null && updateItem.QuantityFilled.QuantityInBaseAsset != null)
{
// If base quantity is not null we can base it on that
if (existingItem.QuantityFilled.QuantityInBaseAsset < updateItem.QuantityFilled.QuantityInBaseAsset)
return true;
else if (existingItem.QuantityFilled.QuantityInBaseAsset > updateItem.QuantityFilled.QuantityInBaseAsset)
return false;
}
if (existingItem.QuantityFilled.QuantityInQuoteAsset != null && updateItem.QuantityFilled.QuantityInQuoteAsset != null)
{
// If quote quantity is not null we can base it on that
if (existingItem.QuantityFilled.QuantityInQuoteAsset < updateItem.QuantityFilled.QuantityInQuoteAsset)
return true;
else if (existingItem.QuantityFilled.QuantityInQuoteAsset > updateItem.QuantityFilled.QuantityInQuoteAsset)
return false;
}
}
if (existingItem.Fee != null && updateItem.Fee != null)
{
// Higher fee means later processing
if (existingItem.Fee < updateItem.Fee)
return true;
if (existingItem.Fee > updateItem.Fee)
return false;
}
return null;
}
/// <inheritdoc />
protected internal override async Task HandleUpdateAsync(UpdateSource source, SharedFuturesOrder[] @event)
{
await base.HandleUpdateAsync(source, @event).ConfigureAwait(false);
var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
if (trades.Length != 0 && OnTradeUpdate != null)
await OnTradeUpdate.Invoke(source, trades).ConfigureAwait(false);
}
/// <inheritdoc />
protected override Task<CallResult<UpdateSubscription?>> DoSubscribeAsync(string? listenKey)
{
if (_socketClient == null)
return Task.FromResult(new CallResult<UpdateSubscription?>(data: null));
return ExchangeHelpers.ProcessQueuedAsync<SharedFuturesOrder[]>(
async handler => await _socketClient.SubscribeToFuturesOrderUpdatesAsync(new SubscribeFuturesOrderRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleUpdateAsync(UpdateSource.Push, x.Data))!;
}
/// <inheritdoc />
protected override async Task<bool> DoPollAsync()
{
var anyError = false;
var openOrdersResult = await _restClient.GetOpenFuturesOrdersAsync(new GetOpenOrdersRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!openOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
await HandleUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false);
}
foreach (var symbol in _symbols.ToList())
{
var fromTimeOrders = _lastDataTimeBeforeDisconnect ?? _lastPollTime ?? _startTime;
var updatedPollTime = DateTime.UtcNow;
var closedOrdersResult = await _restClient.GetClosedFuturesOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!closedOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeBeforeDisconnect = null;
_lastPollTime = updatedPollTime;
// Filter orders to only include where close time is after the start time
var relevantOrders = closedOrdersResult.Data.Where(x =>
x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
|| x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
|| x.CreateTime == null && x.UpdateTime == null // Unknown time
).ToArray();
if (relevantOrders.Length > 0)
await HandleUpdateAsync(UpdateSource.Poll, relevantOrders).ConfigureAwait(false);
}
}
return anyError;
}
}
}

View File

@ -0,0 +1,90 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.Objects;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers
{
/// <summary>
/// Futures user trade tracker
/// </summary>
public class FuturesUserTradeTracker : UserDataItemTracker<SharedUserTrade>
{
private readonly IFuturesOrderRestClient _restClient;
private readonly IUserTradeSocketClient? _socketClient;
private readonly ExchangeParameters? _exchangeParameters;
internal Func<string[]>? GetTrackedOrderIds { get; set; }
/// <summary>
/// ctor
/// </summary>
public FuturesUserTradeTracker(
ILogger logger,
IFuturesOrderRestClient restClient,
IUserTradeSocketClient? socketClient,
TrackerItemConfig config,
IEnumerable<SharedSymbol> symbols,
bool onlyTrackProvidedSymbols,
ExchangeParameters? exchangeParameters = null
) : base(logger, UserDataType.Trades, config, onlyTrackProvidedSymbols, symbols)
{
_restClient = restClient;
_socketClient = socketClient;
_exchangeParameters = exchangeParameters;
}
/// <inheritdoc />
protected override string GetKey(SharedUserTrade item) => item.Id;
/// <inheritdoc />
protected override bool? CheckIfUpdateShouldBeApplied(SharedUserTrade existingItem, SharedUserTrade updateItem) => false;
/// <inheritdoc />
protected override bool Update(SharedUserTrade existingItem, SharedUserTrade updateItem) => false; // trades are never updated
/// <inheritdoc />
protected override async Task<bool> DoPollAsync()
{
var anyError = false;
foreach (var symbol in _symbols)
{
var fromTimeTrades = _lastDataTimeBeforeDisconnect ?? _lastPollTime ?? _startTime;
var updatedPollTime = DateTime.UtcNow;
var tradesResult = await _restClient.GetFuturesUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!tradesResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeBeforeDisconnect = null;
_lastPollTime = updatedPollTime;
// Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || (GetTrackedOrderIds?.Invoke() ?? []).Any(o => o == x.OrderId)).ToArray();
if (relevantTrades.Length > 0)
await HandleUpdateAsync(UpdateSource.Poll, tradesResult.Data).ConfigureAwait(false);
}
}
return anyError;
}
/// <inheritdoc />
protected override Task<CallResult<UpdateSubscription?>> DoSubscribeAsync(string? listenKey)
{
if (_socketClient == null)
return Task.FromResult(new CallResult<UpdateSubscription?>(data: null));
return ExchangeHelpers.ProcessQueuedAsync<SharedUserTrade[]>(
async handler => await _socketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleUpdateAsync(UpdateSource.Push, x.Data))!;
}
}
}

View File

@ -0,0 +1,238 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.Objects;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers
{
/// <summary>
/// Position tracker
/// </summary>
public class PositionTracker : UserDataItemTracker<SharedPosition>
{
private readonly IFuturesOrderRestClient _restClient;
private readonly IPositionSocketClient? _socketClient;
private readonly ExchangeParameters? _exchangeParameters;
/// <summary>
/// Whether websocket position updates are full snapshots and missing positions should be considered 0
/// </summary>
protected bool WebsocketPositionUpdatesAreFullSnapshots { get; }
/// <summary>
/// ctor
/// </summary>
public PositionTracker(
ILogger logger,
IFuturesOrderRestClient restClient,
IPositionSocketClient? socketClient,
TrackerItemConfig config,
IEnumerable<SharedSymbol> symbols,
bool onlyTrackProvidedSymbols,
bool websocketPositionUpdatesAreFullSnapshots,
ExchangeParameters? exchangeParameters = null
) : base(logger, UserDataType.Positions, config, onlyTrackProvidedSymbols, symbols)
{
_restClient = restClient;
_socketClient = socketClient;
_exchangeParameters = exchangeParameters;
WebsocketPositionUpdatesAreFullSnapshots = websocketPositionUpdatesAreFullSnapshots;
}
/// <inheritdoc />
protected override bool Update(SharedPosition existingItem, SharedPosition updateItem)
{
// Some other way to way to determine sequence? Maybe timestamp?
var changed = false;
if (existingItem.AverageOpenPrice != updateItem.AverageOpenPrice)
{
existingItem.AverageOpenPrice = updateItem.AverageOpenPrice;
changed = true;
}
if (existingItem.Leverage != updateItem.Leverage)
{
existingItem.Leverage = updateItem.Leverage;
changed = true;
}
if (existingItem.LiquidationPrice != updateItem.LiquidationPrice)
{
existingItem.LiquidationPrice = updateItem.LiquidationPrice;
changed = true;
}
if (existingItem.PositionSize != updateItem.PositionSize)
{
existingItem.PositionSize = updateItem.PositionSize;
changed = true;
}
if (existingItem.StopLossPrice != updateItem.StopLossPrice)
{
existingItem.StopLossPrice = updateItem.StopLossPrice;
changed = true;
}
if (existingItem.TakeProfitPrice != updateItem.TakeProfitPrice)
{
existingItem.TakeProfitPrice = updateItem.TakeProfitPrice;
changed = true;
}
if (updateItem.UnrealizedPnl != null && existingItem.UnrealizedPnl != updateItem.UnrealizedPnl)
{
existingItem.UnrealizedPnl = updateItem.UnrealizedPnl;
changed = true;
}
if (updateItem.UpdateTime != null && existingItem.UpdateTime != updateItem.UpdateTime)
{
existingItem.UpdateTime = updateItem.UpdateTime;
// If update time is the only changed prop don't mark it as changed
}
return changed;
}
/// <inheritdoc />
protected internal override async Task HandleUpdateAsync(UpdateSource source, SharedPosition[] @event)
{
LastUpdateTime = DateTime.UtcNow;
List<SharedPosition>? toRemove = null;
foreach (var item in @event)
{
if (item is SharedSymbolModel symbolModel)
{
if (symbolModel.SharedSymbol == null)
{
toRemove ??= new List<SharedPosition>();
toRemove.Add(item);
}
else if (_onlyTrackProvidedSymbols
&& !_symbols.Any(y => y.TradingMode == symbolModel.SharedSymbol!.TradingMode && y.BaseAsset == symbolModel.SharedSymbol.BaseAsset && y.QuoteAsset == symbolModel.SharedSymbol.QuoteAsset))
{
toRemove ??= new List<SharedPosition>();
toRemove.Add(item);
}
}
}
if (toRemove != null)
@event = @event.Except(toRemove).ToArray();
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.OfType<SharedSymbolModel>().Select(x => x.SharedSymbol!));
// Update local store
var updatedItems = @event.Select(GetKey).ToList();
if (WebsocketPositionUpdatesAreFullSnapshots)
{
// Reset any tracking position to zero/null values when it's no longer in the snapshot as it means there is no open position any more
var notInSnapshot = _store.Where(x => !updatedItems.Contains(x.Key) && x.Value.PositionSize != 0).ToList();
foreach (var position in notInSnapshot)
{
position.Value.UpdateTime = DateTime.UtcNow;
position.Value.AverageOpenPrice = null;
position.Value.LiquidationPrice = null;
position.Value.PositionSize = 0;
position.Value.StopLossPrice = null;
position.Value.TakeProfitPrice = null;
position.Value.UnrealizedPnl = null;
updatedItems.Add(position.Key);
LastChangeTime = DateTime.UtcNow;
}
}
foreach (var item in @event)
{
bool existed = false;
_store.AddOrUpdate(GetKey(item), item, (key, existing) =>
{
existed = true;
if (CheckIfUpdateShouldBeApplied(existing, item) == false)
{
updatedItems.Remove(key);
}
else
{
var updated = Update(existing, item);
if (!updated)
{
updatedItems.Remove(key);
}
else
{
_logger.LogDebug("Updated {DataType} {Item}", DataType, key);
LastChangeTime = DateTime.UtcNow;
}
}
return existing;
});
if (!existed)
{
_logger.LogDebug("Added {DataType} {Item}", DataType, GetKey(item));
LastChangeTime = DateTime.UtcNow;
}
}
if (updatedItems.Count > 0)
{
await InvokeUpdate(
new UserDataUpdate<SharedPosition[]>
{
Source = source,
Data = _store.Where(x => updatedItems.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
/// <inheritdoc />
protected override string GetKey(SharedPosition item) =>
item.Id ?? item.Symbol + item.PositionMode + (item.PositionMode != SharedPositionMode.OneWay ? item.PositionSide.ToString() : "");
/// <inheritdoc />
protected override bool? CheckIfUpdateShouldBeApplied(SharedPosition existingItem, SharedPosition updateItem) => true;
/// <inheritdoc />
protected override Task<CallResult<UpdateSubscription?>> DoSubscribeAsync(string? listenKey)
{
if (_socketClient == null)
return Task.FromResult(new CallResult<UpdateSubscription?>(data: null));
return ExchangeHelpers.ProcessQueuedAsync<SharedPosition[]>(
async handler => await _socketClient.SubscribeToPositionUpdatesAsync(new SubscribePositionRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleUpdateAsync(UpdateSource.Push, x.Data))!;
}
/// <inheritdoc />
protected override async Task<bool> DoPollAsync()
{
var anyError = false;
var openOrdersResult = await _restClient.GetPositionsAsync(new GetPositionsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!openOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
await HandleUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false);
}
return anyError;
}
}
}

View File

@ -0,0 +1,225 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.Objects;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers
{
/// <summary>
/// Spot order tracker
/// </summary>
public class SpotOrderTracker : UserDataItemTracker<SharedSpotOrder>
{
private readonly ISpotOrderRestClient _restClient;
private readonly ISpotOrderSocketClient? _socketClient;
private readonly ExchangeParameters? _exchangeParameters;
internal event Func<UpdateSource, SharedUserTrade[], Task>? OnTradeUpdate;
/// <summary>
/// ctor
/// </summary>
public SpotOrderTracker(
ILogger logger,
ISpotOrderRestClient restClient,
ISpotOrderSocketClient? socketClient,
TrackerItemConfig config,
IEnumerable<SharedSymbol> symbols,
bool onlyTrackProvidedSymbols,
ExchangeParameters? exchangeParameters = null
) : base(logger, UserDataType.Orders, config, onlyTrackProvidedSymbols, symbols)
{
_restClient = restClient;
_socketClient = socketClient;
_exchangeParameters = exchangeParameters;
}
/// <inheritdoc />
protected override bool Update(SharedSpotOrder existingItem, SharedSpotOrder updateItem)
{
var changed = false;
if (updateItem.AveragePrice != null && updateItem.AveragePrice != existingItem.AveragePrice)
{
existingItem.AveragePrice = updateItem.AveragePrice;
changed = true;
}
if (updateItem.OrderPrice != null && updateItem.OrderPrice != existingItem.OrderPrice)
{
existingItem.OrderPrice = updateItem.OrderPrice;
changed = true;
}
if (updateItem.Fee != null && updateItem.Fee != existingItem.Fee)
{
existingItem.Fee = updateItem.Fee;
changed = true;
}
if (updateItem.FeeAsset != null && updateItem.FeeAsset != existingItem.FeeAsset)
{
existingItem.FeeAsset = updateItem.FeeAsset;
changed = true;
}
if (updateItem.OrderQuantity != null && updateItem.OrderQuantity != existingItem.OrderQuantity)
{
existingItem.OrderQuantity = updateItem.OrderQuantity;
changed = true;
}
if (updateItem.QuantityFilled != null && updateItem.QuantityFilled != existingItem.QuantityFilled)
{
existingItem.QuantityFilled = updateItem.QuantityFilled;
changed = true;
}
if (updateItem.Status != existingItem.Status)
{
existingItem.Status = updateItem.Status;
changed = true;
}
if (updateItem.UpdateTime != null && updateItem.UpdateTime != existingItem.UpdateTime)
{
existingItem.UpdateTime = updateItem.UpdateTime;
changed = true;
}
return changed;
}
/// <inheritdoc />
protected override string GetKey(SharedSpotOrder item) => item.OrderId;
/// <inheritdoc />
protected override bool? CheckIfUpdateShouldBeApplied(SharedSpotOrder existingItem, SharedSpotOrder updateItem)
{
if (existingItem.Status == SharedOrderStatus.Open && updateItem.Status != SharedOrderStatus.Open)
// status changed from open to not open
return true;
if (existingItem.Status != SharedOrderStatus.Open && updateItem.Status == SharedOrderStatus.Open)
// status changed from not open to open; stale
return false;
if (existingItem.UpdateTime != null && updateItem.UpdateTime != null)
{
// If both have an update time base of that
if (existingItem.UpdateTime < updateItem.UpdateTime)
return true;
if (existingItem.UpdateTime > updateItem.UpdateTime)
return false;
}
if (existingItem.QuantityFilled != null && updateItem.QuantityFilled != null)
{
if (existingItem.QuantityFilled.QuantityInBaseAsset != null && updateItem.QuantityFilled.QuantityInBaseAsset != null)
{
// If base quantity is not null we can base it on that
if (existingItem.QuantityFilled.QuantityInBaseAsset < updateItem.QuantityFilled.QuantityInBaseAsset)
return true;
else if (existingItem.QuantityFilled.QuantityInBaseAsset > updateItem.QuantityFilled.QuantityInBaseAsset)
return false;
}
if (existingItem.QuantityFilled.QuantityInQuoteAsset != null && updateItem.QuantityFilled.QuantityInQuoteAsset != null)
{
// If quote quantity is not null we can base it on that
if (existingItem.QuantityFilled.QuantityInQuoteAsset < updateItem.QuantityFilled.QuantityInQuoteAsset)
return true;
else if (existingItem.QuantityFilled.QuantityInQuoteAsset > updateItem.QuantityFilled.QuantityInQuoteAsset)
return false;
}
}
if (existingItem.Fee != null && updateItem.Fee != null)
{
// Higher fee means later processing
if (existingItem.Fee < updateItem.Fee)
return true;
if (existingItem.Fee > updateItem.Fee)
return false;
}
return null;
}
/// <inheritdoc />
protected internal override async Task HandleUpdateAsync(UpdateSource source, SharedSpotOrder[] @event)
{
await base.HandleUpdateAsync(source, @event).ConfigureAwait(false);
var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
if (trades.Length != 0 && OnTradeUpdate != null)
await OnTradeUpdate(source, trades).ConfigureAwait(false);
}
/// <inheritdoc />
protected override Task<CallResult<UpdateSubscription?>> DoSubscribeAsync(string? listenKey)
{
if (_socketClient == null)
return Task.FromResult(new CallResult<UpdateSubscription?>(data: null));
return ExchangeHelpers.ProcessQueuedAsync<SharedSpotOrder[]>(
async handler => await _socketClient.SubscribeToSpotOrderUpdatesAsync(new SubscribeSpotOrderRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleUpdateAsync(UpdateSource.Push, x.Data))!;
}
/// <inheritdoc />
protected override async Task<bool> DoPollAsync()
{
var anyError = false;
var openOrdersResult = await _restClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!openOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
await HandleUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false);
}
foreach (var symbol in _symbols.ToList())
{
var fromTimeOrders = _lastDataTimeBeforeDisconnect ?? _lastPollTime ?? _startTime;
var updatedPollTime = DateTime.UtcNow;
var closedOrdersResult = await _restClient.GetClosedSpotOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!closedOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeBeforeDisconnect = null;
_lastPollTime = updatedPollTime;
// Filter orders to only include where close time is after the start time
var relevantOrders = closedOrdersResult.Data.Where(x =>
x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
|| x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
|| x.CreateTime == null && x.UpdateTime == null // Unknown time
).ToArray();
if (relevantOrders.Length > 0)
await HandleUpdateAsync(UpdateSource.Poll, relevantOrders).ConfigureAwait(false);
}
}
return anyError;
}
}
}

View File

@ -0,0 +1,90 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.Objects;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers
{
/// <summary>
/// Spot user trade tracker
/// </summary>
public class SpotUserTradeTracker : UserDataItemTracker<SharedUserTrade>
{
private readonly ISpotOrderRestClient _restClient;
private readonly IUserTradeSocketClient? _socketClient;
private readonly ExchangeParameters? _exchangeParameters;
internal Func<string[]>? GetTrackedOrderIds { get; set; }
/// <summary>
/// ctor
/// </summary>
public SpotUserTradeTracker(
ILogger logger,
ISpotOrderRestClient restClient,
IUserTradeSocketClient? socketClient,
TrackerItemConfig config,
IEnumerable<SharedSymbol> symbols,
bool onlyTrackProvidedSymbols,
ExchangeParameters? exchangeParameters = null
) : base(logger, UserDataType.Trades, config, onlyTrackProvidedSymbols, symbols)
{
_restClient = restClient;
_socketClient = socketClient;
_exchangeParameters = exchangeParameters;
}
/// <inheritdoc />
protected override string GetKey(SharedUserTrade item) => item.Id;
/// <inheritdoc />
protected override bool? CheckIfUpdateShouldBeApplied(SharedUserTrade existingItem, SharedUserTrade updateItem) => false;
/// <inheritdoc />
protected override bool Update(SharedUserTrade existingItem, SharedUserTrade updateItem) => false; // Trades are never updated
/// <inheritdoc />
protected override async Task<bool> DoPollAsync()
{
var anyError = false;
foreach (var symbol in _symbols)
{
var fromTimeTrades = _lastDataTimeBeforeDisconnect ?? _lastPollTime ?? _startTime;
var updatedPollTime = DateTime.UtcNow;
var tradesResult = await _restClient.GetSpotUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!tradesResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeBeforeDisconnect = null;
_lastPollTime = updatedPollTime;
// Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || (GetTrackedOrderIds?.Invoke() ?? []).Any(o => o == x.OrderId)).ToArray();
if (relevantTrades.Length > 0)
await HandleUpdateAsync(UpdateSource.Poll, tradesResult.Data).ConfigureAwait(false);
}
}
return anyError;
}
/// <inheritdoc />
protected override Task<CallResult<UpdateSubscription?>> DoSubscribeAsync(string? listenKey)
{
if (_socketClient == null)
return Task.FromResult(new CallResult<UpdateSubscription?>(data: null));
return ExchangeHelpers.ProcessQueuedAsync<SharedUserTrade[]>(
async handler => await _socketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleUpdateAsync(UpdateSource.Push, x.Data))!;
}
}
}

View File

@ -0,0 +1,471 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.Interfaces;
using CryptoExchange.Net.Trackers.UserData.Objects;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers
{
/// <summary>
/// User data tracker
/// </summary>
public abstract class UserDataItemTracker
{
private bool _connected;
/// <summary>
/// Logger
/// </summary>
protected ILogger _logger;
/// <summary>
/// Polling wait event
/// </summary>
protected AsyncResetEvent _pollWaitEvent = new AsyncResetEvent(false, true);
/// <summary>
/// Polling task
/// </summary>
protected Task? _pollTask;
/// <summary>
/// Cancellation token
/// </summary>
protected CancellationTokenSource? _cts;
/// <summary>
/// Websocket subscription
/// </summary>
protected UpdateSubscription? _subscription;
/// <summary>
/// Start time
/// </summary>
protected DateTime? _startTime = null;
/// <summary>
/// Last polling attempt
/// </summary>
protected DateTime? _lastPollAttempt;
/// <summary>
/// Last polling timestamp
/// </summary>
protected DateTime? _lastPollTime;
/// <summary>
/// Timestamp of last message received before websocket disconnecting
/// </summary>
protected DateTime? _lastDataTimeBeforeDisconnect;
/// <summary>
/// Whether last polling was successful
/// </summary>
protected bool _lastPollSuccess;
/// <summary>
/// Whether first polling was done
/// </summary>
protected bool _firstPollDone;
/// <summary>
/// Whether websocket was disconnected before a polling
/// </summary>
protected bool _wasDisconnected;
/// <summary>
/// Poll at the start
/// </summary>
protected bool _pollAtStart;
/// <summary>
/// Poll interval when connected
/// </summary>
protected TimeSpan _pollIntervalConnected;
/// <summary>
/// Poll interval when disconnected
/// </summary>
protected TimeSpan _pollIntervalDisconnected;
/// <summary>
/// Data type
/// </summary>
public UserDataType DataType { get; }
/// <summary>
/// Timestamp an update was handled. Does not necessarily mean the data was changed
/// </summary>
public DateTime? LastUpdateTime { get; protected set; }
/// <summary>
/// Timestamp any change was applied to the data
/// </summary>
public DateTime? LastChangeTime { get; protected set; }
/// <summary>
/// Connection status changed
/// </summary>
public event Action<bool>? OnConnectedChange;
/// <summary>
/// ctor
/// </summary>
public UserDataItemTracker(ILogger logger, UserDataType dataType)
{
_logger = logger;
DataType = dataType;
}
/// <summary>
/// Start the tracker
/// </summary>
/// <param name="listenKey">Optional listen key</param>
public abstract Task<CallResult> StartAsync(string? listenKey);
/// <summary>
/// Stop the tracker
/// </summary>
/// <returns></returns>
public async Task StopAsync()
{
_cts?.Cancel();
if (_pollTask != null)
await _pollTask.ConfigureAwait(false);
}
/// <summary>
/// Get the delay until next poll
/// </summary>
/// <returns></returns>
protected TimeSpan? GetNextPollDelay()
{
if (!_firstPollDone && _pollAtStart)
// First polling should be done immediately
return TimeSpan.Zero;
if (!Connected)
{
if (_pollIntervalDisconnected == TimeSpan.Zero)
// No polling interval
return null;
return _pollIntervalDisconnected;
}
if (_pollIntervalConnected == TimeSpan.Zero)
// No polling interval
return null;
// Wait for next poll
return _pollIntervalConnected;
}
/// <inheritdoc />
public bool Connected
{
get => _connected;
protected set
{
if (_connected == value)
return;
_connected = value;
if (!_connected)
_wasDisconnected = true;
else
_pollWaitEvent.Set();
OnConnectedChange?.Invoke(_connected);
}
}
}
/// <summary>
/// User data tracker
/// </summary>
public abstract class UserDataItemTracker<T> : UserDataItemTracker, IUserDataTracker<T>
{
/// <summary>
/// Data store
/// </summary>
protected ConcurrentDictionary<string, T> _store = new ConcurrentDictionary<string, T>();
/// <summary>
/// Tracked symbols list
/// </summary>
protected readonly List<SharedSymbol> _symbols;
/// <summary>
/// Symbol lock
/// </summary>
protected object _symbolLock = new object();
/// <summary>
/// Only track provided symbols setting
/// </summary>
protected bool _onlyTrackProvidedSymbols;
/// <inheritdoc />
public T[] Values => _store.Values.ToArray();
/// <inheritdoc />
public event Func<UserDataUpdate<T[]>, Task>? OnUpdate;
/// <inheritdoc />
public IEnumerable<SharedSymbol> TrackedSymbols => _symbols;
/// <summary>
/// ctor
/// </summary>
public UserDataItemTracker(ILogger logger, UserDataType dataType, TrackerItemConfig config, bool onlyTrackProvidedSymbols, IEnumerable<SharedSymbol>? symbols) : base(logger, dataType)
{
_onlyTrackProvidedSymbols = onlyTrackProvidedSymbols;
_symbols = symbols?.ToList() ?? [];
_pollIntervalDisconnected = config.PollIntervalDisconnected;
_pollIntervalConnected = config.PollIntervalConnected;
_pollAtStart = config.PollAtStart;
}
/// <summary>
/// Invoke OnUpdate event
/// </summary>
protected async Task InvokeUpdate(UserDataUpdate<T[]> data)
{
if (OnUpdate == null)
return;
await OnUpdate(data).ConfigureAwait(false);
}
/// <inheritdoc />
public async override Task<CallResult> StartAsync(string? listenKey)
{
_startTime = DateTime.UtcNow;
_cts = new CancellationTokenSource();
var start = await SubscribeAsync(listenKey).ConfigureAwait(false);
if (!start)
return start;
Connected = true;
_pollTask = PollAsync();
return CallResult.SuccessResult;
}
/// <summary>
/// Subscribe the websocket
/// </summary>
public async Task<CallResult> SubscribeAsync(string? listenKey)
{
var subscriptionResult = await DoSubscribeAsync(listenKey).ConfigureAwait(false);
if (!subscriptionResult)
{
// Failed
// ..
return subscriptionResult;
}
if (subscriptionResult.Data == null)
{
// No subscription available
// ..
return CallResult.SuccessResult;
}
_subscription = subscriptionResult.Data;
_subscription.SubscriptionStatusChanged += SubscriptionStatusChanged;
return CallResult.SuccessResult;
}
/// <summary>
/// Get the unique identifier for the item
/// </summary>
protected abstract string GetKey(T item);
/// <summary>
/// Check whether an update should be applied
/// </summary>
protected abstract bool? CheckIfUpdateShouldBeApplied(T existingItem, T updateItem);
/// <summary>
/// Update an existing item with an update
/// </summary>
protected abstract bool Update(T existingItem, T updateItem);
/// <summary>
/// Update the tracked symbol list with potential new symbols
/// </summary>
/// <param name="symbols"></param>
protected void UpdateSymbolsList(IEnumerable<SharedSymbol> symbols)
{
lock (_symbolLock)
{
foreach (var symbol in symbols.Distinct())
{
if (!_symbols.Any(x => x.TradingMode == symbol.TradingMode && x.BaseAsset == symbol.BaseAsset && x.QuoteAsset == symbol.QuoteAsset))
{
_symbols.Add(symbol);
_logger.LogDebug("Adding {BaseAsset}/{QuoteAsset} to symbol tracking list", symbol.BaseAsset, symbol.QuoteAsset);
}
}
}
}
/// <summary>
/// Handle an update
/// </summary>
protected internal virtual async Task HandleUpdateAsync(UpdateSource source, T[] @event)
{
LastUpdateTime = DateTime.UtcNow;
if (typeof(T) == typeof(SharedSymbolModel))
{
List<T>? toRemove = null;
foreach (var item in @event)
{
if (item is SharedSymbolModel symbolModel)
{
if (symbolModel.SharedSymbol == null)
{
toRemove ??= new List<T>();
toRemove.Add(item);
}
else if (_onlyTrackProvidedSymbols
&& !_symbols.Any(y => y.TradingMode == symbolModel.SharedSymbol!.TradingMode && y.BaseAsset == symbolModel.SharedSymbol.BaseAsset && y.QuoteAsset == symbolModel.SharedSymbol.QuoteAsset))
{
toRemove ??= new List<T>();
toRemove.Add(item);
}
}
}
if (toRemove != null)
@event = @event.Except(toRemove).ToArray();
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.OfType<SharedSymbolModel>().Select(x => x.SharedSymbol!));
}
// Update local store
var updatedItems = @event.Select(GetKey).ToList();
foreach (var item in @event)
{
bool existed = false;
_store.AddOrUpdate(GetKey(item), item, (key, existing) =>
{
existed = true;
if (CheckIfUpdateShouldBeApplied(existing, item) == false)
{
updatedItems.Remove(key);
}
else
{
var updated = Update(existing, item);
if (!updated)
{
updatedItems.Remove(key);
}
else
{
_logger.LogDebug("Updated {DataType} {Item}", DataType, key);
LastChangeTime = DateTime.UtcNow;
}
}
return existing;
});
if (!existed)
{
_logger.LogDebug("Added {DataType} {Item}", DataType, GetKey(item));
LastChangeTime = DateTime.UtcNow;
}
}
if (updatedItems.Count > 0 && OnUpdate != null)
{
await OnUpdate.Invoke(
new UserDataUpdate<T[]>
{
Source = source,
Data = _store.Where(x => updatedItems.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
/// <summary>
/// Websocket subscription implementation
/// </summary>
protected abstract Task<CallResult<UpdateSubscription?>> DoSubscribeAsync(string? listenKey);
/// <summary>
/// Polling task
/// </summary>
protected async Task PollAsync()
{
while (!_cts!.IsCancellationRequested)
{
var delayForNextPoll = GetNextPollDelay();
if (delayForNextPoll != TimeSpan.Zero)
{
try
{
if (delayForNextPoll != null)
_logger.LogTrace("{DataType} delay for next polling: {Delay}", DataType, delayForNextPoll);
await _pollWaitEvent.WaitAsync(delayForNextPoll, _cts.Token).ConfigureAwait(false);
}
catch { }
}
_firstPollDone = true;
if (_cts.IsCancellationRequested)
break;
if (_lastPollAttempt != null
&& (DateTime.UtcNow - _lastPollAttempt.Value) < TimeSpan.FromSeconds(2)
&& !(Connected && _wasDisconnected))
{
if (_lastPollSuccess)
// If last poll was less than 2 seconds ago and it was successful don't bother immediately polling again
continue;
}
if (Connected)
_wasDisconnected = false;
_lastPollSuccess = false;
try
{
var anyError = await DoPollAsync().ConfigureAwait(false);
_lastPollAttempt = DateTime.UtcNow;
_lastPollSuccess = !anyError;
}
catch (Exception ex)
{
_logger.LogError(ex, "{DataType} UserDataTracker polling exception", DataType);
}
}
}
/// <summary>
/// Polling implementation
/// </summary>
/// <returns></returns>
protected abstract Task<bool> DoPollAsync();
/// <summary>
/// Handle subscription status change
/// </summary>
/// <param name="newState"></param>
private void SubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("{DataType} stream status changed: {NewState}", DataType, newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeBeforeDisconnect == null)
_lastDataTimeBeforeDisconnect = _subscription!.LastReceiveTime;
}
Connected = newState == SubscriptionStatus.Subscribed;
}
}
}

View File

@ -0,0 +1,36 @@
using System;
namespace CryptoExchange.Net.Trackers.UserData.Objects
{
/// <summary>
/// Tracker configuration
/// </summary>
public class TrackerItemConfig
{
/// <summary>
/// Interval to poll data at as backup, even when the websocket stream is still connected.
/// </summary>
public TimeSpan PollIntervalConnected { get; set; } = TimeSpan.Zero;
/// <summary>
/// Interval to poll data at while the websocket is disconnected.
/// </summary>
public TimeSpan PollIntervalDisconnected { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// Whether to poll for data initially when starting the tracker.
/// </summary>
public bool PollAtStart { get; set; } = true;
/// <summary>
/// ctor
/// </summary>
/// <param name="pollAtStart">Whether to poll for data initially when starting the tracker</param>
/// <param name="pollIntervalConnected">Interval to poll data at as backup, even when the websocket stream is still connected</param>
/// <param name="pollIntervalDisconnected">Interval to poll data at while the websocket is disconnected</param>
public TrackerItemConfig(bool pollAtStart, TimeSpan pollIntervalConnected, TimeSpan pollIntervalDisconnected)
{
PollAtStart = pollAtStart;
PollIntervalConnected = pollIntervalConnected;
PollIntervalDisconnected = pollIntervalDisconnected;
}
}
}

View File

@ -1,4 +1,4 @@
namespace CryptoExchange.Net.Trackers.UserData
namespace CryptoExchange.Net.Trackers.UserData.Objects
{
/// <summary>
/// Update source

View File

@ -0,0 +1,68 @@
using CryptoExchange.Net.SharedApis;
using System;
using System.Collections.Generic;
namespace CryptoExchange.Net.Trackers.UserData.Objects
{
/// <summary>
/// User data tracker configuration
/// </summary>
public record UserDataTrackerConfig
{
/// <summary>
/// Symbols to initially track, used when polling data. Other symbols will get tracked when updates are received for orders or trades on a new symbol and when there are open orders or positions on a new symbol. To only track the symbols specified here set `OnlyTrackProvidedSymbols` to true.
/// </summary>
public IEnumerable<SharedSymbol> TrackedSymbols { get; set; } = [];
/// <summary>
/// If true only orders and trades in the `Symbols` options will get tracked, data on other symbols will be ignored.
/// </summary>
public bool OnlyTrackProvidedSymbols { get; set; } = false;
/// <summary>
/// Whether to track order trades, can lead to increased requests when polling since they're requested per symbol.
/// </summary>
public bool TrackTrades { get; set; } = true;
}
/// <summary>
/// Spot user data tracker config
/// </summary>
public record SpotUserDataTrackerConfig : UserDataTrackerConfig
{
/// <summary>
/// Balance tracking config
/// </summary>
public TrackerItemConfig BalancesConfig { get; set; } = new TrackerItemConfig(true, TimeSpan.Zero, TimeSpan.FromSeconds(10));
/// <summary>
/// Order tracking config
/// </summary>
public TrackerItemConfig OrdersConfig { get; set; } = new TrackerItemConfig(true, TimeSpan.Zero, TimeSpan.FromSeconds(30));
/// <summary>
/// Trade tracking config
/// </summary>
public TrackerItemConfig UserTradesConfig { get; set; } = new TrackerItemConfig(false, TimeSpan.Zero, TimeSpan.FromSeconds(30));
}
/// <summary>
/// Futures user data tracker config
/// </summary>
public record FuturesUserDataTrackerConfig : UserDataTrackerConfig
{
/// <summary>
/// Balance tracking config
/// </summary>
public TrackerItemConfig BalancesConfig { get; set; } = new TrackerItemConfig(true, TimeSpan.Zero, TimeSpan.FromSeconds(10));
/// <summary>
/// Order tracking config
/// </summary>
public TrackerItemConfig OrdersConfig { get; set; } = new TrackerItemConfig(true, TimeSpan.Zero, TimeSpan.FromSeconds(30));
/// <summary>
/// Trade tracking config
/// </summary>
public TrackerItemConfig UserTradesConfig { get; set; } = new TrackerItemConfig(false, TimeSpan.Zero, TimeSpan.FromSeconds(30));
/// <summary>
/// Position tracking config
/// </summary>
public TrackerItemConfig PositionConfig { get; set; } = new TrackerItemConfig(true, TimeSpan.Zero, TimeSpan.FromSeconds(30));
}
}

View File

@ -0,0 +1,25 @@
namespace CryptoExchange.Net.Trackers.UserData.Objects
{
/// <summary>
/// Data type
/// </summary>
public enum UserDataType
{
/// <summary>
/// Balances
/// </summary>
Balances,
/// <summary>
/// Orders
/// </summary>
Orders,
/// <summary>
/// Trades
/// </summary>
Trades,
/// <summary>
/// Positions
/// </summary>
Positions
}
}

View File

@ -1,4 +1,4 @@
namespace CryptoExchange.Net.Trackers.UserData
namespace CryptoExchange.Net.Trackers.UserData.Objects
{
/// <summary>
/// User data update

View File

@ -1,211 +1,107 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Trackers.UserData.ItemTrackers;
using CryptoExchange.Net.Trackers.UserData.Objects;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// User data tracker
/// </summary>
public abstract class UserDataTracker
{
#warning max age for data?
/// <summary>
/// Logger
/// </summary>
protected readonly ILogger _logger;
// State management
protected DateTime? _startTime = null;
protected DateTime? _lastPollAttempt = null;
protected bool _lastPollSuccessful = false;
protected DateTime? _lastPollTimeOrders = null;
protected DateTime? _lastPollTimeTrades = null;
protected DateTime? _lastDataTimeOrdersBeforeDisconnect = null;
protected DateTime? _lastDataTimeTradesBeforeDisconnect = null;
protected bool _firstPollDone = false;
protected bool _wasDisconnected = false;
// Config
protected List<SharedSymbol> _symbols = new List<SharedSymbol>();
protected TimeSpan _pollIntervalConnected;
protected TimeSpan _pollIntervalDisconnected;
protected bool _pollAtStart;
protected bool _onlyTrackProvidedSymbols;
protected bool _trackTrades = true;
protected AsyncResetEvent _pollWaitEvent = new AsyncResetEvent(false, true);
protected Task? _pollTask;
protected CancellationTokenSource? _cts;
protected object _symbolLock = new object();
/// <summary>
/// Listen key to use for subscriptions
/// </summary>
protected string? _listenKey;
/// <summary>
/// List of data trackers
/// </summary>
protected abstract UserDataItemTracker[] DataTrackers { get; }
/// <inheritdoc />
public string? UserIdentifier { get; }
/// <inheritdoc />
public IEnumerable<SharedSymbol> TrackedSymbols => _symbols.AsEnumerable();
private bool _connected;
/// <inheritdoc />
public bool Connected
{
get => _connected;
protected set
{
if (_connected == value)
return;
/// <summary>
/// Connected status changed
/// </summary>
public event Action<UserDataType, bool>? OnConnectedChange;
_connected = value;
if (!_connected)
_wasDisconnected = true;
else
_pollWaitEvent.Set();
/// <summary>
/// Whether all trackers are full connected
/// </summary>
public bool Connected => DataTrackers.All(x => x.Connected);
InvokeConnectedStatusChanged();
}
}
/// <inheritdoc />
public event Action<bool>? OnConnectedStatusChange;
public UserDataTracker(ILogger logger, UserDataTrackerConfig config, string? userIdentifier)
/// <summary>
/// ctor
/// </summary>
public UserDataTracker(
ILogger logger,
UserDataTrackerConfig config,
string? userIdentifier)
{
if (config.OnlyTrackProvidedSymbols && !config.TrackedSymbols.Any())
throw new ArgumentException(nameof(config.TrackedSymbols), "Conflicting options; `OnlyTrackProvidedSymbols` but no symbols specific in `TrackedSymbols`");
_logger = logger;
_pollIntervalConnected = config.PollIntervalConnected;
_pollIntervalDisconnected = config.PollIntervalDisconnected;
_symbols = config.TrackedSymbols?.ToList() ?? [];
_onlyTrackProvidedSymbols = config.OnlyTrackProvidedSymbols;
_pollAtStart = config.PollAtStart;
_trackTrades = config.TrackTrades;
UserIdentifier = userIdentifier;
}
protected void InvokeConnectedStatusChanged()
{
OnConnectedStatusChange?.Invoke(Connected);
}
/// <summary>
/// Start the data tracker
/// </summary>
public async Task<CallResult> StartAsync()
{
_startTime = DateTime.UtcNow;
_cts = new CancellationTokenSource();
foreach(var tracker in DataTrackers)
tracker.OnConnectedChange += (x) => OnConnectedChange?.Invoke(tracker.DataType, x);
var start = await DoStartAsync().ConfigureAwait(false);
if (!start)
return start;
var result = await DoStartAsync().ConfigureAwait(false);
if (!result)
return result;
Connected = true;
var tasks = new List<Task<CallResult>>();
foreach (var dataTracker in DataTrackers)
{
tasks.Add(dataTracker.StartAsync(_listenKey));
}
await Task.WhenAll(tasks).ConfigureAwait(false);
if (!tasks.All(x => x.Result.Success))
{
await Task.WhenAll(DataTrackers.Select(x => x.StopAsync())).ConfigureAwait(false);
return tasks.First(x => !x.Result.Success).Result;
}
_pollTask = PollAsync();
return CallResult.SuccessResult;
}
/// <summary>
/// Implementation specific start logic
/// </summary>
protected abstract Task<CallResult> DoStartAsync();
/// <inheritdoc />
/// <summary>
/// Stop the data tracker
/// </summary>
public async Task StopAsync()
{
_logger.LogDebug("Stopping UserDataTracker");
_cts?.Cancel();
if (_pollTask != null)
await _pollTask.ConfigureAwait(false);
var tasks = new List<Task>();
foreach (var dataTracker in DataTrackers)
tasks.Add(dataTracker.StopAsync());
await Task.WhenAll(tasks).ConfigureAwait(false);
_logger.LogDebug("Stopped UserDataTracker");
}
private TimeSpan? GetNextPollDelay()
{
if (!_firstPollDone && _pollAtStart)
// First polling should be done immediately
return TimeSpan.Zero;
if (!Connected)
{
if (_pollIntervalDisconnected == TimeSpan.Zero)
// No polling interval
return null;
return _pollIntervalDisconnected;
}
if (_pollIntervalConnected == TimeSpan.Zero)
// No polling interval
return null;
// Wait for next poll
return _pollIntervalConnected;
}
public async Task PollAsync()
{
while (!_cts!.IsCancellationRequested)
{
var delayForNextPoll = GetNextPollDelay();
if (delayForNextPoll != TimeSpan.Zero)
{
try
{
if (delayForNextPoll != null)
_logger.LogTrace("Delay for next polling: {Delay}", delayForNextPoll);
await _pollWaitEvent.WaitAsync(delayForNextPoll, _cts.Token).ConfigureAwait(false);
}
catch { }
}
_firstPollDone = true;
if (_cts.IsCancellationRequested)
break;
if (_lastPollAttempt != null
&& (DateTime.UtcNow - _lastPollAttempt.Value) < TimeSpan.FromSeconds(2)
&& !(Connected && _wasDisconnected))
{
if (_lastPollSuccessful)
// If last poll was less than 2 seconds ago and it was successful don't bother immediately polling again
continue;
}
if (Connected)
_wasDisconnected = false;
_lastPollSuccessful = false;
try
{
var anyError = await DoPollAsync().ConfigureAwait(false);
_lastPollAttempt = DateTime.UtcNow;
_lastPollSuccessful = !anyError;
}
catch (Exception ex)
{
_logger.LogError(ex, "UserDataTracker polling exception");
}
}
}
protected abstract Task<bool> DoPollAsync();
protected void UpdateSymbolsList(IEnumerable<SharedSymbol> symbols)
{
lock (_symbolLock)
{
foreach (var symbol in symbols.Distinct())
{
if (!_symbols.Any(x => x.TradingMode == symbol.TradingMode && x.BaseAsset == symbol.BaseAsset && x.QuoteAsset == symbol.QuoteAsset))
{
_symbols.Add(symbol);
_logger.LogDebug("Adding {BaseAsset}/{QuoteAsset} to symbol tracking list", symbol.BaseAsset, symbol.QuoteAsset);
}
}
}
}
}
}

View File

@ -1,38 +0,0 @@
using CryptoExchange.Net.SharedApis;
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// User data tracker configuration
/// </summary>
public record UserDataTrackerConfig
{
/// <summary>
/// Symbols to initially track, used when polling data. Other symbols will get tracked when updates are received for orders or trades on a new symbol and when there are open orders or positions on a new symbol. To only track the symbols specified here set `OnlyTrackProvidedSymbols` to true.
/// </summary>
public IEnumerable<SharedSymbol> TrackedSymbols { get; set; } = [];
/// <summary>
/// If true only orders and trades in the `Symbols` options will get tracked, data on other symbols will be ignored.
/// </summary>
public bool OnlyTrackProvidedSymbols { get; set; } = false;
/// <summary>
/// Interval to poll data at as backup, even when the websocket stream is still connected.
/// </summary>
public TimeSpan PollIntervalConnected { get; set; } = TimeSpan.Zero;
/// <summary>
/// Interval to poll data at while the websocket is disconnected.
/// </summary>
public TimeSpan PollIntervalDisconnected { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// Whether to poll for data initially when starting the tracker.
/// </summary>
public bool PollAtStart { get; set; } = true;
/// <summary>
/// Whether to track order trades, can lead to increased requests when polling since they're requested per symbol.
/// </summary>
public bool TrackTrades { get; set; } = true;
}
}

View File

@ -1,712 +1,122 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
using CryptoExchange.Net.Trackers.UserData.ItemTrackers;
using CryptoExchange.Net.Trackers.UserData.Interfaces;
using CryptoExchange.Net.Trackers.UserData.Objects;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// User data tracker
/// User futures data tracker
/// </summary>
public abstract class UserFuturesDataTracker : UserDataTracker, IUserFuturesDataTracker
{
// Cached data
private ConcurrentDictionary<string, SharedBalance> _balanceStore = new ConcurrentDictionary<string, SharedBalance>();
private ConcurrentDictionary<string, SharedPosition> _positionStore = new ConcurrentDictionary<string, SharedPosition>();
private ConcurrentDictionary<string, SharedFuturesOrder> _orderStore = new ConcurrentDictionary<string, SharedFuturesOrder>();
private ConcurrentDictionary<string, SharedUserTrade> _tradeStore = new ConcurrentDictionary<string, SharedUserTrade>();
private readonly IFuturesSymbolRestClient _symbolClient;
private readonly IListenKeyRestClient? _listenKeyClient;
private readonly ExchangeParameters? _exchangeParameters;
// Typed clients
private readonly IListenKeyRestClient? _listenKeyRestClient;
private readonly IFuturesSymbolRestClient _futuresSymbolRestClient;
private readonly IBalanceRestClient _balanceRestClient;
private readonly IBalanceSocketClient _balanceSocketClient;
private readonly IFuturesOrderRestClient _futuresOrderRestClient;
private readonly IFuturesOrderSocketClient _futuresOrderSocketClient;
private readonly IPositionSocketClient _positionSocketClient;
private readonly IUserTradeSocketClient? _userTradeSocketClient;
// Subscriptions
private UpdateSubscription? _balanceSubscription;
private UpdateSubscription? _orderSubscription;
private UpdateSubscription? _positionSubscription;
private UpdateSubscription? _tradeSubscription;
private ExchangeParameters? _exchangeParameters;
/// <inheritdoc />
protected override UserDataItemTracker[] DataTrackers { get; }
/// <summary>
/// Balances tracker
/// </summary>
public IUserDataTracker<SharedBalance> Balances { get; }
/// <summary>
/// Orders tracker
/// </summary>
public IUserDataTracker<SharedFuturesOrder> Orders { get; }
/// <summary>
/// Positions tracker
/// </summary>
public IUserDataTracker<SharedPosition> Positions { get; }
/// <summary>
/// Trades tracker
/// </summary>
public IUserDataTracker<SharedUserTrade>? Trades { get; }
/// <summary>
/// Whether updates received from the websocket for positions are full snapshots, or individual updates for positions.
/// This is used to determine if positions should be removed if no longer present in the update data.
/// Whether websocket position updates are full snapshots and missing positions should be considered 0
/// </summary>
protected abstract bool WebsocketPositionUpdatesAreFullSnapshots { get; }
/// <inheritdoc />
public event Func<UserDataUpdate<SharedBalance[]>, Task>? OnBalanceUpdate;
/// <inheritdoc />
public event Func<UserDataUpdate<SharedFuturesOrder[]>, Task>? OnOrderUpdate;
/// <inheritdoc />
public event Func<UserDataUpdate<SharedPosition[]>, Task>? OnPositionUpdate;
/// <inheritdoc />
public event Func<UserDataUpdate<SharedUserTrade[]>, Task>? OnTradeUpdate;
/// <inheritdoc />
public SharedBalance[] Balances => _balanceStore.Values.ToArray();
/// <inheritdoc />
public SharedFuturesOrder[] Orders => _orderStore.Values.ToArray();
/// <inheritdoc />
public SharedPosition[] Positions => _positionStore.Values.ToArray();
/// <inheritdoc />
public SharedUserTrade[] Trades => _tradeStore.Values.ToArray();
/// <summary>
/// ctor
/// </summary>
protected UserFuturesDataTracker(
public UserFuturesDataTracker(
ILogger logger,
ISharedClient restClient,
ISharedClient socketClient,
IFuturesSymbolRestClient symbolRestClient,
IListenKeyRestClient? listenKeyRestClient,
IBalanceRestClient balanceRestClient,
IBalanceSocketClient? balanceSocketClient,
IFuturesOrderRestClient futuresOrderRestClient,
IFuturesOrderSocketClient? futuresOrderSocketClient,
IUserTradeSocketClient? userTradeSocketClient,
IPositionSocketClient? positionSocketClient,
string? userIdentifier,
UserDataTrackerConfig config,
FuturesUserDataTrackerConfig config,
ExchangeParameters? exchangeParameters = null) : base(logger, config, userIdentifier)
{
// create trackers
_symbolClient = symbolRestClient;
_listenKeyClient = listenKeyRestClient;
_exchangeParameters = exchangeParameters;
_futuresSymbolRestClient = (IFuturesSymbolRestClient)restClient;
_balanceRestClient = (IBalanceRestClient)restClient;
_balanceSocketClient = (IBalanceSocketClient)socketClient;
_futuresOrderRestClient = (IFuturesOrderRestClient)restClient;
_futuresOrderSocketClient = (IFuturesOrderSocketClient)socketClient;
_positionSocketClient = (IPositionSocketClient)socketClient;
_listenKeyRestClient = restClient as IListenKeyRestClient;
_userTradeSocketClient = socketClient as IUserTradeSocketClient;
var trackers = new List<UserDataItemTracker>();
var balanceTracker = new BalanceTracker(logger, balanceRestClient, balanceSocketClient, config.BalancesConfig, exchangeParameters);
Balances = balanceTracker;
trackers.Add(balanceTracker);
var orderTracker = new FuturesOrderTracker(logger, futuresOrderRestClient, futuresOrderSocketClient, config.OrdersConfig, config.TrackedSymbols, config.OnlyTrackProvidedSymbols, exchangeParameters);
Orders = orderTracker;
trackers.Add(orderTracker);
var positionTracker = new PositionTracker(logger, futuresOrderRestClient, positionSocketClient, config.PositionConfig, config.TrackedSymbols, config.OnlyTrackProvidedSymbols, WebsocketPositionUpdatesAreFullSnapshots, exchangeParameters);
Positions = positionTracker;
trackers.Add(positionTracker);
if (config.TrackTrades)
{
var tradeTracker = new FuturesUserTradeTracker(logger, futuresOrderRestClient, userTradeSocketClient, config.UserTradesConfig, config.TrackedSymbols, config.OnlyTrackProvidedSymbols, exchangeParameters);
Trades = tradeTracker;
trackers.Add(tradeTracker);
orderTracker.OnTradeUpdate += tradeTracker.HandleUpdateAsync;
tradeTracker.GetTrackedOrderIds = () => orderTracker.Values.Select(x => x.OrderId).ToArray();
}
DataTrackers = trackers.ToArray();
}
/// <inheritdoc />
protected override async Task<CallResult> DoStartAsync()
{
_logger.LogDebug("Starting UserDataTracker");
// Request symbols so SharedSymbol property can be filled on updates
var symbolResult = await _futuresSymbolRestClient.GetFuturesSymbolsAsync(new GetSymbolsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
var symbolResult = await _symbolClient.GetFuturesSymbolsAsync(new GetSymbolsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!symbolResult)
{
_logger.LogWarning("Failed to start UserFuturesDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
return symbolResult;
}
string? listenKey = null;
if (_listenKeyRestClient != null)
if (_listenKeyClient != null)
{
var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
var lkResult = await _listenKeyClient.StartListenKeyAsync(new StartListenKeyRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!lkResult)
{
_logger.LogWarning("Failed to start UserFuturesDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
return lkResult;
}
listenKey = lkResult.Data;
_listenKey = lkResult.Data;
}
var subBalanceResult = await ExchangeHelpers.ProcessQueuedAsync<SharedBalance[]>(
async handler => await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleBalanceUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subBalanceResult)
{
_logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message);
return subBalanceResult;
}
_balanceSubscription = subBalanceResult.Data;
subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged;
var subOrderResult = await ExchangeHelpers.ProcessQueuedAsync<SharedFuturesOrder[]>(
async handler => await _futuresOrderSocketClient.SubscribeToFuturesOrderUpdatesAsync(new SubscribeFuturesOrderRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleOrderUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subOrderResult)
{
_cts!.Cancel();
_logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message);
return subOrderResult;
}
_orderSubscription = subOrderResult.Data;
subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged;
if (_positionSocketClient != null)
{
var subPositionResult = await ExchangeHelpers.ProcessQueuedAsync<SharedPosition[]>(
async handler => await _positionSocketClient.SubscribeToPositionUpdatesAsync(new SubscribePositionRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandlePositionUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subPositionResult)
{
_cts!.Cancel();
_logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to trade stream: {Error}", subPositionResult.Error!.Message);
return subPositionResult;
}
_positionSubscription = subPositionResult.Data;
subPositionResult.Data.SubscriptionStatusChanged += PositionSubscriptionStatusChanged;
}
if (_userTradeSocketClient != null && _trackTrades)
{
var subTradeResult = await ExchangeHelpers.ProcessQueuedAsync<SharedUserTrade[]>(
async handler => await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleTradeUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subOrderResult)
{
_cts.Cancel();
_logger.LogWarning("Failed to start UserFuturesDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message);
return subOrderResult;
}
_tradeSubscription = subTradeResult.Data;
subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged;
}
_logger.LogDebug("Started UserFuturesDataTracker");
return CallResult.SuccessResult;
}
private async Task HandleTradeUpdateAsync(UpdateSource source, SharedUserTrade[] @event)
{
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
if (unknownSymbols.Any())
{
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
@event = @event.Except(unknownSymbols).ToArray();
}
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
else
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
// Update local store
var updatedIds = @event.Select(x => x.Id).ToList();
foreach (var item in @event)
{
if (_tradeStore.TryAdd(item.Id, item))
_logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id);
else
updatedIds.Remove(item.Id);
}
if (updatedIds.Count > 0 && OnTradeUpdate != null)
{
await OnTradeUpdate.Invoke(
new UserDataUpdate<SharedUserTrade[]>
{
Source = source,
Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
private async Task HandleOrderUpdateAsync(UpdateSource source, SharedFuturesOrder[] @event)
{
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
if (unknownSymbols.Any())
{
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
@event = @event.Except(unknownSymbols).ToArray();
}
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
else
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
// Update local store
var updatedIds = @event.Select(x => x.OrderId).ToList();
foreach (var item in @event)
{
bool orderExisted = false;
_orderStore.AddOrUpdate(item.OrderId, item, (id, existing) =>
{
orderExisted = true;
var updated = UpdateFuturesOrder(existing, item);
if (!updated)
updatedIds.Remove(id);
else
_logger.LogDebug("Updated futures order {Symbol}.{Id}", item.Symbol, item.OrderId);
return existing;
});
if (!orderExisted)
_logger.LogDebug("Added futures order {Symbol}.{Id}", item.Symbol, item.OrderId);
}
if (updatedIds.Count > 0 && OnOrderUpdate != null)
{
await OnOrderUpdate.Invoke(
new UserDataUpdate<SharedFuturesOrder[]>
{
Source = source,
Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
if (trades.Length != 0)
await HandleTradeUpdateAsync(source, trades).ConfigureAwait(false);
}
private string PositionIdentifier(SharedPosition position) =>
position.Id ?? position.Symbol + position.PositionMode + (position.PositionMode != SharedPositionMode.OneWay ? position.PositionSide.ToString() : "");
private async Task HandlePositionUpdateAsync(UpdateSource source, SharedPosition[] @event)
{
// Update local store
var updatedPositions = @event.Select(PositionIdentifier).ToList();
if (WebsocketPositionUpdatesAreFullSnapshots)
{
// Reset any tracking position to zero/null values when it's no longer in the snapshot as it means there is no open position any more
var notInSnapshot = _positionStore.Where(x => !updatedPositions.Contains(x.Key) && x.Value.PositionSize != 0).ToList();
foreach (var position in notInSnapshot)
{
position.Value.UpdateTime = DateTime.UtcNow;
position.Value.AverageOpenPrice = null;
position.Value.LiquidationPrice = null;
position.Value.PositionSize = 0;
position.Value.StopLossPrice = null;
position.Value.TakeProfitPrice = null;
position.Value.UnrealizedPnl = null;
updatedPositions.Add(position.Key);
}
}
foreach (var item in @event)
{
bool positionExisted = false;
_positionStore.AddOrUpdate(PositionIdentifier(item), item, (key, existing) =>
{
positionExisted = true;
var updated = UpdatePosition(existing, item);
if (!updated)
updatedPositions.Remove(key);
else
_logger.LogDebug("Updated position for {Symbol}", item.Symbol);
return existing;
});
if (!positionExisted)
_logger.LogDebug("Added position for {Symbol}", item.Symbol);
}
if (updatedPositions.Count > 0 && OnPositionUpdate != null)
{
await OnPositionUpdate.Invoke(
new UserDataUpdate<SharedPosition[]>
{
Source = source,
Data = _positionStore.Where(x => updatedPositions.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
private async Task HandleBalanceUpdateAsync(UpdateSource source, SharedBalance[] @event)
{
// Update local store
var updatedAssets = @event.Select(x => x.Asset).ToList();
foreach (var item in @event)
{
bool balanceExisted = false;
_balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) =>
{
balanceExisted = true;
var updated = UpdateBalance(existing, item);
if (!updated)
updatedAssets.Remove(asset);
else
_logger.LogDebug("Updated balance for {Asset}", item.Asset);
return existing;
});
if (!balanceExisted)
_logger.LogDebug("Added balance for {Asset}", item.Asset);
}
if (updatedAssets.Count > 0 && OnBalanceUpdate != null)
{
await OnBalanceUpdate.Invoke(
new UserDataUpdate<SharedBalance[]>
{
Source = source,
Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
private void CheckConnectedChanged()
{
Connected = _balanceSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
&& _orderSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
&& (_tradeSubscription == null || _tradeSubscription.SubscriptionStatus == SubscriptionStatus.Subscribed)
&& (_positionSubscription == null || _positionSubscription.SubscriptionStatus == SubscriptionStatus.Subscribed);
}
private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Balance stream status changed: {NewState}", newState);
CheckConnectedChanged();
}
private void PositionSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Position stream status changed: {NewState}", newState);
CheckConnectedChanged();
}
private void OrderSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Order stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeOrdersBeforeDisconnect == null)
_lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime;
}
CheckConnectedChanged();
}
private void TradeSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Trade stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeTradesBeforeDisconnect == null)
_lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime;
}
CheckConnectedChanged();
}
private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance)
{
// Some other way to way to determine sequence? Maybe timestamp?
var changed = false;
if (existingBalance.Total != newBalance.Total)
{
existingBalance.Total = newBalance.Total;
changed = true;
}
if (existingBalance.Available != newBalance.Available)
{
existingBalance.Available = newBalance.Available;
changed = true;
}
return changed;
}
private bool UpdatePosition(SharedPosition existingPosition, SharedPosition newPosition)
{
// Some other way to way to determine sequence? Maybe timestamp?
var changed = false;
if (existingPosition.AverageOpenPrice != newPosition.AverageOpenPrice)
{
existingPosition.AverageOpenPrice = newPosition.AverageOpenPrice;
changed = true;
}
if (existingPosition.Leverage != newPosition.Leverage)
{
existingPosition.Leverage = newPosition.Leverage;
changed = true;
}
if (existingPosition.LiquidationPrice != newPosition.LiquidationPrice)
{
existingPosition.LiquidationPrice = newPosition.LiquidationPrice;
changed = true;
}
if (existingPosition.PositionSize != newPosition.PositionSize)
{
existingPosition.PositionSize = newPosition.PositionSize;
changed = true;
}
if (existingPosition.StopLossPrice != newPosition.StopLossPrice)
{
existingPosition.StopLossPrice = newPosition.StopLossPrice;
changed = true;
}
if (existingPosition.TakeProfitPrice != newPosition.TakeProfitPrice)
{
existingPosition.TakeProfitPrice = newPosition.TakeProfitPrice;
changed = true;
}
if (newPosition.UnrealizedPnl != null && existingPosition.UnrealizedPnl != newPosition.UnrealizedPnl)
{
existingPosition.UnrealizedPnl = newPosition.UnrealizedPnl;
changed = true;
}
if (newPosition.UpdateTime != null && existingPosition.UpdateTime != newPosition.UpdateTime)
{
existingPosition.UpdateTime = newPosition.UpdateTime;
// If update time is the only changed prop don't mark it as changed
}
return changed;
}
private bool UpdateFuturesOrder(SharedFuturesOrder existingOrder, SharedFuturesOrder newOrder)
{
if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false)
// Update is older than the existing data, ignore
return false;
var changed = false;
if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice)
{
existingOrder.AveragePrice = newOrder.AveragePrice;
changed = true;
}
if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice)
{
existingOrder.OrderPrice = newOrder.OrderPrice;
changed = true;
}
if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee)
{
existingOrder.Fee = newOrder.Fee;
changed = true;
}
if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset)
{
existingOrder.FeeAsset = newOrder.FeeAsset;
changed = true;
}
if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity)
{
existingOrder.OrderQuantity = newOrder.OrderQuantity;
changed = true;
}
if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled)
{
existingOrder.QuantityFilled = newOrder.QuantityFilled;
changed = true;
}
if (newOrder.Status != existingOrder.Status)
{
existingOrder.Status = newOrder.Status;
changed = true;
}
if (newOrder.StopLossPrice != existingOrder.StopLossPrice)
{
existingOrder.StopLossPrice = newOrder.StopLossPrice;
changed = true;
}
if (newOrder.TakeProfitPrice != existingOrder.TakeProfitPrice)
{
existingOrder.TakeProfitPrice = newOrder.TakeProfitPrice;
changed = true;
}
if (newOrder.TriggerPrice != existingOrder.TriggerPrice)
{
existingOrder.TriggerPrice = newOrder.TriggerPrice;
changed = true;
}
if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime)
{
existingOrder.UpdateTime = newOrder.UpdateTime;
changed = true;
}
return changed;
}
private bool? CheckIfOrderUpdateIsNewer(SharedFuturesOrder existingOrder, SharedFuturesOrder newOrder)
{
if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open)
// status changed from open to not open
return true;
if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open)
// status changed from not open to open; stale
return false;
if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null)
{
// If both have an update time base of that
if (existingOrder.UpdateTime < newOrder.UpdateTime)
return true;
if (existingOrder.UpdateTime > newOrder.UpdateTime)
return false;
}
if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null)
{
if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null)
{
// If base quantity is not null we can base it on that
if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset)
return true;
else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset)
return false;
}
if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null)
{
// If quote quantity is not null we can base it on that
if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset)
return true;
else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset)
return false;
}
}
if (existingOrder.Fee != null && newOrder.Fee != null)
{
// Higher fee means later processing
if (existingOrder.Fee < newOrder.Fee)
return true;
if (existingOrder.Fee > newOrder.Fee)
return false;
}
return null;
}
/// <inheritdoc />
protected override async Task<bool> DoPollAsync()
{
_logger.LogDebug("Starting user data requesting");
var anyError = false;
var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!balancesResult.Success)
{
// .. ?
var transientError = balancesResult.Error!.IsTransient;
// If transient we can retry
// Should communicate errors, also for websocket disconnecting
anyError = true;
}
else
{
await HandleBalanceUpdateAsync(UpdateSource.Poll, balancesResult.Data).ConfigureAwait(false);
}
var openOrdersResult = await _futuresOrderRestClient.GetOpenFuturesOrdersAsync(new GetOpenOrdersRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!openOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
await HandleOrderUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false);
}
var positionResult = await _futuresOrderRestClient.GetPositionsAsync(new GetPositionsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!positionResult.Success)
{
// .. ?
anyError = true;
}
else
{
await HandlePositionUpdateAsync(UpdateSource.Poll, positionResult.Data).ConfigureAwait(false);
}
foreach (var symbol in _symbols)
{
var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime;
var updatedPollTime = DateTime.UtcNow;
var closedOrdersResult = await _futuresOrderRestClient.GetClosedFuturesOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!closedOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeOrdersBeforeDisconnect = null;
_lastPollTimeOrders = updatedPollTime;
// Filter orders to only include where close time is after the start time
var relevantOrders = closedOrdersResult.Data.Where(x =>
x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
|| x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
|| x.CreateTime == null && x.UpdateTime == null // Unknown time
).ToArray();
if (relevantOrders.Length > 0)
await HandleOrderUpdateAsync(UpdateSource.Poll, relevantOrders).ConfigureAwait(false);
}
if (_trackTrades)
{
var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime;
var tradesResult = await _futuresOrderRestClient.GetFuturesUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!tradesResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeTradesBeforeDisconnect = null;
_lastPollTimeTrades = updatedPollTime;
// Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray();
if (relevantTrades.Length > 0)
await HandleTradeUpdateAsync(UpdateSource.Poll, tradesResult.Data).ConfigureAwait(false);
}
}
}
_logger.LogDebug("User data requesting completed");
return anyError;
}
}
}

View File

@ -1,533 +1,100 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using CryptoExchange.Net.Trackers.UserData.Interfaces;
using CryptoExchange.Net.Trackers.UserData.Objects;
using CryptoExchange.Net.Trackers.UserData.ItemTrackers;
namespace CryptoExchange.Net.Trackers.UserData
{
/// <summary>
/// User data tracker
/// Spot user data tracker
/// </summary>
public abstract class UserSpotDataTracker : UserDataTracker, IUserSpotDataTracker
public class UserSpotDataTracker : UserDataTracker, IUserSpotDataTracker
{
// Cached data
private ConcurrentDictionary<string, SharedBalance> _balanceStore = new ConcurrentDictionary<string, SharedBalance>();
private ConcurrentDictionary<string, SharedSpotOrder> _orderStore = new ConcurrentDictionary<string, SharedSpotOrder>();
private ConcurrentDictionary<string, SharedUserTrade> _tradeStore = new ConcurrentDictionary<string, SharedUserTrade>();
// Typed clients
private readonly IListenKeyRestClient? _listenKeyRestClient;
private readonly ISpotSymbolRestClient _spotSymbolRestClient;
private readonly IBalanceRestClient _balanceRestClient;
private readonly IBalanceSocketClient _balanceSocketClient;
private readonly ISpotOrderRestClient _spotOrderRestClient;
private readonly ISpotOrderSocketClient _spotOrderSocketClient;
private readonly IUserTradeSocketClient? _userTradeSocketClient;
// Subscriptions
private UpdateSubscription? _balanceSubscription;
private UpdateSubscription? _orderSubscription;
private UpdateSubscription? _tradeSubscription;
private ExchangeParameters? _exchangeParameters;
private readonly ISpotSymbolRestClient _symbolClient;
private readonly IListenKeyRestClient? _listenKeyClient;
private readonly ExchangeParameters? _exchangeParameters;
/// <inheritdoc />
public event Func<UserDataUpdate<SharedBalance[]>, Task>? OnBalanceUpdate;
protected override UserDataItemTracker[] DataTrackers { get; }
/// <inheritdoc />
public event Func<UserDataUpdate<SharedSpotOrder[]>, Task>? OnOrderUpdate;
public IUserDataTracker<SharedBalance> Balances { get; }
/// <inheritdoc />
public event Func<UserDataUpdate<SharedUserTrade[]>, Task>? OnTradeUpdate;
public IUserDataTracker<SharedSpotOrder> Orders { get; }
/// <inheritdoc />
public SharedBalance[] Balances => _balanceStore.Values.ToArray();
/// <inheritdoc />
public SharedSpotOrder[] Orders => _orderStore.Values.ToArray();
/// <inheritdoc />
public SharedUserTrade[] Trades => _tradeStore.Values.ToArray();
public IUserDataTracker<SharedUserTrade>? Trades { get; }
/// <summary>
/// ctor
/// </summary>
protected UserSpotDataTracker(
public UserSpotDataTracker(
ILogger logger,
ISharedClient restClient,
ISharedClient socketClient,
ISpotSymbolRestClient symbolRestClient,
IListenKeyRestClient? listenKeyRestClient,
IBalanceRestClient balanceRestClient,
IBalanceSocketClient? balanceSocketClient,
ISpotOrderRestClient spotOrderRestClient,
ISpotOrderSocketClient? spotOrderSocketClient,
IUserTradeSocketClient? userTradeSocketClient,
string? userIdentifier,
UserDataTrackerConfig config,
ExchangeParameters? exchangeParameters = null
) : base(logger, config, userIdentifier)
SpotUserDataTrackerConfig config,
ExchangeParameters? exchangeParameters = null) : base(logger, config, userIdentifier)
{
// create trackers
_symbolClient = symbolRestClient;
_listenKeyClient = listenKeyRestClient;
_exchangeParameters = exchangeParameters;
_spotSymbolRestClient = (ISpotSymbolRestClient)restClient;
_balanceRestClient = (IBalanceRestClient)restClient;
_balanceSocketClient = (IBalanceSocketClient)socketClient;
_spotOrderRestClient = (ISpotOrderRestClient)restClient;
_spotOrderSocketClient = (ISpotOrderSocketClient)socketClient;
_listenKeyRestClient = restClient as IListenKeyRestClient;
_userTradeSocketClient = socketClient as IUserTradeSocketClient;
var trackers = new List<UserDataItemTracker>();
var balanceTracker = new BalanceTracker(logger, balanceRestClient, balanceSocketClient, config.BalancesConfig, exchangeParameters);
Balances = balanceTracker;
trackers.Add(balanceTracker);
var orderTracker = new SpotOrderTracker(logger, spotOrderRestClient, spotOrderSocketClient, config.OrdersConfig, config.TrackedSymbols, config.OnlyTrackProvidedSymbols, exchangeParameters);
Orders = orderTracker;
trackers.Add(orderTracker);
if (config.TrackTrades)
{
var tradeTracker = new SpotUserTradeTracker(logger, spotOrderRestClient, userTradeSocketClient, config.UserTradesConfig, config.TrackedSymbols, config.OnlyTrackProvidedSymbols, exchangeParameters);
Trades = tradeTracker;
trackers.Add(tradeTracker);
orderTracker.OnTradeUpdate += tradeTracker.HandleUpdateAsync;
tradeTracker.GetTrackedOrderIds = () => orderTracker.Values.Select(x => x.OrderId).ToArray();
}
DataTrackers = trackers.ToArray();
}
/// <inheritdoc />
protected override async Task<CallResult> DoStartAsync()
{
_logger.LogDebug("Starting UserDataTracker");
// Request symbols so SharedSymbol property can be filled on updates
var symbolResult = await _spotSymbolRestClient.GetSpotSymbolsAsync(new GetSymbolsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
var symbolResult = await _symbolClient.GetSpotSymbolsAsync(new GetSymbolsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!symbolResult)
{
_logger.LogWarning("Failed to start UserDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
_logger.LogWarning("Failed to start UserSpotDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
return symbolResult;
}
string? listenKey = null;
if (_listenKeyRestClient != null)
if (_listenKeyClient != null)
{
var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
var lkResult = await _listenKeyClient.StartListenKeyAsync(new StartListenKeyRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!lkResult)
{
_logger.LogWarning("Failed to start UserDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
_logger.LogWarning("Failed to start UserSpotDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
return lkResult;
}
listenKey = lkResult.Data;
_listenKey = lkResult.Data;
}
var subBalanceResult = await ExchangeHelpers.ProcessQueuedAsync<SharedBalance[]>(
async handler => await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleBalanceUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subBalanceResult)
{
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message);
return subBalanceResult;
}
_balanceSubscription = subBalanceResult.Data;
subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged;
var subOrderResult = await ExchangeHelpers.ProcessQueuedAsync<SharedSpotOrder[]>(
async handler => await _spotOrderSocketClient.SubscribeToSpotOrderUpdatesAsync(new SubscribeSpotOrderRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleOrderUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subOrderResult)
{
_cts!.Cancel();
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message);
return subOrderResult;
}
_orderSubscription = subOrderResult.Data;
subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged;
if (_userTradeSocketClient != null && _trackTrades)
{
var subTradeResult = await ExchangeHelpers.ProcessQueuedAsync<SharedUserTrade[]>(
async handler => await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false),
x => HandleTradeUpdateAsync(UpdateSource.Push, x.Data)).ConfigureAwait(false);
if (!subOrderResult)
{
_cts!.Cancel();
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message);
return subOrderResult;
}
_tradeSubscription = subTradeResult.Data;
subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged;
}
_logger.LogDebug("Started UserDataTracker");
return CallResult.SuccessResult;
}
private async Task HandleTradeUpdateAsync(UpdateSource source, SharedUserTrade[] @event)
{
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
if (unknownSymbols.Any())
{
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
@event = @event.Except(unknownSymbols).ToArray();
}
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
else
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
// Update local store
var updatedIds = @event.Select(x => x.Id).ToList();
foreach (var item in @event)
{
if (_tradeStore.TryAdd(item.Id, item))
_logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id);
else
updatedIds.Remove(item.Id);
}
if (updatedIds.Count > 0 && OnTradeUpdate != null)
{
await OnTradeUpdate.Invoke(
new UserDataUpdate<SharedUserTrade[]>
{
Source = source,
Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
private async Task HandleOrderUpdateAsync(UpdateSource source, SharedSpotOrder[] @event)
{
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
if (unknownSymbols.Any())
{
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
@event = @event.Except(unknownSymbols).ToArray();
}
if (!_onlyTrackProvidedSymbols)
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
else
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
// Update local store
var updatedIds = @event.Select(x => x.OrderId).ToList();
foreach (var item in @event)
{
bool orderExisted = false;
_orderStore.AddOrUpdate(item.OrderId, item, (id, existing) =>
{
orderExisted = true;
var updated = UpdateSpotOrder(existing, item);
if (!updated)
updatedIds.Remove(id);
else
_logger.LogDebug("Updated spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
return existing;
});
if (!orderExisted)
_logger.LogDebug("Added spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
}
if (updatedIds.Count > 0 && OnOrderUpdate != null)
{
await OnOrderUpdate.Invoke(
new UserDataUpdate<SharedSpotOrder[]>
{
Source = source,
Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
if (trades.Length != 0)
await HandleTradeUpdateAsync(source, trades).ConfigureAwait(false);
}
private async Task HandleBalanceUpdateAsync(UpdateSource source, SharedBalance[] @event)
{
// Update local store
var updatedAssets = @event.Select(x => x.Asset).ToList();
foreach (var item in @event)
{
bool balanceExisted = false;
_balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) =>
{
balanceExisted = true;
var updated = UpdateBalance(existing, item);
if (!updated)
updatedAssets.Remove(asset);
else
_logger.LogDebug("Updated balance for {Asset}", item.Asset);
return existing;
});
if (!balanceExisted)
_logger.LogDebug("Added balance for {Asset}", item.Asset);
}
if (updatedAssets.Count > 0 && OnBalanceUpdate != null)
{
await OnBalanceUpdate.Invoke(
new UserDataUpdate<SharedBalance[]>
{
Source = source,
Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray()
}).ConfigureAwait(false);
}
}
private void CheckConnectedChanged()
{
Connected = _balanceSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
&& _orderSubscription?.SubscriptionStatus == SubscriptionStatus.Subscribed
&& (_tradeSubscription == null || _tradeSubscription.SubscriptionStatus == SubscriptionStatus.Subscribed);
}
private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Balance stream status changed: {NewState}", newState);
CheckConnectedChanged();
}
private void OrderSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Order stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeOrdersBeforeDisconnect == null)
_lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime;
}
CheckConnectedChanged();
}
private void TradeSubscriptionStatusChanged(SubscriptionStatus newState)
{
_logger.LogDebug("Trade stream status changed: {NewState}", newState);
if (newState == SubscriptionStatus.Pending)
{
// Record last data receive time since we need to request data from that timestamp on when polling
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
// managing to do a poll we don't want to override the time since we still need to request that earlier data
if (_lastDataTimeTradesBeforeDisconnect == null)
_lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime;
}
CheckConnectedChanged();
}
private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance)
{
// Some other way to way to determine sequence? Maybe timestamp?
var changed = false;
if (existingBalance.Total != newBalance.Total)
{
existingBalance.Total = newBalance.Total;
changed = true;
}
if (existingBalance.Available != newBalance.Available)
{
existingBalance.Available = newBalance.Available;
changed = true;
}
return changed;
}
private bool UpdateSpotOrder(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
{
if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false)
// Update is older than the existing data, ignore
return false;
var changed = false;
if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice)
{
existingOrder.AveragePrice = newOrder.AveragePrice;
changed = true;
}
if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice)
{
existingOrder.OrderPrice = newOrder.OrderPrice;
changed = true;
}
if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee)
{
existingOrder.Fee = newOrder.Fee;
changed = true;
}
if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset)
{
existingOrder.FeeAsset = newOrder.FeeAsset;
changed = true;
}
if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity)
{
existingOrder.OrderQuantity = newOrder.OrderQuantity;
changed = true;
}
if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled)
{
existingOrder.QuantityFilled = newOrder.QuantityFilled;
changed = true;
}
if (newOrder.Status != existingOrder.Status)
{
existingOrder.Status = newOrder.Status;
changed = true;
}
if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime)
{
existingOrder.UpdateTime = newOrder.UpdateTime;
changed = true;
}
return changed;
}
private bool? CheckIfOrderUpdateIsNewer(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
{
if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open)
// status changed from open to not open
return true;
if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open)
// status changed from not open to open; stale
return false;
if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null)
{
// If both have an update time base of that
if (existingOrder.UpdateTime < newOrder.UpdateTime)
return true;
if (existingOrder.UpdateTime > newOrder.UpdateTime)
return false;
}
if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null)
{
if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null)
{
// If base quantity is not null we can base it on that
if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset)
return true;
else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset)
return false;
}
if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null)
{
// If quote quantity is not null we can base it on that
if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset)
return true;
else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset)
return false;
}
}
if (existingOrder.Fee != null && newOrder.Fee != null)
{
// Higher fee means later processing
if (existingOrder.Fee < newOrder.Fee)
return true;
if (existingOrder.Fee > newOrder.Fee)
return false;
}
return null;
}
/// <inheritdoc />
protected override async Task<bool> DoPollAsync()
{
_logger.LogDebug("Starting user data requesting");
var anyError = false;
var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!balancesResult.Success)
{
// .. ?
var transientError = balancesResult.Error!.IsTransient;
// If transient we can retry
// Should communicate errors, also for websocket disconnecting
anyError = true;
}
else
{
await HandleBalanceUpdateAsync(UpdateSource.Poll, balancesResult.Data).ConfigureAwait(false);
}
var openOrdersResult = await _spotOrderRestClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!openOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
await HandleOrderUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false);
}
foreach (var symbol in _symbols)
{
var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime;
var updatedPollTime = DateTime.UtcNow;
var closedOrdersResult = await _spotOrderRestClient.GetClosedSpotOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!closedOrdersResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeOrdersBeforeDisconnect = null;
_lastPollTimeOrders = updatedPollTime;
// Filter orders to only include where close time is after the start time
var relevantOrders = closedOrdersResult.Data.Where(x =>
x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
|| x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
|| x.CreateTime == null && x.UpdateTime == null // Unknown time
).ToArray();
if (relevantOrders.Length > 0)
await HandleOrderUpdateAsync(UpdateSource.Poll, relevantOrders).ConfigureAwait(false);
}
if (_trackTrades)
{
var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime;
var tradesResult = await _spotOrderRestClient.GetSpotUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades, exchangeParameters: _exchangeParameters)).ConfigureAwait(false);
if (!tradesResult.Success)
{
// .. ?
anyError = true;
}
else
{
_lastDataTimeTradesBeforeDisconnect = null;
_lastPollTimeTrades = updatedPollTime;
// Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray();
if (relevantTrades.Length > 0)
await HandleTradeUpdateAsync(UpdateSource.Poll, tradesResult.Data).ConfigureAwait(false);
}
}
}
_logger.LogDebug("User data requesting completed");
return anyError;
}
}
}