mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2026-02-16 14:13:46 +00:00
wip
This commit is contained in:
parent
5aa5790d0a
commit
0ad94cce5a
@ -109,6 +109,16 @@ namespace CryptoExchange.Net.Objects.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public int Id => _subscription.Id;
|
public int Id => _subscription.Id;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The last timestamp anything was received from the server
|
||||||
|
/// </summary>
|
||||||
|
public DateTime? LastReceiveTime => _connection.LastReceiveTime;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The current websocket status
|
||||||
|
/// </summary>
|
||||||
|
public SocketStatus Status => _connection.Status;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// ctor
|
/// ctor
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@ -49,7 +49,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
private int _reconnectAttempt;
|
private int _reconnectAttempt;
|
||||||
private readonly int _receiveBufferSize;
|
private readonly int _receiveBufferSize;
|
||||||
|
|
||||||
private const int _defaultReceiveBufferSize = 1048576;
|
|
||||||
private const int _sendBufferSize = 4096;
|
private const int _sendBufferSize = 4096;
|
||||||
|
|
||||||
private int _bytesReceived = 0;
|
private int _bytesReceived = 0;
|
||||||
@ -71,7 +70,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// The timestamp this socket has been active for the last time
|
/// The timestamp this socket has been active for the last time
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public DateTime LastActionTime { get; private set; }
|
public DateTime? LastReceiveTime { get; private set; }
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public Uri Uri => Parameters.Uri;
|
public Uri Uri => Parameters.Uri;
|
||||||
@ -623,6 +622,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LastReceiveTime = DateTime.UtcNow;
|
||||||
if (receiveResult.MessageType == WebSocketMessageType.Close)
|
if (receiveResult.MessageType == WebSocketMessageType.Close)
|
||||||
{
|
{
|
||||||
// Connection closed
|
// Connection closed
|
||||||
@ -773,6 +773,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LastReceiveTime = DateTime.UtcNow;
|
||||||
if (receiveResult.MessageType == WebSocketMessageType.Close)
|
if (receiveResult.MessageType == WebSocketMessageType.Close)
|
||||||
{
|
{
|
||||||
// Connection closed
|
// Connection closed
|
||||||
@ -881,7 +882,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected void ProcessDataNew(WebSocketMessageType type, ReadOnlySpan<byte> data)
|
protected void ProcessDataNew(WebSocketMessageType type, ReadOnlySpan<byte> data)
|
||||||
{
|
{
|
||||||
LastActionTime = DateTime.UtcNow;
|
|
||||||
_connection.HandleStreamMessage2(type, data);
|
_connection.HandleStreamMessage2(type, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -892,7 +892,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
protected async Task CheckTimeoutAsync()
|
protected async Task CheckTimeoutAsync()
|
||||||
{
|
{
|
||||||
_logger.SocketStartingTaskForNoDataReceivedCheck(Id, Parameters.Timeout);
|
_logger.SocketStartingTaskForNoDataReceivedCheck(Id, Parameters.Timeout);
|
||||||
LastActionTime = DateTime.UtcNow;
|
LastReceiveTime = DateTime.UtcNow;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
while (true)
|
while (true)
|
||||||
@ -900,7 +900,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
if (_ctsSource.IsCancellationRequested)
|
if (_ctsSource.IsCancellationRequested)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (DateTime.UtcNow - LastActionTime > Parameters.Timeout)
|
if (DateTime.UtcNow - LastReceiveTime > Parameters.Timeout)
|
||||||
{
|
{
|
||||||
_logger.SocketNoDataReceiveTimoutReconnect(Id, Parameters.Timeout);
|
_logger.SocketNoDataReceiveTimoutReconnect(Id, Parameters.Timeout);
|
||||||
_ = ReconnectAsync().ConfigureAwait(false);
|
_ = ReconnectAsync().ConfigureAwait(false);
|
||||||
|
|||||||
@ -69,6 +69,10 @@ namespace CryptoExchange.Net.Sockets.Default.Interfaces
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
bool IsOpen { get; }
|
bool IsOpen { get; }
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
/// Last timestamp something was received from the server
|
||||||
|
/// </summary>
|
||||||
|
DateTime? LastReceiveTime { get; }
|
||||||
|
/// <summary>
|
||||||
/// Connect the socket
|
/// Connect the socket
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
|
|||||||
@ -178,6 +178,11 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public DateTime? DisconnectTime { get; set; }
|
public DateTime? DisconnectTime { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Last timestamp something was received from the server
|
||||||
|
/// </summary>
|
||||||
|
public DateTime? LastReceiveTime => _socket.LastReceiveTime;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Tag for identification
|
/// Tag for identification
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@ -39,6 +39,7 @@ namespace CryptoExchange.Net.Testing.Implementations
|
|||||||
public Func<Task<Uri?>>? GetReconnectionUrl { get; set; }
|
public Func<Task<Uri?>>? GetReconnectionUrl { get; set; }
|
||||||
|
|
||||||
public static int lastId = 0;
|
public static int lastId = 0;
|
||||||
|
public DateTime? LastReceiveTime { get; }
|
||||||
#if NET9_0_OR_GREATER
|
#if NET9_0_OR_GREATER
|
||||||
public static readonly Lock lastIdLock = new Lock();
|
public static readonly Lock lastIdLock = new Lock();
|
||||||
#else
|
#else
|
||||||
|
|||||||
54
CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs
Normal file
54
CryptoExchange.Net/Trackers/UserData/IUserDataTracker.cs
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
using CryptoExchange.Net.Objects;
|
||||||
|
using CryptoExchange.Net.SharedApis;
|
||||||
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Trackers.UserData
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// User data tracker
|
||||||
|
/// </summary>
|
||||||
|
public interface IUserDataTracker
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// User identifier
|
||||||
|
/// </summary>
|
||||||
|
public string? UserIdentifier { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Current balances
|
||||||
|
/// </summary>
|
||||||
|
SharedBalance[] Balances { get; }
|
||||||
|
/// <summary>
|
||||||
|
/// Currently tracked orders
|
||||||
|
/// </summary>
|
||||||
|
SharedSpotOrder[] Orders { get; }
|
||||||
|
/// <summary>
|
||||||
|
/// Currently tracked trades
|
||||||
|
/// </summary>
|
||||||
|
SharedUserTrade[] Trades { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// On balance update
|
||||||
|
/// </summary>
|
||||||
|
event Action<UserDataUpdate<SharedBalance[]>>? OnBalanceUpdate;
|
||||||
|
/// <summary>
|
||||||
|
/// On order update
|
||||||
|
/// </summary>
|
||||||
|
event Action<UserDataUpdate<SharedSpotOrder[]>>? OnOrderUpdate;
|
||||||
|
/// <summary>
|
||||||
|
/// On user trade update
|
||||||
|
/// </summary>
|
||||||
|
event Action<UserDataUpdate<SharedUserTrade[]>>? OnTradeUpdate;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Start tracking user data
|
||||||
|
/// </summary>
|
||||||
|
Task<CallResult> StartAsync();
|
||||||
|
/// <summary>
|
||||||
|
/// Stop tracking data
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
|
Task StopAsync();
|
||||||
|
}
|
||||||
|
}
|
||||||
17
CryptoExchange.Net/Trackers/UserData/UpdateSource.cs
Normal file
17
CryptoExchange.Net/Trackers/UserData/UpdateSource.cs
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
namespace CryptoExchange.Net.Trackers.UserData
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Update source
|
||||||
|
/// </summary>
|
||||||
|
public enum UpdateSource
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Polling result
|
||||||
|
/// </summary>
|
||||||
|
Poll,
|
||||||
|
/// <summary>
|
||||||
|
/// Websocket push
|
||||||
|
/// </summary>
|
||||||
|
Push
|
||||||
|
}
|
||||||
|
}
|
||||||
643
CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs
Normal file
643
CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs
Normal file
@ -0,0 +1,643 @@
|
|||||||
|
using CryptoExchange.Net.Objects;
|
||||||
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
|
using CryptoExchange.Net.SharedApis;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Trackers.UserData
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// User data tracker
|
||||||
|
/// </summary>
|
||||||
|
public abstract class UserDataTracker : IUserDataTracker
|
||||||
|
{
|
||||||
|
// Cached data
|
||||||
|
private ConcurrentDictionary<string, SharedBalance> _balanceStore = new ConcurrentDictionary<string, SharedBalance>();
|
||||||
|
private ConcurrentDictionary<string, SharedSpotOrder> _orderStore = new ConcurrentDictionary<string, SharedSpotOrder>();
|
||||||
|
private ConcurrentDictionary<string, SharedUserTrade> _tradeStore = new ConcurrentDictionary<string, SharedUserTrade>();
|
||||||
|
|
||||||
|
// Typed clients
|
||||||
|
private readonly IListenKeyRestClient? _listenKeyRestClient;
|
||||||
|
private readonly ISpotSymbolRestClient _spotSymbolRestClient;
|
||||||
|
private readonly IBalanceRestClient _balanceRestClient;
|
||||||
|
private readonly IBalanceSocketClient _balanceSocketClient;
|
||||||
|
private readonly ISpotOrderRestClient _spotOrderRestClient;
|
||||||
|
private readonly ISpotOrderSocketClient _spotOrderSocketClient;
|
||||||
|
private readonly IUserTradeSocketClient? _userTradeSocketClient;
|
||||||
|
private readonly ILogger _logger;
|
||||||
|
|
||||||
|
// State management
|
||||||
|
private DateTime? _startTime = null;
|
||||||
|
private DateTime? _lastPollAttempt = null;
|
||||||
|
private bool _lastPollSuccessful = false;
|
||||||
|
private DateTime? _lastPollTimeOrders = null;
|
||||||
|
private DateTime? _lastPollTimeTrades = null;
|
||||||
|
private DateTime? _lastDataTimeOrdersBeforeDisconnect = null;
|
||||||
|
private DateTime? _lastDataTimeTradesBeforeDisconnect = null;
|
||||||
|
private bool _firstPollDone = false;
|
||||||
|
|
||||||
|
// Config
|
||||||
|
private List<SharedSymbol> _symbols = new List<SharedSymbol>();
|
||||||
|
private TimeSpan _pollIntervalConnected;
|
||||||
|
private TimeSpan _pollIntervalDisconnected;
|
||||||
|
private bool _pollAtStart;
|
||||||
|
private bool _onlyTrackProvidedSymbols;
|
||||||
|
|
||||||
|
// Subscriptions
|
||||||
|
private UpdateSubscription? _balanceSubscription;
|
||||||
|
private UpdateSubscription? _orderSubscription;
|
||||||
|
private UpdateSubscription? _tradeSubscription;
|
||||||
|
|
||||||
|
|
||||||
|
private AsyncResetEvent _pollWaitEvent = new AsyncResetEvent(false, true);
|
||||||
|
private Task? _pollTask;
|
||||||
|
private CancellationTokenSource? _cts;
|
||||||
|
private object _symbolLock = new object();
|
||||||
|
|
||||||
|
private bool Connected =>
|
||||||
|
_balanceSubscription?.Status == Sockets.Default.SocketStatus.Connected
|
||||||
|
&& _orderSubscription?.Status == Sockets.Default.SocketStatus.Connected
|
||||||
|
&& _tradeSubscription == null || _tradeSubscription?.Status == Sockets.Default.SocketStatus.Connected;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public event Action<UserDataUpdate<SharedBalance[]>>? OnBalanceUpdate;
|
||||||
|
/// <inheritdoc />
|
||||||
|
public event Action<UserDataUpdate<SharedSpotOrder[]>>? OnOrderUpdate;
|
||||||
|
/// <inheritdoc />
|
||||||
|
public event Action<UserDataUpdate<SharedUserTrade[]>>? OnTradeUpdate;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public string? UserIdentifier { get; }
|
||||||
|
/// <inheritdoc />
|
||||||
|
public SharedBalance[] Balances => _balanceStore.Values.ToArray();
|
||||||
|
/// <inheritdoc />
|
||||||
|
public SharedSpotOrder[] Orders => _orderStore.Values.ToArray();
|
||||||
|
/// <inheritdoc />
|
||||||
|
public SharedUserTrade[] Trades => _tradeStore.Values.ToArray();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ctor
|
||||||
|
/// </summary>
|
||||||
|
protected UserDataTracker(
|
||||||
|
ILogger logger,
|
||||||
|
ISharedClient restClient,
|
||||||
|
ISharedClient socketClient,
|
||||||
|
string? userIdentifier,
|
||||||
|
UserDataTrackerConfig config
|
||||||
|
)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
_spotSymbolRestClient = (ISpotSymbolRestClient)restClient;
|
||||||
|
_balanceRestClient = (IBalanceRestClient)restClient;
|
||||||
|
_balanceSocketClient = (IBalanceSocketClient)socketClient;
|
||||||
|
_spotOrderRestClient = (ISpotOrderRestClient)restClient;
|
||||||
|
_spotOrderSocketClient = (ISpotOrderSocketClient)socketClient;
|
||||||
|
_listenKeyRestClient = restClient as IListenKeyRestClient;
|
||||||
|
_userTradeSocketClient = socketClient as IUserTradeSocketClient;
|
||||||
|
|
||||||
|
_pollIntervalConnected = config.PollIntervalConnected;
|
||||||
|
_pollIntervalDisconnected = config.PollIntervalDisconnected;
|
||||||
|
_symbols = config.Symbols?.ToList() ?? [];
|
||||||
|
_onlyTrackProvidedSymbols = config.OnlyTrackProvidedSymbols;
|
||||||
|
_pollAtStart = config.PollAtStart;
|
||||||
|
|
||||||
|
UserIdentifier = userIdentifier;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task<CallResult> StartAsync()
|
||||||
|
{
|
||||||
|
_startTime = DateTime.UtcNow;
|
||||||
|
|
||||||
|
_logger.LogDebug("Starting UserDataTracker");
|
||||||
|
_cts = new CancellationTokenSource();
|
||||||
|
|
||||||
|
// Request symbols so SharedSymbol property can be filled on updates
|
||||||
|
var symbolResult = await _spotSymbolRestClient.GetSpotSymbolsAsync(new GetSymbolsRequest()).ConfigureAwait(false);
|
||||||
|
if (!symbolResult)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Failed to start UserDataTracker; symbols request failed: {Error}", symbolResult.Error!.Message);
|
||||||
|
return symbolResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
string? listenKey = null;
|
||||||
|
if (_listenKeyRestClient != null)
|
||||||
|
{
|
||||||
|
var lkResult = await _listenKeyRestClient.StartListenKeyAsync(new StartListenKeyRequest()).ConfigureAwait(false);
|
||||||
|
if (!lkResult)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Failed to start UserDataTracker; listen key request failed: {Error}", lkResult.Error!.Message);
|
||||||
|
return lkResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
listenKey = lkResult.Data;
|
||||||
|
}
|
||||||
|
|
||||||
|
var subBalanceResult = await _balanceSocketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey), x => HandleBalanceUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
|
||||||
|
if (!subBalanceResult)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to balance stream: {Error}", subBalanceResult.Error!.Message);
|
||||||
|
return subBalanceResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
_balanceSubscription = subBalanceResult.Data;
|
||||||
|
subBalanceResult.Data.SubscriptionStatusChanged += BalanceSubscriptionStatusChanged;
|
||||||
|
|
||||||
|
var subOrderResult = await _spotOrderSocketClient.SubscribeToSpotOrderUpdatesAsync(new SubscribeSpotOrderRequest(listenKey), x => HandleOrderUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
|
||||||
|
if (!subOrderResult)
|
||||||
|
{
|
||||||
|
_cts.Cancel();
|
||||||
|
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to order stream: {Error}", subOrderResult.Error!.Message);
|
||||||
|
return subOrderResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
_orderSubscription = subOrderResult.Data;
|
||||||
|
subOrderResult.Data.SubscriptionStatusChanged += OrderSubscriptionStatusChanged;
|
||||||
|
|
||||||
|
if (_userTradeSocketClient != null)
|
||||||
|
{
|
||||||
|
var subTradeResult = await _userTradeSocketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey), x => HandleTradeUpdate(UpdateSource.Push, x.Data), ct: _cts.Token).ConfigureAwait(false);
|
||||||
|
if (!subOrderResult)
|
||||||
|
{
|
||||||
|
_cts.Cancel();
|
||||||
|
_logger.LogWarning("Failed to start UserDataTracker; failed to subscribe to trade stream: {Error}", subTradeResult.Error!.Message);
|
||||||
|
return subOrderResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
_tradeSubscription = subTradeResult.Data;
|
||||||
|
subTradeResult.Data.SubscriptionStatusChanged += TradeSubscriptionStatusChanged;
|
||||||
|
}
|
||||||
|
|
||||||
|
_pollTask = PollAsync();
|
||||||
|
_logger.LogDebug("Started UserDataTracker");
|
||||||
|
return CallResult.SuccessResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void UpdateSymbolsList(IEnumerable<SharedSymbol> symbols)
|
||||||
|
{
|
||||||
|
lock (_symbolLock)
|
||||||
|
{
|
||||||
|
foreach (var symbol in symbols.Distinct())
|
||||||
|
{
|
||||||
|
if (!_symbols.Any(x => x.TradingMode == symbol.TradingMode && x.BaseAsset == symbol.BaseAsset && x.QuoteAsset == symbol.QuoteAsset))
|
||||||
|
{
|
||||||
|
_symbols.Add(symbol);
|
||||||
|
_logger.LogDebug("Adding {BaseAsset}/{QuoteAsset} to symbol tracking list", symbol.BaseAsset, symbol.QuoteAsset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleTradeUpdate(UpdateSource source, SharedUserTrade[] @event)
|
||||||
|
{
|
||||||
|
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
|
||||||
|
if (unknownSymbols.Any())
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
|
||||||
|
@event = @event.Except(unknownSymbols).ToArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_onlyTrackProvidedSymbols)
|
||||||
|
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
|
||||||
|
else
|
||||||
|
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
|
||||||
|
|
||||||
|
// Update local store
|
||||||
|
var updatedIds = @event.Select(x => x.Id).ToList();
|
||||||
|
foreach (var item in @event)
|
||||||
|
{
|
||||||
|
if (_tradeStore.TryAdd(item.Id, item))
|
||||||
|
_logger.LogDebug("Added user trade {Symbol}.{Id}", item.Symbol, item.Id);
|
||||||
|
else
|
||||||
|
updatedIds.Remove(item.Id);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (updatedIds.Count > 0)
|
||||||
|
{
|
||||||
|
OnTradeUpdate?.Invoke(
|
||||||
|
new UserDataUpdate<SharedUserTrade[]>
|
||||||
|
{
|
||||||
|
Source = source,
|
||||||
|
Data = _tradeStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleOrderUpdate(UpdateSource source, SharedSpotOrder[] @event)
|
||||||
|
{
|
||||||
|
var unknownSymbols = @event.Where(x => x.SharedSymbol == null);
|
||||||
|
if (unknownSymbols.Any())
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Received order without SharedSymbol set, ignoring");
|
||||||
|
@event = @event.Except(unknownSymbols).ToArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_onlyTrackProvidedSymbols)
|
||||||
|
UpdateSymbolsList(@event.Select(x => x.SharedSymbol!));
|
||||||
|
else
|
||||||
|
@event = @event.Where(x => _symbols.Any(y => y.TradingMode == x.SharedSymbol!.TradingMode && y.BaseAsset == x.SharedSymbol.BaseAsset && y.QuoteAsset == x.SharedSymbol.QuoteAsset)).ToArray();
|
||||||
|
|
||||||
|
// Update local store
|
||||||
|
var updatedIds = @event.Select(x => x.OrderId).ToList();
|
||||||
|
|
||||||
|
foreach (var item in @event)
|
||||||
|
{
|
||||||
|
bool orderExisted = false;
|
||||||
|
_orderStore.AddOrUpdate(item.OrderId, item, (id, existing) =>
|
||||||
|
{
|
||||||
|
orderExisted = true;
|
||||||
|
var updated = UpdateSpotOrder(existing, item);
|
||||||
|
if (!updated)
|
||||||
|
updatedIds.Remove(id);
|
||||||
|
else
|
||||||
|
_logger.LogDebug("Updated spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
|
||||||
|
|
||||||
|
return existing;
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!orderExisted)
|
||||||
|
_logger.LogDebug("Added spot order {Symbol}.{Id}", item.Symbol, item.OrderId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (updatedIds.Count > 0)
|
||||||
|
{
|
||||||
|
OnOrderUpdate?.Invoke(
|
||||||
|
new UserDataUpdate<SharedSpotOrder[]>
|
||||||
|
{
|
||||||
|
Source = source,
|
||||||
|
Data = _orderStore.Where(x => updatedIds.Contains(x.Key)).Select(x => x.Value).ToArray()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray();
|
||||||
|
if (trades.Length != 0)
|
||||||
|
HandleTradeUpdate(source, trades);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleBalanceUpdate(UpdateSource source, SharedBalance[] @event)
|
||||||
|
{
|
||||||
|
// Update local store
|
||||||
|
var updatedAssets = @event.Select(x => x.Asset).ToList();
|
||||||
|
|
||||||
|
foreach (var item in @event)
|
||||||
|
{
|
||||||
|
bool balanceExisted = false;
|
||||||
|
_balanceStore.AddOrUpdate(item.Asset, item, (asset, existing) =>
|
||||||
|
{
|
||||||
|
balanceExisted = true;
|
||||||
|
var updated = UpdateBalance(existing, item);
|
||||||
|
if (!updated)
|
||||||
|
updatedAssets.Remove(asset);
|
||||||
|
else
|
||||||
|
_logger.LogDebug("Updated balance for {Asset}", item.Asset);
|
||||||
|
|
||||||
|
return existing;
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!balanceExisted)
|
||||||
|
_logger.LogDebug("Added balance for {Asset}", item.Asset);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (updatedAssets.Count > 0)
|
||||||
|
{
|
||||||
|
OnBalanceUpdate?.Invoke(
|
||||||
|
new UserDataUpdate<SharedBalance[]>
|
||||||
|
{
|
||||||
|
Source = source,
|
||||||
|
Data = _balanceStore.Where(x => updatedAssets.Contains(x.Key)).Select(x => x.Value).ToArray()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void BalanceSubscriptionStatusChanged(SubscriptionStatus newState)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("Balance stream status changed: {NewState}", newState);
|
||||||
|
if (newState == SubscriptionStatus.Subscribed)
|
||||||
|
// Trigger REST polling since we weren't connected
|
||||||
|
_pollWaitEvent.Set();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void OrderSubscriptionStatusChanged(SubscriptionStatus newState)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("Order stream status changed: {NewState}", newState);
|
||||||
|
|
||||||
|
if (newState == SubscriptionStatus.Pending)
|
||||||
|
{
|
||||||
|
// Record last data receive time since we need to request data from that timestamp on when polling
|
||||||
|
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
|
||||||
|
// managing to do a poll we don't want to override the time since we still need to request that earlier data
|
||||||
|
|
||||||
|
if (_lastDataTimeOrdersBeforeDisconnect == null)
|
||||||
|
_lastDataTimeOrdersBeforeDisconnect = _orderSubscription!.LastReceiveTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newState == SubscriptionStatus.Subscribed)
|
||||||
|
// Trigger REST polling since we weren't connected
|
||||||
|
_pollWaitEvent.Set();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void TradeSubscriptionStatusChanged(SubscriptionStatus newState)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("Trade stream status changed: {NewState}", newState);
|
||||||
|
|
||||||
|
if (newState == SubscriptionStatus.Pending)
|
||||||
|
{
|
||||||
|
// Record last data receive time since we need to request data from that timestamp on when polling
|
||||||
|
// Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without
|
||||||
|
// managing to do a poll we don't want to override the time since we still need to request that earlier data
|
||||||
|
|
||||||
|
if (_lastDataTimeTradesBeforeDisconnect == null)
|
||||||
|
_lastDataTimeTradesBeforeDisconnect = _tradeSubscription?.LastReceiveTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newState == SubscriptionStatus.Subscribed)
|
||||||
|
// Trigger REST polling since we weren't connected
|
||||||
|
_pollWaitEvent.Set();
|
||||||
|
}
|
||||||
|
|
||||||
|
private bool UpdateBalance(SharedBalance existingBalance, SharedBalance newBalance)
|
||||||
|
{
|
||||||
|
// Some other way to way to determine sequence? Maybe timestamp?
|
||||||
|
var changed = false;
|
||||||
|
if (existingBalance.Total != newBalance.Total)
|
||||||
|
{
|
||||||
|
existingBalance.Total = newBalance.Total;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (existingBalance.Available != newBalance.Available)
|
||||||
|
{
|
||||||
|
existingBalance.Available = newBalance.Available;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return changed;
|
||||||
|
}
|
||||||
|
|
||||||
|
private bool UpdateSpotOrder(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
|
||||||
|
{
|
||||||
|
if (CheckIfOrderUpdateIsNewer(existingOrder, newOrder) == false)
|
||||||
|
// Update is older than the existing data, ignore
|
||||||
|
return false;
|
||||||
|
|
||||||
|
var changed = false;
|
||||||
|
if (newOrder.AveragePrice != null && newOrder.AveragePrice != existingOrder.AveragePrice)
|
||||||
|
{
|
||||||
|
existingOrder.AveragePrice = newOrder.AveragePrice;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newOrder.OrderPrice != null && newOrder.OrderPrice != existingOrder.OrderPrice)
|
||||||
|
{
|
||||||
|
existingOrder.OrderPrice = newOrder.OrderPrice;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newOrder.Fee != null && newOrder.Fee != existingOrder.Fee)
|
||||||
|
{
|
||||||
|
existingOrder.Fee = newOrder.Fee;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newOrder.FeeAsset != null && newOrder.FeeAsset != existingOrder.FeeAsset)
|
||||||
|
{
|
||||||
|
existingOrder.FeeAsset = newOrder.FeeAsset;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newOrder.OrderQuantity != null && newOrder.OrderQuantity != existingOrder.OrderQuantity)
|
||||||
|
{
|
||||||
|
existingOrder.OrderQuantity = newOrder.OrderQuantity;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newOrder.QuantityFilled != null && newOrder.QuantityFilled != existingOrder.QuantityFilled)
|
||||||
|
{
|
||||||
|
existingOrder.QuantityFilled = newOrder.QuantityFilled;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newOrder.Status != existingOrder.Status)
|
||||||
|
{
|
||||||
|
existingOrder.Status = newOrder.Status;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newOrder.UpdateTime != null && newOrder.UpdateTime != existingOrder.UpdateTime)
|
||||||
|
{
|
||||||
|
existingOrder.UpdateTime = newOrder.UpdateTime;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return changed;
|
||||||
|
}
|
||||||
|
|
||||||
|
private bool? CheckIfOrderUpdateIsNewer(SharedSpotOrder existingOrder, SharedSpotOrder newOrder)
|
||||||
|
{
|
||||||
|
if (existingOrder.Status == SharedOrderStatus.Open && newOrder.Status != SharedOrderStatus.Open)
|
||||||
|
// status changed from open to not open
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (existingOrder.Status != SharedOrderStatus.Open && newOrder.Status == SharedOrderStatus.Open)
|
||||||
|
// status changed from not open to open; stale
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (existingOrder.UpdateTime != null && newOrder.UpdateTime != null)
|
||||||
|
{
|
||||||
|
// If both have an update time base of that
|
||||||
|
if (existingOrder.UpdateTime < newOrder.UpdateTime)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (existingOrder.UpdateTime > newOrder.UpdateTime)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (existingOrder.QuantityFilled != null && newOrder.QuantityFilled != null)
|
||||||
|
{
|
||||||
|
if (existingOrder.QuantityFilled.QuantityInBaseAsset != null && newOrder.QuantityFilled.QuantityInBaseAsset != null)
|
||||||
|
{
|
||||||
|
// If base quantity is not null we can base it on that
|
||||||
|
if (existingOrder.QuantityFilled.QuantityInBaseAsset < newOrder.QuantityFilled.QuantityInBaseAsset)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
else if (existingOrder.QuantityFilled.QuantityInBaseAsset > newOrder.QuantityFilled.QuantityInBaseAsset)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (existingOrder.QuantityFilled.QuantityInQuoteAsset != null && newOrder.QuantityFilled.QuantityInQuoteAsset != null)
|
||||||
|
{
|
||||||
|
// If quote quantity is not null we can base it on that
|
||||||
|
if (existingOrder.QuantityFilled.QuantityInQuoteAsset < newOrder.QuantityFilled.QuantityInQuoteAsset)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
else if (existingOrder.QuantityFilled.QuantityInQuoteAsset > newOrder.QuantityFilled.QuantityInQuoteAsset)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (existingOrder.Fee != null && newOrder.Fee != null)
|
||||||
|
{
|
||||||
|
// Higher fee means later processing
|
||||||
|
if (existingOrder.Fee < newOrder.Fee)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (existingOrder.Fee > newOrder.Fee)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimeSpan? GetNextPollDelay()
|
||||||
|
{
|
||||||
|
if (!_firstPollDone && _pollAtStart)
|
||||||
|
// First polling should be done immediately
|
||||||
|
return TimeSpan.Zero;
|
||||||
|
|
||||||
|
if (!Connected)
|
||||||
|
{
|
||||||
|
if (_pollIntervalDisconnected == TimeSpan.Zero)
|
||||||
|
// No polling interval
|
||||||
|
return null;
|
||||||
|
|
||||||
|
return _pollIntervalDisconnected;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_pollIntervalConnected == TimeSpan.Zero)
|
||||||
|
// No polling interval
|
||||||
|
return null;
|
||||||
|
|
||||||
|
// Wait for next poll
|
||||||
|
return _pollIntervalConnected;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task PollAsync()
|
||||||
|
{
|
||||||
|
while (!_cts!.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
var delayForNextPoll = GetNextPollDelay();
|
||||||
|
if (delayForNextPoll != TimeSpan.Zero)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (delayForNextPoll != null)
|
||||||
|
_logger.LogTrace("Delay for next polling: {Delay}", delayForNextPoll);
|
||||||
|
|
||||||
|
await _pollWaitEvent.WaitAsync(delayForNextPoll, _cts.Token).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch { }
|
||||||
|
}
|
||||||
|
|
||||||
|
_firstPollDone = true;
|
||||||
|
if (_cts.IsCancellationRequested)
|
||||||
|
break;
|
||||||
|
|
||||||
|
if (_lastPollAttempt != null && (DateTime.UtcNow - _lastPollAttempt.Value) < TimeSpan.FromSeconds(2))
|
||||||
|
{
|
||||||
|
if (_lastPollSuccessful)
|
||||||
|
// If last poll was less than 2 seconds ago and it was successful don't bother immediately polling again
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogDebug("Starting user data requesting");
|
||||||
|
_lastPollAttempt = DateTime.UtcNow;
|
||||||
|
_lastPollSuccessful = false;
|
||||||
|
|
||||||
|
var anyError = false;
|
||||||
|
var balancesResult = await _balanceRestClient.GetBalancesAsync(new GetBalancesRequest()).ConfigureAwait(false);
|
||||||
|
if (!balancesResult.Success)
|
||||||
|
{
|
||||||
|
// .. ?
|
||||||
|
var transientError = balancesResult.Error!.IsTransient;
|
||||||
|
// If transient we can retry
|
||||||
|
// Should communicate errors, also for websocket disconnecting
|
||||||
|
|
||||||
|
anyError = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
HandleBalanceUpdate(UpdateSource.Poll, balancesResult.Data);
|
||||||
|
}
|
||||||
|
|
||||||
|
var openOrdersResult = await _spotOrderRestClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest()).ConfigureAwait(false);
|
||||||
|
if (!openOrdersResult.Success)
|
||||||
|
{
|
||||||
|
// .. ?
|
||||||
|
|
||||||
|
anyError = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
HandleOrderUpdate(UpdateSource.Poll, openOrdersResult.Data);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var symbol in _symbols)
|
||||||
|
{
|
||||||
|
var fromTimeOrders = _lastDataTimeOrdersBeforeDisconnect ?? _lastPollTimeOrders ?? _startTime;
|
||||||
|
var updatedPollTime = DateTime.UtcNow;
|
||||||
|
var closedOrdersResult = await _spotOrderRestClient.GetClosedSpotOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders)).ConfigureAwait(false);
|
||||||
|
if (!closedOrdersResult.Success)
|
||||||
|
{
|
||||||
|
// .. ?
|
||||||
|
|
||||||
|
anyError = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_lastDataTimeOrdersBeforeDisconnect = null;
|
||||||
|
_lastPollTimeOrders = updatedPollTime;
|
||||||
|
|
||||||
|
// Filter orders to only include where close time is after the start time
|
||||||
|
var relevantOrders = closedOrdersResult.Data.Where(x =>
|
||||||
|
x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time
|
||||||
|
|| x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time
|
||||||
|
|| x.CreateTime == null && x.UpdateTime == null // Unknown time
|
||||||
|
).ToArray();
|
||||||
|
|
||||||
|
if (relevantOrders.Length > 0)
|
||||||
|
HandleOrderUpdate(UpdateSource.Poll, relevantOrders);
|
||||||
|
}
|
||||||
|
|
||||||
|
var fromTimeTrades = _lastDataTimeTradesBeforeDisconnect ?? _lastPollTimeTrades ?? _startTime;
|
||||||
|
var tradesResult = await _spotOrderRestClient.GetSpotUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades)).ConfigureAwait(false);
|
||||||
|
if (!tradesResult.Success)
|
||||||
|
{
|
||||||
|
// .. ?
|
||||||
|
anyError = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_lastDataTimeTradesBeforeDisconnect = null;
|
||||||
|
_lastPollTimeTrades = updatedPollTime;
|
||||||
|
|
||||||
|
// Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking
|
||||||
|
var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || _orderStore.ContainsKey(x.OrderId)).ToArray();
|
||||||
|
if (relevantTrades.Length > 0)
|
||||||
|
HandleTradeUpdate(UpdateSource.Poll, tradesResult.Data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_lastPollSuccessful = !anyError;
|
||||||
|
_logger.LogDebug("User data requesting completed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task StopAsync()
|
||||||
|
{
|
||||||
|
_logger.LogDebug("Stopping UserDataTracker");
|
||||||
|
_cts?.Cancel();
|
||||||
|
|
||||||
|
if (_pollTask != null)
|
||||||
|
await _pollTask.ConfigureAwait(false);
|
||||||
|
|
||||||
|
_logger.LogDebug("Stopped UserDataTracker");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,34 @@
|
|||||||
|
using CryptoExchange.Net.SharedApis;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Trackers.UserData
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// User data tracker configuration
|
||||||
|
/// </summary>
|
||||||
|
public record UserDataTrackerConfig
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Symbols to initially track, used when polling data. Other symbols will get tracked when updates are received for orders or trades on a new symbol and when there are open orders on a new symbol. To only track the symbols specified here set `OnlyTrackProvidedSymbols` to true.
|
||||||
|
/// </summary>
|
||||||
|
public IEnumerable<SharedSymbol> Symbols { get; set; } = [];
|
||||||
|
/// <summary>
|
||||||
|
/// If true only orders and trades in the `Symbols` options will get tracked, data on other symbols will be ignored.
|
||||||
|
/// </summary>
|
||||||
|
public bool OnlyTrackProvidedSymbols { get; set; } = false;
|
||||||
|
/// <summary>
|
||||||
|
/// Interval to poll data at as backup, even when the websocket stream is still connected.
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan PollIntervalConnected { get; set; } = TimeSpan.Zero;
|
||||||
|
/// <summary>
|
||||||
|
/// Interval to poll data at while the websocket is disconnected.
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan PollIntervalDisconnected { get; set; } = TimeSpan.Zero;
|
||||||
|
/// <summary>
|
||||||
|
/// Whether to poll for data initially when starting the tracker.
|
||||||
|
/// </summary>
|
||||||
|
public bool PollAtStart { get; set; } = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
18
CryptoExchange.Net/Trackers/UserData/UserDataUpdate.cs
Normal file
18
CryptoExchange.Net/Trackers/UserData/UserDataUpdate.cs
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
namespace CryptoExchange.Net.Trackers.UserData
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// User data update
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">Data type</typeparam>
|
||||||
|
public class UserDataUpdate<T>
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Source
|
||||||
|
/// </summary>
|
||||||
|
public UpdateSource Source { get; set; }
|
||||||
|
/// <summary>
|
||||||
|
/// Data
|
||||||
|
/// </summary>
|
||||||
|
public T Data { get; set; } = default!;
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user