1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-10 17:36:19 +00:00

Added CancellationToken support on Common client interface and SymbolOrderBook, improved SymbolOrderBook start/stop robustness

This commit is contained in:
Jan Korf 2022-02-05 20:28:08 +01:00
parent baa23c2ecc
commit b18204a52d
3 changed files with 80 additions and 25 deletions

View File

@ -2,6 +2,7 @@
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces
@ -28,29 +29,32 @@ namespace CryptoExchange.Net.Interfaces
/// <summary>
/// Get the symbol name based on a base and quote asset
/// </summary>
/// <param name="baseAsset"></param>
/// <param name="quoteAsset"></param>
/// <param name="baseAsset">The base asset</param>
/// <param name="quoteAsset">The quote asset</param>
/// <returns></returns>
string GetSymbolName(string baseAsset, string quoteAsset);
/// <summary>
/// Get a list of symbols for the exchange
/// </summary>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<IEnumerable<Symbol>>> GetSymbolsAsync();
Task<WebCallResult<IEnumerable<Symbol>>> GetSymbolsAsync(CancellationToken ct = default);
/// <summary>
/// Get a ticker for the exchange
/// </summary>
/// <param name="symbol">The symbol to get klines for</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<Ticker>> GetTickerAsync(string symbol);
Task<WebCallResult<Ticker>> GetTickerAsync(string symbol, CancellationToken ct = default);
/// <summary>
/// Get a list of tickers for the exchange
/// </summary>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<IEnumerable<Ticker>>> GetTickersAsync();
Task<WebCallResult<IEnumerable<Ticker>>> GetTickersAsync(CancellationToken ct = default);
/// <summary>
/// Get a list of candles for a given symbol on the exchange
@ -60,67 +64,76 @@ namespace CryptoExchange.Net.Interfaces
/// <param name="startTime">[Optional] Start time to retrieve klines for</param>
/// <param name="endTime">[Optional] End time to retrieve klines for</param>
/// <param name="limit">[Optional] Max number of results</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<IEnumerable<Kline>>> GetKlinesAsync(string symbol, TimeSpan timespan, DateTime? startTime = null, DateTime? endTime = null, int? limit = null);
Task<WebCallResult<IEnumerable<Kline>>> GetKlinesAsync(string symbol, TimeSpan timespan, DateTime? startTime = null, DateTime? endTime = null, int? limit = null, CancellationToken ct = default);
/// <summary>
/// Get the order book for a symbol
/// </summary>
/// <param name="symbol">The symbol to get the book for</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<CommonObjects.OrderBook>> GetOrderBookAsync(string symbol);
Task<WebCallResult<CommonObjects.OrderBook>> GetOrderBookAsync(string symbol, CancellationToken ct = default);
/// <summary>
/// The recent trades for a symbol
/// </summary>
/// <param name="symbol">The symbol to get the trades for</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<IEnumerable<Trade>>> GetRecentTradesAsync(string symbol);
Task<WebCallResult<IEnumerable<Trade>>> GetRecentTradesAsync(string symbol, CancellationToken ct = default);
/// <summary>
/// Get balances
/// </summary>
/// <param name="accountId">[Optional] The account id to retrieve balances for, required for some exchanges, ignored otherwise</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<IEnumerable<Balance>>> GetBalancesAsync(string? accountId = null);
Task<WebCallResult<IEnumerable<Balance>>> GetBalancesAsync(string? accountId = null, CancellationToken ct = default);
/// <summary>
/// Get an order by id
/// </summary>
/// <param name="orderId">The id</param>
/// <param name="symbol">[Optional] The symbol the order is on, required for some exchanges, ignored otherwise</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<Order>> GetOrderAsync(string orderId, string? symbol = null);
Task<WebCallResult<Order>> GetOrderAsync(string orderId, string? symbol = null, CancellationToken ct = default);
/// <summary>
/// Get trades for an order by id
/// </summary>
/// <param name="orderId">The id</param>
/// <param name="symbol">[Optional] The symbol the order is on, required for some exchanges, ignored otherwise</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<IEnumerable<UserTrade>>> GetOrderTradesAsync(string orderId, string? symbol = null);
Task<WebCallResult<IEnumerable<UserTrade>>> GetOrderTradesAsync(string orderId, string? symbol = null, CancellationToken ct = default);
/// <summary>
/// Get a list of open orders
/// </summary>
/// <param name="symbol">[Optional] The symbol to get open orders for, required for some exchanges, ignored otherwise</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<IEnumerable<Order>>> GetOpenOrdersAsync(string? symbol = null);
Task<WebCallResult<IEnumerable<Order>>> GetOpenOrdersAsync(string? symbol = null, CancellationToken ct = default);
/// <summary>
/// Get a list of closed orders
/// </summary>
/// <param name="symbol">[Optional] The symbol to get closed orders for, required for some exchanges, ignored otherwise</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<IEnumerable<Order>>> GetClosedOrdersAsync(string? symbol = null);
Task<WebCallResult<IEnumerable<Order>>> GetClosedOrdersAsync(string? symbol = null, CancellationToken ct = default);
/// <summary>
/// Cancel an order by id
/// </summary>
/// <param name="orderId">The id</param>
/// <param name="symbol">[Optional] The symbol the order is on, required for some exchanges, ignored otherwise</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<OrderId>> CancelOrderAsync(string orderId, string? symbol = null);
Task<WebCallResult<OrderId>> CancelOrderAsync(string orderId, string? symbol = null, CancellationToken ct = default);
}
/// <summary>
@ -138,14 +151,16 @@ namespace CryptoExchange.Net.Interfaces
/// <param name="price">The price of the order, only for limit orders</param>
/// <param name="accountId">[Optional] The account id to place the order on, required for some exchanges, ignored otherwise</param>
/// <param name="leverage">[Optional] Leverage for this order. This is needed for some exchanges. For exchanges where this is not needed this parameter is ignored (and should be set before hand)</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns>The id of the resulting order</returns>
Task<WebCallResult<OrderId>> PlaceOrderAsync(string symbol, CommonOrderSide side, CommonOrderType type, decimal quantity, decimal? price = null, int? leverage = null, string? accountId = null);
Task<WebCallResult<OrderId>> PlaceOrderAsync(string symbol, CommonOrderSide side, CommonOrderType type, decimal quantity, decimal? price = null, int? leverage = null, string? accountId = null, CancellationToken ct = default);
/// <summary>
/// Get position
/// </summary>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns></returns>
Task<WebCallResult<IEnumerable<Position>>> GetPositionsAsync();
Task<WebCallResult<IEnumerable<Position>>> GetPositionsAsync(CancellationToken ct = default);
}
/// <summary>
@ -162,7 +177,8 @@ namespace CryptoExchange.Net.Interfaces
/// <param name="quantity">The quantity of the order</param>
/// <param name="price">The price of the order, only for limit orders</param>
/// <param name="accountId">[Optional] The account id to place the order on, required for some exchanges, ignored otherwise</param>
/// <param name="ct">[Optional] Cancellation token for cancelling the request</param>
/// <returns>The id of the resulting order</returns>
Task<WebCallResult<OrderId>> PlaceOrderAsync(string symbol, CommonOrderSide side, CommonOrderType type, decimal quantity, decimal? price = null, string? accountId = null);
Task<WebCallResult<OrderId>> PlaceOrderAsync(string symbol, CommonOrderSide side, CommonOrderType type, decimal quantity, decimal? price = null, string? accountId = null, CancellationToken ct = default);
}
}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Objects;
@ -88,8 +89,9 @@ namespace CryptoExchange.Net.Interfaces
/// <summary>
/// Start connecting and synchronizing the order book
/// </summary>
/// <param name="ct">A cancellation token to stop the order book when canceled</param>
/// <returns></returns>
Task<CallResult<bool>> StartAsync();
Task<CallResult<bool>> StartAsync(CancellationToken? ct = null);
/// <summary>
/// Stop syncing the order book

View File

@ -26,6 +26,7 @@ namespace CryptoExchange.Net.OrderBook
private bool _stopProcessing;
private Task? _processTask;
private CancellationTokenSource _cts;
private readonly AsyncResetEvent _queueEvent;
private readonly ConcurrentQueue<object> _processQueue;
@ -220,22 +221,42 @@ namespace CryptoExchange.Net.OrderBook
}
/// <inheritdoc/>
public async Task<CallResult<bool>> StartAsync()
public async Task<CallResult<bool>> StartAsync(CancellationToken? ct = null)
{
if (Status != OrderBookStatus.Disconnected)
throw new InvalidOperationException($"Can't start book unless state is {OrderBookStatus.Connecting}. Was {Status}");
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} starting");
_cts = new CancellationTokenSource();
ct?.Register(async () =>
{
_cts.Cancel();
await StopAsync().ConfigureAwait(false);
}, false);
// Clear any previous messages
while (_processQueue.TryDequeue(out _)) { }
processBuffer.Clear();
bookSet = false;
Status = OrderBookStatus.Connecting;
_processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
var startResult = await DoStartAsync().ConfigureAwait(false);
var startResult = await DoStartAsync(_cts.Token).ConfigureAwait(false);
if (!startResult)
{
Status = OrderBookStatus.Disconnected;
return new CallResult<bool>(startResult.Error!);
}
if (_cts.IsCancellationRequested)
{
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} stopped while starting");
await startResult.Data.CloseAsync().ConfigureAwait(false);
Status = OrderBookStatus.Disconnected;
return new CallResult<bool>(new CancellationRequestedError());
}
_subscription = startResult.Data;
_subscription.ConnectionLost += () =>
{
@ -260,6 +281,7 @@ namespace CryptoExchange.Net.OrderBook
{
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} stopping");
Status = OrderBookStatus.Disconnected;
_cts.Cancel();
_queueEvent.Set();
if (_processTask != null)
await _processTask.ConfigureAwait(false);
@ -305,7 +327,7 @@ namespace CryptoExchange.Net.OrderBook
/// and setting the initial order book
/// </summary>
/// <returns></returns>
protected abstract Task<CallResult<UpdateSubscription>> DoStartAsync();
protected abstract Task<CallResult<UpdateSubscription>> DoStartAsync(CancellationToken ct);
/// <summary>
/// Reset the order book
@ -316,7 +338,7 @@ namespace CryptoExchange.Net.OrderBook
/// Resync the order book
/// </summary>
/// <returns></returns>
protected abstract Task<CallResult<bool>> DoResyncAsync();
protected abstract Task<CallResult<bool>> DoResyncAsync(CancellationToken ct);
/// <summary>
/// Implementation for validating a checksum value with the current order book. If checksum validation fails (returns false)
@ -459,16 +481,25 @@ namespace CryptoExchange.Net.OrderBook
/// Wait until the order book snapshot has been set
/// </summary>
/// <param name="timeout">Max wait time</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected async Task<CallResult<bool>> WaitForSetOrderBookAsync(int timeout)
protected async Task<CallResult<bool>> WaitForSetOrderBookAsync(int timeout, CancellationToken ct)
{
var startWait = DateTime.UtcNow;
while (!bookSet && Status == OrderBookStatus.Syncing)
{
if(ct.IsCancellationRequested)
return new CallResult<bool>(new CancellationRequestedError());
if ((DateTime.UtcNow - startWait).TotalMilliseconds > timeout)
return new CallResult<bool>(new ServerError("Timeout while waiting for data"));
await Task.Delay(10).ConfigureAwait(false);
try
{
await Task.Delay(10, ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{ }
}
return new CallResult<bool>(true);
@ -533,7 +564,7 @@ namespace CryptoExchange.Net.OrderBook
if (Status != OrderBookStatus.Syncing)
return;
var resyncResult = await DoResyncAsync().ConfigureAwait(false);
var resyncResult = await DoResyncAsync(_cts.Token).ConfigureAwait(false);
success = resyncResult;
}
@ -665,6 +696,12 @@ namespace CryptoExchange.Net.OrderBook
Status = OrderBookStatus.Syncing;
_ = Task.Run(async () =>
{
if(_subscription == null)
{
Status = OrderBookStatus.Disconnected;
return;
}
await _subscription!.UnsubscribeAsync().ConfigureAwait(false);
Reset();
_stopProcessing = false;