diff --git a/CryptoExchange.Net/Objects/AsyncResetEvent.cs b/CryptoExchange.Net/Objects/AsyncResetEvent.cs index 7d0b70b..66ada5b 100644 --- a/CryptoExchange.Net/Objects/AsyncResetEvent.cs +++ b/CryptoExchange.Net/Objects/AsyncResetEvent.cs @@ -95,6 +95,10 @@ namespace CryptoExchange.Net.Objects /// public void Set() { + if (!_autoReset && _signaled) + // Already signaled and not resetting + return; + lock (_waitersLock) { if (_autoReset) diff --git a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs index f128c79..4f07ebb 100644 --- a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs @@ -109,6 +109,21 @@ namespace CryptoExchange.Net.Objects.Sockets /// public int Id => _subscription.Id; + /// + /// The last timestamp anything was received from the server + /// + public DateTime? LastReceiveTime => _connection.LastReceiveTime; + + /// + /// The current websocket status + /// + public SocketStatus SocketStatus => _connection.Status; + + /// + /// The current subscription status + /// + public SubscriptionStatus SubscriptionStatus => _subscription.Status; + /// /// ctor /// diff --git a/CryptoExchange.Net/SharedApis/Enums/SharedTransferStatus.cs b/CryptoExchange.Net/SharedApis/Enums/SharedTransferStatus.cs new file mode 100644 index 0000000..8be1789 --- /dev/null +++ b/CryptoExchange.Net/SharedApis/Enums/SharedTransferStatus.cs @@ -0,0 +1,21 @@ +namespace CryptoExchange.Net.SharedApis +{ + /// + /// Transfer status + /// + public enum SharedTransferStatus + { + /// + /// In progress + /// + InProgress, + /// + /// Failed + /// + Failed, + /// + /// Completed + /// + Completed + } +} diff --git a/CryptoExchange.Net/SharedApis/ResponseModels/SharedDeposit.cs b/CryptoExchange.Net/SharedApis/ResponseModels/SharedDeposit.cs index a59a141..c0232d5 100644 --- a/CryptoExchange.Net/SharedApis/ResponseModels/SharedDeposit.cs +++ b/CryptoExchange.Net/SharedApis/ResponseModels/SharedDeposit.cs @@ -44,15 +44,21 @@ namespace CryptoExchange.Net.SharedApis /// public bool Completed { get; set; } + /// + /// Status of the deposit + /// + public SharedTransferStatus Status { get; set; } + /// /// ctor /// - public SharedDeposit(string asset, decimal quantity, bool completed, DateTime timestamp) + public SharedDeposit(string asset, decimal quantity, bool completed, DateTime timestamp, SharedTransferStatus status) { Asset = asset; Quantity = quantity; Timestamp = timestamp; Completed = completed; + Status = status; } } diff --git a/CryptoExchange.Net/SharedApis/ResponseModels/SharedPosition.cs b/CryptoExchange.Net/SharedApis/ResponseModels/SharedPosition.cs index b251f71..de8c5fd 100644 --- a/CryptoExchange.Net/SharedApis/ResponseModels/SharedPosition.cs +++ b/CryptoExchange.Net/SharedApis/ResponseModels/SharedPosition.cs @@ -20,6 +20,10 @@ namespace CryptoExchange.Net.SharedApis /// public SharedPositionSide PositionSide { get; set; } /// + /// Whether the position is one way mode + /// + public SharedPositionMode PositionMode { get; set; } + /// /// Average open price /// public decimal? AverageOpenPrice { get; set; } diff --git a/CryptoExchange.Net/SharedApis/SharedQuantity.cs b/CryptoExchange.Net/SharedApis/SharedQuantity.cs index a6c83a4..0b5f36d 100644 --- a/CryptoExchange.Net/SharedApis/SharedQuantity.cs +++ b/CryptoExchange.Net/SharedApis/SharedQuantity.cs @@ -21,6 +21,11 @@ namespace CryptoExchange.Net.SharedApis /// public decimal? QuantityInContracts { get; set; } + /// + /// Whether all values are null or zero + /// + public bool IsZero => !(QuantityInBaseAsset > 0) && !(QuantityInQuoteAsset > 0) && !(QuantityInContracts > 0); + /// /// ctor /// diff --git a/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs index 1c797c1..c0de716 100644 --- a/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs @@ -49,7 +49,6 @@ namespace CryptoExchange.Net.Sockets.Default private int _reconnectAttempt; private readonly int _receiveBufferSize; - private const int _defaultReceiveBufferSize = 1048576; private const int _sendBufferSize = 4096; private int _bytesReceived = 0; @@ -71,7 +70,7 @@ namespace CryptoExchange.Net.Sockets.Default /// /// The timestamp this socket has been active for the last time /// - public DateTime LastActionTime { get; private set; } + public DateTime? LastReceiveTime { get; private set; } /// public Uri Uri => Parameters.Uri; @@ -622,6 +621,7 @@ namespace CryptoExchange.Net.Sockets.Default break; } + LastReceiveTime = DateTime.UtcNow; if (receiveResult.MessageType == WebSocketMessageType.Close) { // Connection closed @@ -772,6 +772,7 @@ namespace CryptoExchange.Net.Sockets.Default break; } + LastReceiveTime = DateTime.UtcNow; if (receiveResult.MessageType == WebSocketMessageType.Close) { // Connection closed @@ -880,7 +881,6 @@ namespace CryptoExchange.Net.Sockets.Default /// protected void ProcessDataNew(WebSocketMessageType type, ReadOnlySpan data) { - LastActionTime = DateTime.UtcNow; _connection.HandleStreamMessage2(type, data); } @@ -891,7 +891,7 @@ namespace CryptoExchange.Net.Sockets.Default protected async Task CheckTimeoutAsync() { _logger.SocketStartingTaskForNoDataReceivedCheck(Id, Parameters.Timeout); - LastActionTime = DateTime.UtcNow; + LastReceiveTime = DateTime.UtcNow; try { while (true) @@ -899,7 +899,7 @@ namespace CryptoExchange.Net.Sockets.Default if (_ctsSource.IsCancellationRequested) return; - if (DateTime.UtcNow - LastActionTime > Parameters.Timeout) + if (DateTime.UtcNow - LastReceiveTime > Parameters.Timeout) { _logger.SocketNoDataReceiveTimoutReconnect(Id, Parameters.Timeout); _ = ReconnectAsync().ConfigureAwait(false); diff --git a/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs index 22fe927..a2518bb 100644 --- a/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Sockets/Default/Interfaces/IWebsocket.cs @@ -69,6 +69,10 @@ namespace CryptoExchange.Net.Sockets.Default.Interfaces /// bool IsOpen { get; } /// + /// Last timestamp something was received from the server + /// + DateTime? LastReceiveTime { get; } + /// /// Connect the socket /// /// diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index 886b863..1f67f89 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -178,6 +178,11 @@ namespace CryptoExchange.Net.Sockets.Default /// public DateTime? DisconnectTime { get; set; } + /// + /// Last timestamp something was received from the server + /// + public DateTime? LastReceiveTime => _socket.LastReceiveTime; + /// /// Tag for identification /// diff --git a/CryptoExchange.Net/Sockets/Default/Subscription.cs b/CryptoExchange.Net/Sockets/Default/Subscription.cs index ebf9295..5ef3126 100644 --- a/CryptoExchange.Net/Sockets/Default/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Default/Subscription.cs @@ -46,7 +46,7 @@ namespace CryptoExchange.Net.Sockets.Default return; _status = value; - Task.Run(() => StatusChanged?.Invoke(value)); + StatusChanged?.Invoke(value); } } diff --git a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs index 584e275..ed7c4a2 100644 --- a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs +++ b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs @@ -39,6 +39,7 @@ namespace CryptoExchange.Net.Testing.Implementations public Func>? GetReconnectionUrl { get; set; } public static int lastId = 0; + public DateTime? LastReceiveTime { get; } #if NET9_0_OR_GREATER public static readonly Lock lastIdLock = new Lock(); #else diff --git a/CryptoExchange.Net/Trackers/UserData/Interfaces/IUserDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/Interfaces/IUserDataTracker.cs new file mode 100644 index 0000000..2cf7d00 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/Interfaces/IUserDataTracker.cs @@ -0,0 +1,41 @@ +using CryptoExchange.Net.SharedApis; +using CryptoExchange.Net.Trackers.UserData.Objects; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData.Interfaces +{ + /// + /// Data tracker interface + /// + public interface IUserDataTracker + { + /// + /// Whether the tracker is currently fully connected + /// + bool Connected { get; } + + /// + /// Currently tracked symbols. Data for these symbols will be requested when polling. + /// Websocket updates will be available for all symbols regardless. + /// When new data is received for a symbol which is not yet being tracked it will be added to this list and polled in the future unless the `OnlyTrackProvidedSymbols` option is set in the configuration. + /// + IEnumerable TrackedSymbols { get; } + + /// + /// On connection status change. Might trigger multiple times with the same status depending on the underlying subscriptions. + /// + event Action? OnConnectedChange; + + /// + /// Currently tracker values + /// + T[] Values { get; } + + /// + /// On data update + /// + event Func, Task>? OnUpdate; + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/Interfaces/IUserFuturesDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/Interfaces/IUserFuturesDataTracker.cs new file mode 100644 index 0000000..3c429eb --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/Interfaces/IUserFuturesDataTracker.cs @@ -0,0 +1,61 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.SharedApis; +using CryptoExchange.Net.Trackers.UserData.Objects; +using System; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData.Interfaces +{ + /// + /// Futures user data tracker + /// + public interface IUserFuturesDataTracker + { + /// + /// User identifier + /// + string? UserIdentifier { get; } + + /// + /// Whether the tracker is currently fully connected + /// + bool Connected { get; } + + /// + /// Exchange name + /// + public string Exchange { get; } + + /// + /// Balances tracker + /// + IUserDataTracker Balances { get; } + /// + /// Orders tracker + /// + IUserDataTracker Orders { get; } + /// + /// Positions tracker + /// + IUserDataTracker Positions { get; } + /// + /// Trades tracker + /// + IUserDataTracker? Trades { get; } + + /// + /// On connection status change + /// + event Action? OnConnectedChange; + + /// + /// Start tracking user data + /// + Task StartAsync(); + /// + /// Stop tracking data + /// + /// + Task StopAsync(); + } +} \ No newline at end of file diff --git a/CryptoExchange.Net/Trackers/UserData/Interfaces/IUserSpotDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/Interfaces/IUserSpotDataTracker.cs new file mode 100644 index 0000000..35612d0 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/Interfaces/IUserSpotDataTracker.cs @@ -0,0 +1,57 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.SharedApis; +using CryptoExchange.Net.Trackers.UserData.Objects; +using System; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData.Interfaces +{ + /// + /// User data tracker + /// + public interface IUserSpotDataTracker + { + /// + /// User identifier + /// + string? UserIdentifier { get; } + + /// + /// Whether the tracker is currently fully connected + /// + bool Connected { get; } + + /// + /// Exchange name + /// + public string Exchange { get; } + + /// + /// Balances tracker + /// + IUserDataTracker Balances { get; } + /// + /// Orders tracker + /// + IUserDataTracker Orders { get; } + /// + /// Trades tracker + /// + IUserDataTracker? Trades { get; } + + /// + /// On connection status change + /// + event Action? OnConnectedChange; + + /// + /// Start tracking user data + /// + Task StartAsync(); + /// + /// Stop tracking data + /// + /// + Task StopAsync(); + } +} \ No newline at end of file diff --git a/CryptoExchange.Net/Trackers/UserData/ItemTrackers/BalanceTracker.cs b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/BalanceTracker.cs new file mode 100644 index 0000000..66f24fd --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/BalanceTracker.cs @@ -0,0 +1,94 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using CryptoExchange.Net.Trackers.UserData.Objects; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers +{ + /// + /// Balance tracker implementation + /// + public class BalanceTracker : UserDataItemTracker + { + private readonly IBalanceRestClient _restClient; + private readonly IBalanceSocketClient? _socketClient; + private readonly ExchangeParameters? _exchangeParameters; + private readonly SharedAccountType _accountType; + + /// + /// ctor + /// + public BalanceTracker( + ILogger logger, + IBalanceRestClient restClient, + IBalanceSocketClient? socketClient, + SharedAccountType accountType, + TrackerItemConfig config, + ExchangeParameters? exchangeParameters = null + ) : base(logger, UserDataType.Balances, restClient.Exchange, config, false, null) + { + if (_socketClient == null) + config = config with { PollIntervalConnected = config.PollIntervalDisconnected }; + + _restClient = restClient; + _socketClient = socketClient; + _exchangeParameters = exchangeParameters; + _accountType = accountType; + } + + /// + protected override bool Update(SharedBalance existingItem, SharedBalance updateItem) + { + var changed = false; + if (existingItem.Total != updateItem.Total) + { + existingItem.Total = updateItem.Total; + changed = true; + } + + if (existingItem.Available != updateItem.Available) + { + existingItem.Available = updateItem.Available; + changed = true; + } + + return changed; + } + + /// + protected override string GetKey(SharedBalance item) => item.Asset + item.IsolatedMarginSymbol; + + /// + protected override bool? CheckIfUpdateShouldBeApplied(SharedBalance existingItem, SharedBalance updateItem) => true; + + /// + protected override Task> DoSubscribeAsync(string? listenKey) + { + if (_socketClient == null) + return Task.FromResult(new CallResult(data: null)); + + var accountType = _accountType == SharedAccountType.Spot ? TradingMode.Spot : + _accountType == SharedAccountType.PerpetualInverseFutures ? TradingMode.PerpetualInverse : + _accountType == SharedAccountType.DeliveryLinearFutures ? TradingMode.DeliveryLinear : + _accountType == SharedAccountType.DeliveryInverseFutures ? TradingMode.DeliveryInverse : + TradingMode.PerpetualLinear; + return ExchangeHelpers.ProcessQueuedAsync( + async handler => await _socketClient.SubscribeToBalanceUpdatesAsync(new SubscribeBalancesRequest(listenKey, accountType, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false), + x => HandleUpdateAsync(UpdateSource.Push, x.Data))!; + } + + /// + protected override async Task DoPollAsync() + { + var balances = await _restClient.GetBalancesAsync(new GetBalancesRequest(accountType: _accountType, exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (balances.Success) + await HandleUpdateAsync(UpdateSource.Poll, balances.Data).ConfigureAwait(false); + else + _initialPollingError ??= balances.Error; + + return !balances.Success; + } + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/ItemTrackers/FuturesOrderTracker.cs b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/FuturesOrderTracker.cs new file mode 100644 index 0000000..b1923b1 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/FuturesOrderTracker.cs @@ -0,0 +1,298 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using CryptoExchange.Net.Trackers.UserData.Objects; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers +{ + /// + /// Futures order tracker + /// + public class FuturesOrderTracker : UserDataItemTracker + { + private readonly IFuturesOrderRestClient _restClient; + private readonly IFuturesOrderSocketClient? _socketClient; + private readonly ExchangeParameters? _exchangeParameters; + private readonly bool _requiresSymbolParameterOpenOrders; + + internal event Func? OnTradeUpdate; + + /// + /// ctor + /// + public FuturesOrderTracker( + ILogger logger, + IFuturesOrderRestClient restClient, + IFuturesOrderSocketClient? socketClient, + TrackerItemConfig config, + IEnumerable symbols, + bool onlyTrackProvidedSymbols, + ExchangeParameters? exchangeParameters = null + ) : base(logger, UserDataType.Orders, restClient.Exchange, config, onlyTrackProvidedSymbols, symbols) + { + if (_socketClient == null) + config = config with { PollIntervalConnected = config.PollIntervalDisconnected }; + + _restClient = restClient; + _socketClient = socketClient; + _exchangeParameters = exchangeParameters; + + _requiresSymbolParameterOpenOrders = restClient.GetOpenFuturesOrdersOptions.RequiredOptionalParameters.Any(x => x.Name == "Symbol"); + } + + /// + protected override bool Update(SharedFuturesOrder existingItem, SharedFuturesOrder updateItem) + { + var changed = false; + if (updateItem.AveragePrice != null && updateItem.AveragePrice != existingItem.AveragePrice) + { + existingItem.AveragePrice = updateItem.AveragePrice; + changed = true; + } + + if (updateItem.OrderPrice != null && updateItem.OrderPrice != existingItem.OrderPrice) + { + existingItem.OrderPrice = updateItem.OrderPrice; + changed = true; + } + + if (updateItem.Fee != null && updateItem.Fee != existingItem.Fee) + { + existingItem.Fee = updateItem.Fee; + changed = true; + } + + if (updateItem.FeeAsset != null && updateItem.FeeAsset != existingItem.FeeAsset) + { + existingItem.FeeAsset = updateItem.FeeAsset; + changed = true; + } + + if (updateItem.OrderQuantity != null && updateItem.OrderQuantity != existingItem.OrderQuantity) + { + existingItem.OrderQuantity = updateItem.OrderQuantity; + changed = true; + } + + if (updateItem.QuantityFilled != null && updateItem.QuantityFilled != existingItem.QuantityFilled) + { + existingItem.QuantityFilled = updateItem.QuantityFilled; + changed = true; + } + + if (updateItem.Status != existingItem.Status) + { + existingItem.Status = updateItem.Status; + changed = true; + } + + if (updateItem.StopLossPrice != existingItem.StopLossPrice) + { + existingItem.StopLossPrice = updateItem.StopLossPrice; + changed = true; + } + + if (updateItem.TakeProfitPrice != existingItem.TakeProfitPrice) + { + existingItem.TakeProfitPrice = updateItem.TakeProfitPrice; + changed = true; + } + + if (updateItem.TriggerPrice != existingItem.TriggerPrice) + { + existingItem.TriggerPrice = updateItem.TriggerPrice; + changed = true; + } + + if (updateItem.UpdateTime != null && updateItem.UpdateTime != existingItem.UpdateTime) + { + existingItem.UpdateTime = updateItem.UpdateTime; + changed = true; + } + + return changed; + } + + /// + protected override string GetKey(SharedFuturesOrder item) => item.OrderId; + /// + protected override TimeSpan GetAge(DateTime time, SharedFuturesOrder item) => item.Status == SharedOrderStatus.Open ? TimeSpan.Zero : time - (item.UpdateTime ?? item.CreateTime ?? time); + /// + protected override bool? CheckIfUpdateShouldBeApplied(SharedFuturesOrder existingItem, SharedFuturesOrder updateItem) + { + if (existingItem.Status == SharedOrderStatus.Open && updateItem.Status != SharedOrderStatus.Open) + // status changed from open to not open + return true; + + if (existingItem.Status != SharedOrderStatus.Open && updateItem.Status == SharedOrderStatus.Open) + // status changed from not open to open; stale + return false; + + if (existingItem.UpdateTime != null && updateItem.UpdateTime != null) + { + // If both have an update time base of that + if (existingItem.UpdateTime < updateItem.UpdateTime) + return true; + + if (existingItem.UpdateTime > updateItem.UpdateTime) + return false; + } + + if (existingItem.QuantityFilled != null && updateItem.QuantityFilled != null) + { + if (existingItem.QuantityFilled.QuantityInBaseAsset != null && updateItem.QuantityFilled.QuantityInBaseAsset != null) + { + // If base quantity is not null we can base it on that + if (existingItem.QuantityFilled.QuantityInBaseAsset < updateItem.QuantityFilled.QuantityInBaseAsset) + return true; + + else if (existingItem.QuantityFilled.QuantityInBaseAsset > updateItem.QuantityFilled.QuantityInBaseAsset) + return false; + } + + if (existingItem.QuantityFilled.QuantityInQuoteAsset != null && updateItem.QuantityFilled.QuantityInQuoteAsset != null) + { + // If quote quantity is not null we can base it on that + if (existingItem.QuantityFilled.QuantityInQuoteAsset < updateItem.QuantityFilled.QuantityInQuoteAsset) + return true; + + else if (existingItem.QuantityFilled.QuantityInQuoteAsset > updateItem.QuantityFilled.QuantityInQuoteAsset) + return false; + } + } + + if (existingItem.Fee != null && updateItem.Fee != null) + { + // Higher fee means later processing + if (existingItem.Fee < updateItem.Fee) + return true; + + if (existingItem.Fee > updateItem.Fee) + return false; + } + + return null; + } + + /// + protected internal override async Task HandleUpdateAsync(UpdateSource source, SharedFuturesOrder[] @event) + { + await base.HandleUpdateAsync(source, @event).ConfigureAwait(false); + + var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray(); + if (trades.Length != 0 && OnTradeUpdate != null) + await OnTradeUpdate.Invoke(source, trades).ConfigureAwait(false); + } + + /// + protected override Task> DoSubscribeAsync(string? listenKey) + { + if (_socketClient == null) + return Task.FromResult(new CallResult(data: null)); + + return ExchangeHelpers.ProcessQueuedAsync( + async handler => await _socketClient.SubscribeToFuturesOrderUpdatesAsync(new SubscribeFuturesOrderRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false), + x => HandleUpdateAsync(UpdateSource.Push, x.Data))!; + } + + /// + protected override async Task DoPollAsync() + { + var anyError = false; + List openOrders = new List(); + + if (!_requiresSymbolParameterOpenOrders) + { + var openOrdersResult = await _restClient.GetOpenFuturesOrdersAsync(new GetOpenOrdersRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!openOrdersResult.Success) + { + anyError = true; + + _initialPollingError ??= openOrdersResult.Error; + if (!_firstPollDone) + return anyError; + } + else + { + openOrders.AddRange(openOrdersResult.Data); + await HandleUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false); + } + } + else + { + foreach (var symbol in _symbols.ToList()) + { + var openOrdersResult = await _restClient.GetOpenFuturesOrdersAsync(new GetOpenOrdersRequest(symbol, exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!openOrdersResult.Success) + { + anyError = true; + + _initialPollingError ??= openOrdersResult.Error; + if (!_firstPollDone) + break; + } + else + { + openOrders.AddRange(openOrdersResult.Data); + await HandleUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false); + } + } + } + + foreach (var symbol in _symbols.ToList()) + { + var fromTimeOrders = _lastDataTimeBeforeDisconnect ?? _lastPollTime ?? _startTime; + var updatedPollTime = DateTime.UtcNow; + var closedOrdersResult = await _restClient.GetClosedFuturesOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders, exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!closedOrdersResult.Success) + { + anyError = true; + + _initialPollingError ??= closedOrdersResult.Error; + if (!_firstPollDone) + break; + } + else + { + _lastDataTimeBeforeDisconnect = null; + _lastPollTime = updatedPollTime; + + // Filter orders to only include where close time is after the start time + var relevantOrders = closedOrdersResult.Data.Where(x => + x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time + || x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time + || x.CreateTime == null && x.UpdateTime == null // Unknown time + ).ToArray(); + + // Check for orders which are no longer returned in either open/closed and assume they're canceled without fill + var openOrdersNotReturned = Values.Where(x => + x.SharedSymbol!.BaseAsset == symbol.BaseAsset && x.SharedSymbol.QuoteAsset == symbol.QuoteAsset // Orders for the same symbol + && x.QuantityFilled?.IsZero == true // With no filled value + && !openOrders.Any(r => r.OrderId == x.OrderId) // Not returned in open orders + && !relevantOrders.Any(r => r.OrderId == x.OrderId) // Not return in closed orders + ).ToList(); + + var additionalUpdates = new List(); + foreach (var order in openOrdersNotReturned) + { + additionalUpdates.Add(order with + { + Status = SharedOrderStatus.Canceled + }); + } + + relevantOrders = relevantOrders.Concat(additionalUpdates).ToArray(); + if (relevantOrders.Length > 0) + await HandleUpdateAsync(UpdateSource.Poll, relevantOrders).ConfigureAwait(false); + } + } + + return anyError; + } + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/ItemTrackers/FuturesUserTradeTracker.cs b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/FuturesUserTradeTracker.cs new file mode 100644 index 0000000..93291ea --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/FuturesUserTradeTracker.cs @@ -0,0 +1,98 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using CryptoExchange.Net.Trackers.UserData.Objects; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers +{ + /// + /// Futures user trade tracker + /// + public class FuturesUserTradeTracker : UserDataItemTracker + { + private readonly IFuturesOrderRestClient _restClient; + private readonly IUserTradeSocketClient? _socketClient; + private readonly ExchangeParameters? _exchangeParameters; + + internal Func? GetTrackedOrderIds { get; set; } + + /// + /// ctor + /// + public FuturesUserTradeTracker( + ILogger logger, + IFuturesOrderRestClient restClient, + IUserTradeSocketClient? socketClient, + TrackerItemConfig config, + IEnumerable symbols, + bool onlyTrackProvidedSymbols, + ExchangeParameters? exchangeParameters = null + ) : base(logger, UserDataType.Trades, restClient.Exchange, config, onlyTrackProvidedSymbols, symbols) + { + if (_socketClient == null) + config = config with { PollIntervalConnected = config.PollIntervalDisconnected }; + + _restClient = restClient; + _socketClient = socketClient; + _exchangeParameters = exchangeParameters; + } + + /// + protected override string GetKey(SharedUserTrade item) => item.Id; + /// + protected override bool? CheckIfUpdateShouldBeApplied(SharedUserTrade existingItem, SharedUserTrade updateItem) => false; + /// + protected override bool Update(SharedUserTrade existingItem, SharedUserTrade updateItem) => false; // trades are never updated + /// + protected override TimeSpan GetAge(DateTime time, SharedUserTrade item) => time - item.Timestamp; + + /// + protected override async Task DoPollAsync() + { + var anyError = false; + foreach (var symbol in _symbols) + { + var fromTimeTrades = _lastDataTimeBeforeDisconnect ?? _lastPollTime ?? _startTime; + var updatedPollTime = DateTime.UtcNow; + var tradesResult = await _restClient.GetFuturesUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades, exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!tradesResult.Success) + { + anyError = true; + + _initialPollingError ??= tradesResult.Error; + if (!_firstPollDone) + break; + } + else + { + _lastDataTimeBeforeDisconnect = null; + _lastPollTime = updatedPollTime; + + // Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking + var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || (GetTrackedOrderIds?.Invoke() ?? []).Any(o => o == x.OrderId)).ToArray(); + if (relevantTrades.Length > 0) + await HandleUpdateAsync(UpdateSource.Poll, tradesResult.Data).ConfigureAwait(false); + } + } + + return anyError; + } + + /// + protected override Task> DoSubscribeAsync(string? listenKey) + { + if (_socketClient == null) + return Task.FromResult(new CallResult(data: null)); + + return ExchangeHelpers.ProcessQueuedAsync( + async handler => await _socketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false), + x => HandleUpdateAsync(UpdateSource.Push, x.Data))!; + } + + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/ItemTrackers/PositionTracker.cs b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/PositionTracker.cs new file mode 100644 index 0000000..231cf65 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/PositionTracker.cs @@ -0,0 +1,240 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using CryptoExchange.Net.Trackers.UserData.Objects; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers +{ + /// + /// Position tracker + /// + public class PositionTracker : UserDataItemTracker + { + private readonly IFuturesOrderRestClient _restClient; + private readonly IPositionSocketClient? _socketClient; + private readonly ExchangeParameters? _exchangeParameters; + + /// + /// Whether websocket position updates are full snapshots and missing positions should be considered 0 + /// + protected bool WebsocketPositionUpdatesAreFullSnapshots { get; } + + /// + /// ctor + /// + public PositionTracker( + ILogger logger, + IFuturesOrderRestClient restClient, + IPositionSocketClient? socketClient, + TrackerItemConfig config, + IEnumerable symbols, + bool onlyTrackProvidedSymbols, + bool websocketPositionUpdatesAreFullSnapshots, + ExchangeParameters? exchangeParameters = null + ) : base(logger, UserDataType.Positions, restClient.Exchange, config, onlyTrackProvidedSymbols, symbols) + { + if (_socketClient == null) + config = config with { PollIntervalConnected = config.PollIntervalDisconnected }; + + _restClient = restClient; + _socketClient = socketClient; + _exchangeParameters = exchangeParameters; + + WebsocketPositionUpdatesAreFullSnapshots = websocketPositionUpdatesAreFullSnapshots; + } + + /// + protected override bool Update(SharedPosition existingItem, SharedPosition updateItem) + { + // Some other way to way to determine sequence? Maybe timestamp? + var changed = false; + if (existingItem.AverageOpenPrice != updateItem.AverageOpenPrice) + { + existingItem.AverageOpenPrice = updateItem.AverageOpenPrice; + changed = true; + } + + if (existingItem.Leverage != updateItem.Leverage) + { + existingItem.Leverage = updateItem.Leverage; + changed = true; + } + + if (existingItem.LiquidationPrice != updateItem.LiquidationPrice) + { + existingItem.LiquidationPrice = updateItem.LiquidationPrice; + changed = true; + } + + if (existingItem.PositionSize != updateItem.PositionSize) + { + existingItem.PositionSize = updateItem.PositionSize; + changed = true; + } + + if (existingItem.StopLossPrice != updateItem.StopLossPrice) + { + existingItem.StopLossPrice = updateItem.StopLossPrice; + changed = true; + } + + if (existingItem.TakeProfitPrice != updateItem.TakeProfitPrice) + { + existingItem.TakeProfitPrice = updateItem.TakeProfitPrice; + changed = true; + } + + if (updateItem.UnrealizedPnl != null && existingItem.UnrealizedPnl != updateItem.UnrealizedPnl) + { + existingItem.UnrealizedPnl = updateItem.UnrealizedPnl; + changed = true; + } + + if (updateItem.UpdateTime != null && existingItem.UpdateTime != updateItem.UpdateTime) + { + existingItem.UpdateTime = updateItem.UpdateTime; + // If update time is the only changed prop don't mark it as changed + } + + return changed; + } + + /// + protected internal override async Task HandleUpdateAsync(UpdateSource source, SharedPosition[] @event) + { + LastUpdateTime = DateTime.UtcNow; + + List? toRemove = null; + foreach (var item in @event) + { + if (item is SharedSymbolModel symbolModel) + { + if (symbolModel.SharedSymbol == null) + { + toRemove ??= new List(); + toRemove.Add(item); + } + else if (_onlyTrackProvidedSymbols + && !_symbols.Any(y => y.TradingMode == symbolModel.SharedSymbol!.TradingMode && y.BaseAsset == symbolModel.SharedSymbol.BaseAsset && y.QuoteAsset == symbolModel.SharedSymbol.QuoteAsset)) + { + toRemove ??= new List(); + toRemove.Add(item); + } + } + } + + if (toRemove != null) + @event = @event.Except(toRemove).ToArray(); + + if (!_onlyTrackProvidedSymbols) + UpdateSymbolsList(@event.OfType().Select(x => x.SharedSymbol!)); + + + // Update local store + var updatedItems = @event.Select(GetKey).ToList(); + + if (WebsocketPositionUpdatesAreFullSnapshots) + { + // Reset any tracking position to zero/null values when it's no longer in the snapshot as it means there is no open position any more + var notInSnapshot = _store.Where(x => !updatedItems.Contains(x.Key) && x.Value.PositionSize != 0).ToList(); + foreach (var position in notInSnapshot) + { + position.Value.UpdateTime = DateTime.UtcNow; + position.Value.AverageOpenPrice = null; + position.Value.LiquidationPrice = null; + position.Value.PositionSize = 0; + position.Value.StopLossPrice = null; + position.Value.TakeProfitPrice = null; + position.Value.UnrealizedPnl = null; + updatedItems.Add(position.Key); + + LastChangeTime = DateTime.UtcNow; + } + } + + foreach (var item in @event) + { + bool existed = false; + _store.AddOrUpdate(GetKey(item), item, (key, existing) => + { + existed = true; + if (CheckIfUpdateShouldBeApplied(existing, item) == false) + { + updatedItems.Remove(key); + } + else + { + var updated = Update(existing, item); + if (!updated) + { + updatedItems.Remove(key); + } + else + { + _logger.LogDebug("Updated {DataType} {Item}", DataType, key); + LastChangeTime = DateTime.UtcNow; + } + } + + return existing; + }); + + if (!existed) + { + _logger.LogDebug("Added {DataType} {Item}", DataType, GetKey(item)); + LastChangeTime = DateTime.UtcNow; + } + } + + if (updatedItems.Count > 0) + { + await InvokeUpdate( + new UserDataUpdate(source, _exchange, _store.Where(x => updatedItems.Contains(x.Key)).Select(x => x.Value).ToArray())).ConfigureAwait(false); + } + } + + /// + protected override string GetKey(SharedPosition item) => + item.SharedSymbol!.TradingMode + item.SharedSymbol.BaseAsset + item.SharedSymbol.QuoteAsset + item.PositionMode + (item.PositionMode != SharedPositionMode.OneWay ? item.PositionSide.ToString() : ""); + + /// + protected override bool? CheckIfUpdateShouldBeApplied(SharedPosition existingItem, SharedPosition updateItem) => true; + + /// + protected override Task> DoSubscribeAsync(string? listenKey) + { + if (_socketClient == null) + return Task.FromResult(new CallResult(data: null)); + + return ExchangeHelpers.ProcessQueuedAsync( + async handler => await _socketClient.SubscribeToPositionUpdatesAsync(new SubscribePositionRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false), + x => HandleUpdateAsync(UpdateSource.Push, x.Data))!; + } + + /// + protected override async Task DoPollAsync() + { + var anyError = false; + var positionsResult = await _restClient.GetPositionsAsync(new GetPositionsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!positionsResult.Success) + { + anyError = true; + + _initialPollingError ??= positionsResult.Error; + if (!_firstPollDone) + return anyError; + } + else + { + await HandleUpdateAsync(UpdateSource.Poll, positionsResult.Data).ConfigureAwait(false); + } + + return anyError; + } + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/ItemTrackers/SpotOrderTracker.cs b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/SpotOrderTracker.cs new file mode 100644 index 0000000..da4aebd --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/SpotOrderTracker.cs @@ -0,0 +1,312 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using CryptoExchange.Net.Trackers.UserData.Objects; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers +{ + /// + /// Spot order tracker + /// + public class SpotOrderTracker : UserDataItemTracker + { + private readonly ISpotOrderRestClient _restClient; + private readonly ISpotOrderSocketClient? _socketClient; + private readonly ExchangeParameters? _exchangeParameters; + private readonly bool _requiresSymbolParameterOpenOrders; + + internal event Func? OnTradeUpdate; + + /// + /// ctor + /// + public SpotOrderTracker( + ILogger logger, + ISpotOrderRestClient restClient, + ISpotOrderSocketClient? socketClient, + TrackerItemConfig config, + IEnumerable symbols, + bool onlyTrackProvidedSymbols, + ExchangeParameters? exchangeParameters = null + ) : base(logger, UserDataType.Orders, restClient.Exchange, config, onlyTrackProvidedSymbols, symbols) + { + if (_socketClient == null) + config = config with { PollIntervalConnected = config.PollIntervalDisconnected }; + + _restClient = restClient; + _socketClient = socketClient; + _exchangeParameters = exchangeParameters; + + _requiresSymbolParameterOpenOrders = restClient.GetOpenSpotOrdersOptions.RequiredOptionalParameters.Any(x => x.Name == "Symbol"); + } + + /// + protected override bool Update(SharedSpotOrder existingItem, SharedSpotOrder updateItem) + { + var changed = false; + if (updateItem.AveragePrice != null && updateItem.AveragePrice != existingItem.AveragePrice) + { + existingItem.AveragePrice = updateItem.AveragePrice; + changed = true; + } + + if (updateItem.OrderPrice != null && updateItem.OrderPrice != existingItem.OrderPrice) + { + existingItem.OrderPrice = updateItem.OrderPrice; + changed = true; + } + + if (updateItem.Fee != null && updateItem.Fee != existingItem.Fee) + { + existingItem.Fee = updateItem.Fee; + changed = true; + } + + if (updateItem.FeeAsset != null && updateItem.FeeAsset != existingItem.FeeAsset) + { + existingItem.FeeAsset = updateItem.FeeAsset; + changed = true; + } + + if (updateItem.OrderQuantity != null && updateItem.OrderQuantity != existingItem.OrderQuantity) + { + existingItem.OrderQuantity ??= new SharedOrderQuantity(); + if (updateItem.OrderQuantity.QuantityInBaseAsset != null) + { + existingItem.OrderQuantity.QuantityInBaseAsset = updateItem.OrderQuantity.QuantityInBaseAsset; + changed = true; + } + if (updateItem.OrderQuantity.QuantityInQuoteAsset != null) + { + existingItem.OrderQuantity.QuantityInQuoteAsset = updateItem.OrderQuantity.QuantityInQuoteAsset; + changed = true; + } + if (updateItem.OrderQuantity.QuantityInContracts != null) + { + existingItem.OrderQuantity.QuantityInContracts = updateItem.OrderQuantity.QuantityInContracts; + changed = true; + } + } + + if (updateItem.QuantityFilled != null && updateItem.QuantityFilled != existingItem.QuantityFilled) + { + existingItem.QuantityFilled ??= new SharedOrderQuantity(); + if (updateItem.QuantityFilled.QuantityInBaseAsset != null) + { + existingItem.QuantityFilled.QuantityInBaseAsset = updateItem.QuantityFilled.QuantityInBaseAsset; + changed = true; + } + if (updateItem.QuantityFilled.QuantityInQuoteAsset != null) + { + existingItem.QuantityFilled.QuantityInQuoteAsset = updateItem.QuantityFilled.QuantityInQuoteAsset; + changed = true; + } + if (updateItem.QuantityFilled.QuantityInContracts != null) + { + existingItem.QuantityFilled.QuantityInContracts = updateItem.QuantityFilled.QuantityInContracts; + changed = true; + } + } + + if (updateItem.Status != existingItem.Status) + { + existingItem.Status = updateItem.Status; + changed = true; + } + + if (updateItem.UpdateTime != null && updateItem.UpdateTime != existingItem.UpdateTime) + { + existingItem.UpdateTime = updateItem.UpdateTime; + changed = true; + } + + return changed; + } + + /// + protected override string GetKey(SharedSpotOrder item) => item.OrderId; + /// + protected override TimeSpan GetAge(DateTime time, SharedSpotOrder item) => item.Status == SharedOrderStatus.Open ? TimeSpan.Zero : time - (item.UpdateTime ?? item.CreateTime ?? time); + + /// + protected override bool? CheckIfUpdateShouldBeApplied(SharedSpotOrder existingItem, SharedSpotOrder updateItem) + { + if (existingItem.Status == SharedOrderStatus.Open && updateItem.Status != SharedOrderStatus.Open) + // status changed from open to not open + return true; + + if (existingItem.Status != SharedOrderStatus.Open && updateItem.Status == SharedOrderStatus.Open) + // status changed from not open to open; stale + return false; + + if (existingItem.UpdateTime != null && updateItem.UpdateTime != null) + { + // If both have an update time base of that + if (existingItem.UpdateTime < updateItem.UpdateTime) + return true; + + if (existingItem.UpdateTime > updateItem.UpdateTime) + return false; + } + + if (existingItem.QuantityFilled != null && updateItem.QuantityFilled != null) + { + if (existingItem.QuantityFilled.QuantityInBaseAsset != null && updateItem.QuantityFilled.QuantityInBaseAsset != null) + { + // If base quantity is not null we can base it on that + if (existingItem.QuantityFilled.QuantityInBaseAsset < updateItem.QuantityFilled.QuantityInBaseAsset) + return true; + + else if (existingItem.QuantityFilled.QuantityInBaseAsset > updateItem.QuantityFilled.QuantityInBaseAsset) + return false; + } + + if (existingItem.QuantityFilled.QuantityInQuoteAsset != null && updateItem.QuantityFilled.QuantityInQuoteAsset != null) + { + // If quote quantity is not null we can base it on that + if (existingItem.QuantityFilled.QuantityInQuoteAsset < updateItem.QuantityFilled.QuantityInQuoteAsset) + return true; + + else if (existingItem.QuantityFilled.QuantityInQuoteAsset > updateItem.QuantityFilled.QuantityInQuoteAsset) + return false; + } + } + + if (existingItem.Fee != null && updateItem.Fee != null) + { + // Higher fee means later processing + if (existingItem.Fee < updateItem.Fee) + return true; + + if (existingItem.Fee > updateItem.Fee) + return false; + } + + return null; + } + + /// + protected internal override async Task HandleUpdateAsync(UpdateSource source, SharedSpotOrder[] @event) + { + await base.HandleUpdateAsync(source, @event).ConfigureAwait(false); + + var trades = @event.Where(x => x.LastTrade != null).Select(x => x.LastTrade!).ToArray(); + if (trades.Length != 0 && OnTradeUpdate != null) + await OnTradeUpdate(source, trades).ConfigureAwait(false); + } + + /// + protected override Task> DoSubscribeAsync(string? listenKey) + { + if (_socketClient == null) + return Task.FromResult(new CallResult(data: null)); + + return ExchangeHelpers.ProcessQueuedAsync( + async handler => await _socketClient.SubscribeToSpotOrderUpdatesAsync(new SubscribeSpotOrderRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false), + x => HandleUpdateAsync(UpdateSource.Push, x.Data))!; + } + + /// + protected override async Task DoPollAsync() + { + var anyError = false; + List openOrders = new List(); + + if (!_requiresSymbolParameterOpenOrders) + { + var openOrdersResult = await _restClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!openOrdersResult.Success) + { + anyError = true; + + _initialPollingError ??= openOrdersResult.Error; + if (!_firstPollDone) + return anyError; + } + else + { + openOrders.AddRange(openOrdersResult.Data); + await HandleUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false); + } + } + else + { + foreach (var symbol in _symbols.ToList()) + { + var openOrdersResult = await _restClient.GetOpenSpotOrdersAsync(new GetOpenOrdersRequest(symbol, exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!openOrdersResult.Success) + { + anyError = true; + + _initialPollingError ??= openOrdersResult.Error; + if (!_firstPollDone) + break; + } + else + { + openOrders.AddRange(openOrdersResult.Data); + await HandleUpdateAsync(UpdateSource.Poll, openOrdersResult.Data).ConfigureAwait(false); + } + } + } + + if (!_firstPollDone && anyError) + return anyError; + + foreach (var symbol in _symbols.ToList()) + { + var fromTimeOrders = _lastDataTimeBeforeDisconnect ?? _lastPollTime ?? _startTime; + var updatedPollTime = DateTime.UtcNow; + var closedOrdersResult = await _restClient.GetClosedSpotOrdersAsync(new GetClosedOrdersRequest(symbol, startTime: fromTimeOrders, exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!closedOrdersResult.Success) + { + anyError = true; + + _initialPollingError ??= closedOrdersResult.Error; + if (!_firstPollDone) + break; + } + else + { + _lastDataTimeBeforeDisconnect = null; + _lastPollTime = updatedPollTime; + + // Filter orders to only include where close time is after the start time + var relevantOrders = closedOrdersResult.Data.Where(x => + x.UpdateTime != null && x.UpdateTime >= _startTime // Updated after the tracker start time + || x.CreateTime != null && x.CreateTime >= _startTime // Created after the tracker start time + || x.CreateTime == null && x.UpdateTime == null // Unknown time + ).ToArray(); + + // Check for orders which are no longer returned in either open/closed and assume they're canceled without fill + var openOrdersNotReturned = Values.Where(x => + x.SharedSymbol!.BaseAsset == symbol.BaseAsset && x.SharedSymbol.QuoteAsset == symbol.QuoteAsset // Orders for the same symbol + && x.QuantityFilled?.IsZero == true // With no filled value + && !openOrders.Any(r => r.OrderId == x.OrderId) // Not returned in open orders + && !relevantOrders.Any(r => r.OrderId == x.OrderId) // Not return in closed orders + ).ToList(); + + var additionalUpdates = new List(); + foreach (var order in openOrdersNotReturned) + { + additionalUpdates.Add(order with + { + Status = SharedOrderStatus.Canceled + }); + } + + relevantOrders = relevantOrders.Concat(additionalUpdates).ToArray(); + if (relevantOrders.Length > 0) + await HandleUpdateAsync(UpdateSource.Poll, relevantOrders).ConfigureAwait(false); + } + } + + return anyError; + } + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/ItemTrackers/SpotUserTradeTracker.cs b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/SpotUserTradeTracker.cs new file mode 100644 index 0000000..0578ac6 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/SpotUserTradeTracker.cs @@ -0,0 +1,98 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using CryptoExchange.Net.Trackers.UserData.Objects; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers +{ + /// + /// Spot user trade tracker + /// + public class SpotUserTradeTracker : UserDataItemTracker + { + private readonly ISpotOrderRestClient _restClient; + private readonly IUserTradeSocketClient? _socketClient; + private readonly ExchangeParameters? _exchangeParameters; + + internal Func? GetTrackedOrderIds { get; set; } + + /// + /// ctor + /// + public SpotUserTradeTracker( + ILogger logger, + ISpotOrderRestClient restClient, + IUserTradeSocketClient? socketClient, + TrackerItemConfig config, + IEnumerable symbols, + bool onlyTrackProvidedSymbols, + ExchangeParameters? exchangeParameters = null + ) : base(logger, UserDataType.Trades, restClient.Exchange, config, onlyTrackProvidedSymbols, symbols) + { + if (_socketClient == null) + config = config with { PollIntervalConnected = config.PollIntervalDisconnected }; + + _restClient = restClient; + _socketClient = socketClient; + _exchangeParameters = exchangeParameters; + } + + /// + protected override string GetKey(SharedUserTrade item) => item.Id; + /// + protected override bool? CheckIfUpdateShouldBeApplied(SharedUserTrade existingItem, SharedUserTrade updateItem) => false; + /// + protected override bool Update(SharedUserTrade existingItem, SharedUserTrade updateItem) => false; // Trades are never updated + /// + protected override TimeSpan GetAge(DateTime time, SharedUserTrade item) => time - item.Timestamp; + + /// + protected override async Task DoPollAsync() + { + var anyError = false; + foreach (var symbol in _symbols) + { + var fromTimeTrades = _lastDataTimeBeforeDisconnect ?? _lastPollTime ?? _startTime; + var updatedPollTime = DateTime.UtcNow; + var tradesResult = await _restClient.GetSpotUserTradesAsync(new GetUserTradesRequest(symbol, startTime: fromTimeTrades, exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!tradesResult.Success) + { + anyError = true; + + _initialPollingError ??= tradesResult.Error; + if (!_firstPollDone) + break; + } + else + { + _lastDataTimeBeforeDisconnect = null; + _lastPollTime = updatedPollTime; + + // Filter trades to only include where timestamp is after the start time OR it's part of an order we're tracking + var relevantTrades = tradesResult.Data.Where(x => x.Timestamp >= _startTime || (GetTrackedOrderIds?.Invoke() ?? []).Any(o => o == x.OrderId)).ToArray(); + if (relevantTrades.Length > 0) + await HandleUpdateAsync(UpdateSource.Poll, tradesResult.Data).ConfigureAwait(false); + } + } + + return anyError; + } + + /// + protected override Task> DoSubscribeAsync(string? listenKey) + { + if (_socketClient == null) + return Task.FromResult(new CallResult(data: null)); + + return ExchangeHelpers.ProcessQueuedAsync( + async handler => await _socketClient.SubscribeToUserTradeUpdatesAsync(new SubscribeUserTradeRequest(listenKey, exchangeParameters: _exchangeParameters), handler, ct: _cts!.Token).ConfigureAwait(false), + x => HandleUpdateAsync(UpdateSource.Push, x.Data))!; + } + + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/ItemTrackers/UserDataItemTracker.cs b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/UserDataItemTracker.cs new file mode 100644 index 0000000..df0e57f --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/ItemTrackers/UserDataItemTracker.cs @@ -0,0 +1,533 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using CryptoExchange.Net.Trackers.UserData.Interfaces; +using CryptoExchange.Net.Trackers.UserData.Objects; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData.ItemTrackers +{ + /// + /// User data tracker + /// + public abstract class UserDataItemTracker + { + private bool _connected; + + /// + /// Logger + /// + protected ILogger _logger; + /// + /// Polling wait event + /// + protected AsyncResetEvent _pollWaitEvent = new AsyncResetEvent(false, true); + /// + /// Initial polling done event + /// + protected AsyncResetEvent _initialPollDoneEvent = new AsyncResetEvent(false, false); + /// + /// The error from the initial polling; + /// + protected Error? _initialPollingError; + /// + /// Polling task + /// + protected Task? _pollTask; + /// + /// Cancellation token + /// + protected CancellationTokenSource? _cts; + /// + /// Websocket subscription + /// + protected UpdateSubscription? _subscription; + /// + /// Start time + /// + protected DateTime? _startTime = null; + /// + /// Last polling attempt + /// + protected DateTime? _lastPollAttempt; + /// + /// Last polling timestamp + /// + protected DateTime? _lastPollTime; + /// + /// Timestamp of last message received before websocket disconnecting + /// + protected DateTime? _lastDataTimeBeforeDisconnect; + /// + /// Whether last polling was successful + /// + protected bool _lastPollSuccess; + /// + /// Whether first polling was done + /// + protected bool _firstPollDone; + /// + /// Whether websocket was disconnected before a polling + /// + protected bool _wasDisconnected; + /// + /// Poll at the start + /// + protected bool _pollAtStart; + /// + /// Poll interval when connected + /// + protected TimeSpan _pollIntervalConnected; + /// + /// Poll interval when disconnected + /// + protected TimeSpan _pollIntervalDisconnected; + /// + /// Exchange name + /// + protected string _exchange; + /// + /// Time completed data is retained + /// + public TimeSpan _retentionTime; + + /// + /// Data type + /// + public UserDataType DataType { get; } + + /// + /// Timestamp an update was handled. Does not necessarily mean the data was changed + /// + public DateTime? LastUpdateTime { get; protected set; } + /// + /// Timestamp any change was applied to the data + /// + public DateTime? LastChangeTime { get; protected set; } + + /// + /// Connection status changed + /// + public event Action? OnConnectedChange; + + /// + /// ctor + /// + public UserDataItemTracker(ILogger logger, UserDataType dataType, string exchange) + { + _logger = logger; + _exchange = exchange; + + DataType = dataType; + } + + /// + /// Start the tracker + /// + /// Optional listen key + public abstract Task StartAsync(string? listenKey); + + /// + /// Stop the tracker + /// + /// + public async Task StopAsync() + { + _cts?.Cancel(); + + if (_pollTask != null) + await _pollTask.ConfigureAwait(false); + } + + /// + /// Get the delay until next poll + /// + /// + protected TimeSpan? GetNextPollDelay() + { + if (!_firstPollDone && _pollAtStart) + // First polling should be done immediately + return TimeSpan.Zero; + + if (!Connected) + { + if (_pollIntervalDisconnected == TimeSpan.Zero) + // No polling interval + return null; + + return _pollIntervalDisconnected; + } + + if (_pollIntervalConnected == TimeSpan.Zero) + // No polling interval + return null; + + // Wait for next poll + return _pollIntervalConnected; + } + + + /// + public bool Connected + { + get => _connected; + protected set + { + if (_connected == value) + return; + + _connected = value; + if (!_connected) + _wasDisconnected = true; + else + _pollWaitEvent.Set(); + + OnConnectedChange?.Invoke(_connected); + } + } + } + + /// + /// User data tracker + /// + public abstract class UserDataItemTracker : UserDataItemTracker, IUserDataTracker + { + /// + /// Data store + /// + protected ConcurrentDictionary _store = new ConcurrentDictionary(StringComparer.InvariantCultureIgnoreCase); + /// + /// Tracked symbols list + /// + protected readonly List _symbols; + /// + /// Symbol lock + /// + protected object _symbolLock = new object(); + /// + /// Only track provided symbols setting + /// + protected bool _onlyTrackProvidedSymbols; + /// + /// Is SharedSymbol model + /// + protected bool _isSymbolModel; + + /// + public T[] Values + { + get + { + if (_retentionTime != TimeSpan.MaxValue) + { + var timestamp = DateTime.UtcNow; + foreach (var value in _store.Values) + { + if (GetAge(timestamp, value) > _retentionTime) + _store.TryRemove(GetKey(value), out _); + } + } + + return _store.Values.ToArray(); + } + } + + /// + public event Func, Task>? OnUpdate; + /// + public IEnumerable TrackedSymbols => _symbols; + + /// + /// ctor + /// + public UserDataItemTracker(ILogger logger, UserDataType dataType, string exchange, TrackerItemConfig config, bool onlyTrackProvidedSymbols, IEnumerable? symbols) : base(logger, dataType, exchange) + { + _onlyTrackProvidedSymbols = onlyTrackProvidedSymbols; + _symbols = symbols?.ToList() ?? []; + + _pollIntervalDisconnected = config.PollIntervalDisconnected; + _pollIntervalConnected = config.PollIntervalConnected; + _pollAtStart = config.PollAtStart; + _retentionTime = config is TrackerTimedItemConfig timeConfig ? timeConfig.RetentionTime : TimeSpan.MaxValue; + _isSymbolModel = typeof(T).IsSubclassOf(typeof(SharedSymbolModel)); + } + + /// + /// Invoke OnUpdate event + /// + protected async Task InvokeUpdate(UserDataUpdate data) + { + if (OnUpdate == null) + return; + + await OnUpdate(data).ConfigureAwait(false); + } + + /// + public async override Task StartAsync(string? listenKey) + { + _startTime = DateTime.UtcNow; + _cts = new CancellationTokenSource(); + + var start = await SubscribeAsync(listenKey).ConfigureAwait(false); + if (!start) + return start; + + Connected = true; + + _pollTask = PollAsync(); + + await _initialPollDoneEvent.WaitAsync().ConfigureAwait(false); + if (_initialPollingError != null) + { + await StopAsync().ConfigureAwait(false); + return new CallResult(_initialPollingError); + } + + return CallResult.SuccessResult; + } + + /// + /// Subscribe the websocket + /// + public async Task SubscribeAsync(string? listenKey) + { + var subscriptionResult = await DoSubscribeAsync(listenKey).ConfigureAwait(false); + if (!subscriptionResult) + { + // Failed + // .. + return subscriptionResult; + } + + if (subscriptionResult.Data == null) + { + // No subscription available + // .. + return CallResult.SuccessResult; + } + + _subscription = subscriptionResult.Data; + _subscription.SubscriptionStatusChanged += SubscriptionStatusChanged; + return CallResult.SuccessResult; + } + + /// + /// Get the unique identifier for the item + /// + protected abstract string GetKey(T item); + /// + /// Check whether an update should be applied + /// + protected abstract bool? CheckIfUpdateShouldBeApplied(T existingItem, T updateItem); + /// + /// Update an existing item with an update + /// + protected abstract bool Update(T existingItem, T updateItem); + /// + /// Get the age of an item + /// + protected virtual TimeSpan GetAge(DateTime time, T item) => TimeSpan.Zero; + + /// + /// Update the tracked symbol list with potential new symbols + /// + /// + protected void UpdateSymbolsList(IEnumerable symbols) + { + lock (_symbolLock) + { + foreach (var symbol in symbols.Distinct()) + { + if (!_symbols.Any(x => x.TradingMode == symbol.TradingMode && x.BaseAsset == symbol.BaseAsset && x.QuoteAsset == symbol.QuoteAsset)) + { + _symbols.Add(symbol); + _logger.LogDebug("Adding {BaseAsset}/{QuoteAsset} to symbol tracking list", symbol.BaseAsset, symbol.QuoteAsset); + } + } + } + } + + /// + /// Handle an update + /// + protected internal virtual async Task HandleUpdateAsync(UpdateSource source, T[] @event) + { + LastUpdateTime = DateTime.UtcNow; + + if (_isSymbolModel) + { + List? toRemove = null; + foreach (var item in @event) + { + if (item is SharedSymbolModel symbolModel) + { + if (symbolModel.SharedSymbol == null) + { + toRemove ??= new List(); + toRemove.Add(item); + } + else if (_onlyTrackProvidedSymbols + && !_symbols.Any(y => y.TradingMode == symbolModel.SharedSymbol!.TradingMode && y.BaseAsset == symbolModel.SharedSymbol.BaseAsset && y.QuoteAsset == symbolModel.SharedSymbol.QuoteAsset)) + { + toRemove ??= new List(); + toRemove.Add(item); + } + } + } + + if (toRemove != null) + @event = @event.Except(toRemove).ToArray(); + + if (!_onlyTrackProvidedSymbols) + UpdateSymbolsList(@event.OfType().Select(x => x.SharedSymbol!)); + } + + // Update local store + var updatedItems = @event.Select(GetKey).ToList(); + + foreach (var item in @event) + { + bool existed = false; + _store.AddOrUpdate(GetKey(item), item, (key, existing) => + { + existed = true; + if (CheckIfUpdateShouldBeApplied(existing, item) == false) + { + updatedItems.Remove(key); + } + else + { + var updated = Update(existing, item); + if (!updated) + { + updatedItems.Remove(key); + } + else + { + _logger.LogDebug("Updated {DataType} {Item}", DataType, key); + LastChangeTime = DateTime.UtcNow; + } + } + + return existing; + }); + + if (!existed) + { + _logger.LogDebug("Added {DataType} {Item}", DataType, GetKey(item)); + LastChangeTime = DateTime.UtcNow; + } + } + + if (updatedItems.Count > 0 && OnUpdate != null) + { + await OnUpdate.Invoke( + new UserDataUpdate(source, _exchange, _store.Where(x => updatedItems.Contains(x.Key)).Select(x => x.Value).ToArray())).ConfigureAwait(false); + } + } + + /// + /// Websocket subscription implementation + /// + protected abstract Task> DoSubscribeAsync(string? listenKey); + + /// + /// Polling task + /// + protected async Task PollAsync() + { + while (!_cts!.IsCancellationRequested) + { + var delayForNextPoll = GetNextPollDelay(); + if (delayForNextPoll != TimeSpan.Zero) + { + try + { + if (delayForNextPoll != null) + _logger.LogTrace("{DataType} delay for next polling: {Delay}", DataType, delayForNextPoll); + + await _pollWaitEvent.WaitAsync(delayForNextPoll, _cts.Token).ConfigureAwait(false); + } + catch { } + } + + var currentlyFirstPoll = !_firstPollDone; + _firstPollDone = true; + if (_cts.IsCancellationRequested) + break; + + if (_lastPollAttempt != null + && (DateTime.UtcNow - _lastPollAttempt.Value) < TimeSpan.FromSeconds(2) + && !(Connected && _wasDisconnected)) + { + if (_lastPollSuccess) + // If last poll was less than 2 seconds ago and it was successful don't bother immediately polling again + continue; + } + + if (Connected) + _wasDisconnected = false; + + _lastPollSuccess = false; + + try + { + var anyError = await DoPollAsync().ConfigureAwait(false); + + _initialPollDoneEvent.Set(); + _lastPollAttempt = DateTime.UtcNow; + _lastPollSuccess = !anyError; + + if (anyError && currentlyFirstPoll && _pollAtStart) + { + if (_initialPollingError == null) + throw new Exception("Error in initial polling but error not set"); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "{DataType} UserDataTracker polling exception", DataType); + } + } + } + + /// + /// Polling implementation + /// + /// + protected abstract Task DoPollAsync(); + + /// + /// Handle subscription status change + /// + /// + private void SubscriptionStatusChanged(SubscriptionStatus newState) + { + _logger.LogDebug("{DataType} stream status changed: {NewState}", DataType, newState); + + if (newState == SubscriptionStatus.Pending) + { + // Record last data receive time since we need to request data from that timestamp on when polling + // Only set to new value if it isn't already set since if we disconnect/reconnect a couple of times without + // managing to do a poll we don't want to override the time since we still need to request that earlier data + + if (_lastDataTimeBeforeDisconnect == null) + { + _lastDataTimeBeforeDisconnect = _subscription!.LastReceiveTime; + + // When changing to pending (disconnected) trigger polling to start checking + _pollWaitEvent.Set(); + } + } + + Connected = newState == SubscriptionStatus.Subscribed; + } + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/Objects/TrackerItemConfig.cs b/CryptoExchange.Net/Trackers/UserData/Objects/TrackerItemConfig.cs new file mode 100644 index 0000000..5f541dd --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/Objects/TrackerItemConfig.cs @@ -0,0 +1,57 @@ +using System; + +namespace CryptoExchange.Net.Trackers.UserData.Objects +{ + /// + /// Tracker configuration + /// + public record TrackerItemConfig + { + /// + /// Interval to poll data at as backup, even when the websocket stream is still connected. + /// + public TimeSpan PollIntervalConnected { get; set; } = TimeSpan.Zero; + /// + /// Interval to poll data at while the websocket is disconnected. + /// + public TimeSpan PollIntervalDisconnected { get; set; } = TimeSpan.FromSeconds(30); + /// + /// Whether to poll for data initially when starting the tracker. + /// + public bool PollAtStart { get; set; } = true; + + /// + /// ctor + /// + /// Whether to poll for data initially when starting the tracker + /// Interval to poll data at as backup, even when the websocket stream is still connected + /// Interval to poll data at while the websocket is disconnected + public TrackerItemConfig(bool pollAtStart, TimeSpan pollIntervalConnected, TimeSpan pollIntervalDisconnected) + { + PollAtStart = pollAtStart; + PollIntervalConnected = pollIntervalConnected; + PollIntervalDisconnected = pollIntervalDisconnected; + } + } + + /// + public record TrackerTimedItemConfig: TrackerItemConfig + { + /// + /// The timespan data is retained after being completed + /// + public TimeSpan RetentionTime { get; set; } = TimeSpan.MaxValue; + + /// + /// ctor + /// + /// Whether to poll for data initially when starting the tracker + /// Interval to poll data at as backup, even when the websocket stream is still connected + /// Interval to poll data at while the websocket is disconnected + /// The timespan data is retained after being completed + public TrackerTimedItemConfig(bool pollAtStart, TimeSpan pollIntervalConnected, TimeSpan pollIntervalDisconnected, TimeSpan retentionTime) : base(pollAtStart, pollIntervalConnected, pollIntervalDisconnected) + { + RetentionTime = retentionTime; + } + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/Objects/UpdateSource.cs b/CryptoExchange.Net/Trackers/UserData/Objects/UpdateSource.cs new file mode 100644 index 0000000..3971b23 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/Objects/UpdateSource.cs @@ -0,0 +1,17 @@ +namespace CryptoExchange.Net.Trackers.UserData.Objects +{ + /// + /// Update source + /// + public enum UpdateSource + { + /// + /// Polling result + /// + Poll, + /// + /// Websocket push + /// + Push + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/Objects/UserDataTrackerConfig.cs b/CryptoExchange.Net/Trackers/UserData/Objects/UserDataTrackerConfig.cs new file mode 100644 index 0000000..a3d02ab --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/Objects/UserDataTrackerConfig.cs @@ -0,0 +1,68 @@ +using CryptoExchange.Net.SharedApis; +using System; +using System.Collections.Generic; + +namespace CryptoExchange.Net.Trackers.UserData.Objects +{ + /// + /// User data tracker configuration + /// + public abstract record UserDataTrackerConfig + { + /// + /// Symbols to initially track, used when polling data. Other symbols will get tracked when updates are received for orders or trades on a new symbol and when there are open orders or positions on a new symbol. To only track the symbols specified here set `OnlyTrackProvidedSymbols` to true. + /// + public IEnumerable TrackedSymbols { get; set; } = []; + /// + /// If true only orders and trades in the `Symbols` options will get tracked, data on other symbols will be ignored. + /// + public bool OnlyTrackProvidedSymbols { get; set; } = false; + /// + /// Whether to track order trades, can lead to increased requests when polling since they're requested per symbol. + /// + public bool TrackTrades { get; set; } = true; + } + + /// + /// Spot user data tracker config + /// + public record SpotUserDataTrackerConfig : UserDataTrackerConfig + { + /// + /// Balance tracking config + /// + public TrackerItemConfig BalancesConfig { get; set; } = new TrackerItemConfig(true, TimeSpan.Zero, TimeSpan.FromSeconds(10)); + /// + /// Order tracking config + /// + public TrackerTimedItemConfig OrdersConfig { get; set; } = new TrackerTimedItemConfig(true, TimeSpan.Zero, TimeSpan.FromSeconds(30), TimeSpan.MaxValue); + /// + /// Trade tracking config + /// + public TrackerTimedItemConfig UserTradesConfig { get; set; } = new TrackerTimedItemConfig(false, TimeSpan.Zero, TimeSpan.FromSeconds(30), TimeSpan.MaxValue); + } + + + /// + /// Futures user data tracker config + /// + public record FuturesUserDataTrackerConfig : UserDataTrackerConfig + { + /// + /// Balance tracking config + /// + public TrackerItemConfig BalancesConfig { get; set; } = new TrackerItemConfig(true, TimeSpan.Zero, TimeSpan.FromSeconds(10)); + /// + /// Order tracking config + /// + public TrackerTimedItemConfig OrdersConfig { get; set; } = new TrackerTimedItemConfig(true, TimeSpan.Zero, TimeSpan.FromSeconds(30), TimeSpan.MaxValue); + /// + /// Trade tracking config + /// + public TrackerTimedItemConfig UserTradesConfig { get; set; } = new TrackerTimedItemConfig(false, TimeSpan.Zero, TimeSpan.FromSeconds(30), TimeSpan.MaxValue); + /// + /// Position tracking config + /// + public TrackerItemConfig PositionConfig { get; set; } = new TrackerItemConfig(true, TimeSpan.Zero, TimeSpan.FromSeconds(30)); + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/Objects/UserDataType.cs b/CryptoExchange.Net/Trackers/UserData/Objects/UserDataType.cs new file mode 100644 index 0000000..b01b27d --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/Objects/UserDataType.cs @@ -0,0 +1,25 @@ +namespace CryptoExchange.Net.Trackers.UserData.Objects +{ + /// + /// Data type + /// + public enum UserDataType + { + /// + /// Balances + /// + Balances, + /// + /// Orders + /// + Orders, + /// + /// Trades + /// + Trades, + /// + /// Positions + /// + Positions + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/Objects/UserDataUpdate.cs b/CryptoExchange.Net/Trackers/UserData/Objects/UserDataUpdate.cs new file mode 100644 index 0000000..ab6ca89 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/Objects/UserDataUpdate.cs @@ -0,0 +1,32 @@ +namespace CryptoExchange.Net.Trackers.UserData.Objects +{ + /// + /// User data update + /// + /// Data type + public class UserDataUpdate + { + /// + /// Source + /// + public UpdateSource Source { get; set; } + /// + /// Exchange name + /// + public string Exchange { get; set; } + /// + /// Data + /// + public T Data { get; set; } = default!; + + /// + /// ctor + /// + public UserDataUpdate(UpdateSource source, string exchange, T data) + { + Source = source; + Exchange = exchange; + Data = data; + } + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs new file mode 100644 index 0000000..ff27446 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/UserDataTracker.cs @@ -0,0 +1,111 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Trackers.UserData.ItemTrackers; +using CryptoExchange.Net.Trackers.UserData.Objects; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.UserData +{ + /// + /// User data tracker + /// + public abstract class UserDataTracker + { + /// + /// Logger + /// + protected readonly ILogger _logger; + /// + /// Listen key to use for subscriptions + /// + protected string? _listenKey; + /// + /// List of data trackers + /// + protected abstract UserDataItemTracker[] DataTrackers { get; } + + /// + public string? UserIdentifier { get; } + + /// + /// Connected status changed + /// + public event Action? OnConnectedChange; + + /// + /// Exchange name + /// + public string Exchange { get; } + + /// + /// Whether all trackers are full connected + /// + public bool Connected => DataTrackers.All(x => x.Connected); + + /// + /// ctor + /// + public UserDataTracker( + ILogger logger, + string exchange, + UserDataTrackerConfig config, + string? userIdentifier) + { + if (config.OnlyTrackProvidedSymbols && !config.TrackedSymbols.Any()) + throw new ArgumentException(nameof(config.TrackedSymbols), "Conflicting options; `OnlyTrackProvidedSymbols` but no symbols specific in `TrackedSymbols`"); + + _logger = logger; + + Exchange = exchange; + UserIdentifier = userIdentifier; + } + + /// + /// Start the data tracker + /// + public async Task StartAsync() + { + foreach(var tracker in DataTrackers) + tracker.OnConnectedChange += (x) => OnConnectedChange?.Invoke(tracker.DataType, x); + + var result = await DoStartAsync().ConfigureAwait(false); + if (!result) + return result; + + var tasks = new List>(); + foreach (var dataTracker in DataTrackers) + tasks.Add(dataTracker.StartAsync(_listenKey)); + + await Task.WhenAll(tasks).ConfigureAwait(false); + if (!tasks.All(x => x.Result.Success)) + { + await Task.WhenAll(DataTrackers.Select(x => x.StopAsync())).ConfigureAwait(false); + return tasks.First(x => !x.Result.Success).Result; + } + + return CallResult.SuccessResult; + } + + /// + /// Implementation specific start logic + /// + protected abstract Task DoStartAsync(); + + /// + /// Stop the data tracker + /// + public async Task StopAsync() + { + _logger.LogDebug("Stopping UserDataTracker"); + var tasks = new List(); + foreach (var dataTracker in DataTrackers) + tasks.Add(dataTracker.StopAsync()); + + await Task.WhenAll(tasks).ConfigureAwait(false); + _logger.LogDebug("Stopped UserDataTracker"); + } + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/UserFuturesDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/UserFuturesDataTracker.cs new file mode 100644 index 0000000..2e05306 --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/UserFuturesDataTracker.cs @@ -0,0 +1,124 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.SharedApis; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using System.Linq; +using CryptoExchange.Net.Trackers.UserData.ItemTrackers; +using CryptoExchange.Net.Trackers.UserData.Interfaces; +using CryptoExchange.Net.Trackers.UserData.Objects; + +namespace CryptoExchange.Net.Trackers.UserData +{ + /// + /// User futures data tracker + /// + public abstract class UserFuturesDataTracker : UserDataTracker, IUserFuturesDataTracker + { + private readonly IFuturesSymbolRestClient _symbolClient; + private readonly IListenKeyRestClient? _listenKeyClient; + private readonly ExchangeParameters? _exchangeParameters; + + /// + protected override UserDataItemTracker[] DataTrackers { get; } + /// + /// Balances tracker + /// + public IUserDataTracker Balances { get; } + /// + /// Orders tracker + /// + public IUserDataTracker Orders { get; } + /// + /// Positions tracker + /// + public IUserDataTracker Positions { get; } + /// + /// Trades tracker + /// + public IUserDataTracker? Trades { get; } + + /// + /// Whether websocket position updates are full snapshots and missing positions should be considered 0 + /// + protected abstract bool WebsocketPositionUpdatesAreFullSnapshots { get; } + + /// + /// ctor + /// + public UserFuturesDataTracker( + ILogger logger, + IFuturesSymbolRestClient symbolRestClient, + IListenKeyRestClient? listenKeyRestClient, + IBalanceRestClient balanceRestClient, + IBalanceSocketClient? balanceSocketClient, + IFuturesOrderRestClient futuresOrderRestClient, + IFuturesOrderSocketClient? futuresOrderSocketClient, + IUserTradeSocketClient? userTradeSocketClient, + IPositionSocketClient? positionSocketClient, + string? userIdentifier, + FuturesUserDataTrackerConfig config, + SharedAccountType? accountType = null, + ExchangeParameters? exchangeParameters = null) : base(logger, symbolRestClient.Exchange, config, userIdentifier) + { + // create trackers + _symbolClient = symbolRestClient; + _listenKeyClient = listenKeyRestClient; + _exchangeParameters = exchangeParameters; + + var trackers = new List(); + + var balanceAccountType = accountType ?? SharedAccountType.PerpetualLinearFutures; + var balanceTracker = new BalanceTracker(logger, balanceRestClient, balanceSocketClient, balanceAccountType, config.BalancesConfig, exchangeParameters); + Balances = balanceTracker; + trackers.Add(balanceTracker); + + var orderTracker = new FuturesOrderTracker(logger, futuresOrderRestClient, futuresOrderSocketClient, config.OrdersConfig, config.TrackedSymbols, config.OnlyTrackProvidedSymbols, exchangeParameters); + Orders = orderTracker; + trackers.Add(orderTracker); + + var positionTracker = new PositionTracker(logger, futuresOrderRestClient, positionSocketClient, config.PositionConfig, config.TrackedSymbols, config.OnlyTrackProvidedSymbols, WebsocketPositionUpdatesAreFullSnapshots, exchangeParameters); + Positions = positionTracker; + trackers.Add(positionTracker); + + if (config.TrackTrades) + { + var tradeTracker = new FuturesUserTradeTracker(logger, futuresOrderRestClient, userTradeSocketClient, config.UserTradesConfig, config.TrackedSymbols, config.OnlyTrackProvidedSymbols, exchangeParameters); + Trades = tradeTracker; + trackers.Add(tradeTracker); + + orderTracker.OnTradeUpdate += tradeTracker.HandleUpdateAsync; + tradeTracker.GetTrackedOrderIds = () => orderTracker.Values.Select(x => x.OrderId).ToArray(); + } + + DataTrackers = trackers.ToArray(); + } + + /// + protected override async Task DoStartAsync() + { + var symbolResult = await _symbolClient.GetFuturesSymbolsAsync(new GetSymbolsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!symbolResult) + { + _logger.LogWarning("Failed to start UserFuturesDataTracker; symbols request failed: {Error}", symbolResult.Error); + return symbolResult; + } + + if (_listenKeyClient != null) + { + var lkResult = await _listenKeyClient.StartListenKeyAsync(new StartListenKeyRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!lkResult) + { + _logger.LogWarning("Failed to start UserFuturesDataTracker; listen key request failed: {Error}", lkResult.Error); + return lkResult; + } + + _listenKey = lkResult.Data; + } + + return CallResult.SuccessResult; + } + } +} diff --git a/CryptoExchange.Net/Trackers/UserData/UserSpotDataTracker.cs b/CryptoExchange.Net/Trackers/UserData/UserSpotDataTracker.cs new file mode 100644 index 0000000..22f78dd --- /dev/null +++ b/CryptoExchange.Net/Trackers/UserData/UserSpotDataTracker.cs @@ -0,0 +1,100 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.SharedApis; +using Microsoft.Extensions.Logging; +using System.Collections.Generic; +using System.Threading.Tasks; +using System.Linq; +using CryptoExchange.Net.Trackers.UserData.Interfaces; +using CryptoExchange.Net.Trackers.UserData.Objects; +using CryptoExchange.Net.Trackers.UserData.ItemTrackers; + +namespace CryptoExchange.Net.Trackers.UserData +{ + /// + /// Spot user data tracker + /// + public class UserSpotDataTracker : UserDataTracker, IUserSpotDataTracker + { + private readonly ISpotSymbolRestClient _symbolClient; + private readonly IListenKeyRestClient? _listenKeyClient; + private readonly ExchangeParameters? _exchangeParameters; + + /// + protected override UserDataItemTracker[] DataTrackers { get; } + /// + public IUserDataTracker Balances { get; } + /// + public IUserDataTracker Orders { get; } + /// + public IUserDataTracker? Trades { get; } + + /// + /// ctor + /// + public UserSpotDataTracker( + ILogger logger, + ISpotSymbolRestClient symbolRestClient, + IListenKeyRestClient? listenKeyRestClient, + IBalanceRestClient balanceRestClient, + IBalanceSocketClient? balanceSocketClient, + ISpotOrderRestClient spotOrderRestClient, + ISpotOrderSocketClient? spotOrderSocketClient, + IUserTradeSocketClient? userTradeSocketClient, + string? userIdentifier, + SpotUserDataTrackerConfig config, + ExchangeParameters? exchangeParameters = null) : base(logger, symbolRestClient.Exchange, config, userIdentifier) + { + // create trackers + _symbolClient = symbolRestClient; + _listenKeyClient = listenKeyRestClient; + _exchangeParameters = exchangeParameters; + + var trackers = new List(); + + var balanceTracker = new BalanceTracker(logger, balanceRestClient, balanceSocketClient, SharedAccountType.Spot, config.BalancesConfig, exchangeParameters); + Balances = balanceTracker; + trackers.Add(balanceTracker); + + var orderTracker = new SpotOrderTracker(logger, spotOrderRestClient, spotOrderSocketClient, config.OrdersConfig, config.TrackedSymbols, config.OnlyTrackProvidedSymbols, exchangeParameters); + Orders = orderTracker; + trackers.Add(orderTracker); + + if (config.TrackTrades) + { + var tradeTracker = new SpotUserTradeTracker(logger, spotOrderRestClient, userTradeSocketClient, config.UserTradesConfig, config.TrackedSymbols, config.OnlyTrackProvidedSymbols, exchangeParameters); + Trades = tradeTracker; + trackers.Add(tradeTracker); + + orderTracker.OnTradeUpdate += tradeTracker.HandleUpdateAsync; + tradeTracker.GetTrackedOrderIds = () => orderTracker.Values.Select(x => x.OrderId).ToArray(); + } + + DataTrackers = trackers.ToArray(); + } + + /// + protected override async Task DoStartAsync() + { + var symbolResult = await _symbolClient.GetSpotSymbolsAsync(new GetSymbolsRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!symbolResult) + { + _logger.LogWarning("Failed to start UserSpotDataTracker; symbols request failed: {Error}", symbolResult.Error); + return symbolResult; + } + + if (_listenKeyClient != null) + { + var lkResult = await _listenKeyClient.StartListenKeyAsync(new StartListenKeyRequest(exchangeParameters: _exchangeParameters)).ConfigureAwait(false); + if (!lkResult) + { + _logger.LogWarning("Failed to start UserSpotDataTracker; listen key request failed: {Error}", lkResult.Error); + return lkResult; + } + + _listenKey = lkResult.Data; + } + + return CallResult.SuccessResult; + } + } +}