1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 14:13:46 +00:00
This commit is contained in:
JKorf 2026-01-09 09:02:25 +01:00
parent 77abdfa81c
commit 41b3decd9e
3 changed files with 68 additions and 39 deletions

View File

@ -96,7 +96,7 @@ namespace CryptoExchange.Net.Logging.Extensions
_orderBookDataSet = LoggerMessage.Define<string, string, long, long, long>( _orderBookDataSet = LoggerMessage.Define<string, string, long, long, long>(
LogLevel.Debug, LogLevel.Debug,
new EventId(5012, "OrderBookDataSet"), new EventId(5012, "OrderBookDataSet"),
"{Api} order book {Symbol} data set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}"); "{Api} order book {Symbol} snapshot set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}");
_orderBookUpdateBuffered = LoggerMessage.Define<string, string, long, long, long, long>( _orderBookUpdateBuffered = LoggerMessage.Define<string, string, long, long, long, long>(
LogLevel.Trace, LogLevel.Trace,

View File

@ -3,7 +3,7 @@ using System;
namespace CryptoExchange.Net.OrderBook namespace CryptoExchange.Net.OrderBook
{ {
internal class ProcessQueueItem internal class OrderBookUpdate
{ {
public DateTime? LocalDataTime { get; set; } public DateTime? LocalDataTime { get; set; }
public DateTime? ServerDataTime { get; set; } public DateTime? ServerDataTime { get; set; }
@ -13,7 +13,7 @@ namespace CryptoExchange.Net.OrderBook
public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
} }
internal class InitialOrderBookItem internal class OrderBookSnapshot
{ {
public DateTime? LocalDataTime { get; set; } public DateTime? LocalDataTime { get; set; }
public DateTime? ServerDataTime { get; set; } public DateTime? ServerDataTime { get; set; }
@ -22,7 +22,7 @@ namespace CryptoExchange.Net.OrderBook
public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
} }
internal class ChecksumItem internal class OrderBookChecksum
{ {
public long? SequenceNumber { get; set; } public long? SequenceNumber { get; set; }
public int Checksum { get; set; } public int Checksum { get; set; }

View File

@ -77,7 +77,10 @@ namespace CryptoExchange.Net.OrderBook
/// the book will resynchronize as it is deemed out of sync /// the book will resynchronize as it is deemed out of sync
/// </summary> /// </summary>
protected bool _sequencesAreConsecutive; protected bool _sequencesAreConsecutive;
protected bool _skipSequenceCheckFirstUpdateAfterSnapshotSet;
private bool _firstUpdateAfterSnapshotDone;
/// <summary> /// <summary>
/// Whether levels should be strictly enforced. For example, when an order book has 25 levels and a new update comes in which pushes /// Whether levels should be strictly enforced. For example, when an order book has 25 levels and a new update comes in which pushes
/// the current level 25 ask out of the top 25, should the level 26 entry be removed from the book or does the server handle this /// the current level 25 ask out of the top 25, should the level 26 entry be removed from the book or does the server handle this
@ -267,6 +270,7 @@ namespace CryptoExchange.Net.OrderBook
_processBuffer.Clear(); _processBuffer.Clear();
_bookSet = false; _bookSet = false;
_firstUpdateAfterSnapshotDone = false;
Status = OrderBookStatus.Connecting; Status = OrderBookStatus.Connecting;
_processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning); _processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
@ -435,7 +439,7 @@ namespace CryptoExchange.Net.OrderBook
DateTime? localDataTime = null) DateTime? localDataTime = null)
{ {
_processQueue.Enqueue( _processQueue.Enqueue(
new InitialOrderBookItem new OrderBookSnapshot
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
@ -449,25 +453,25 @@ namespace CryptoExchange.Net.OrderBook
/// <summary> /// <summary>
/// Add an update to the process queue. Updates the book by providing changed bids and asks, along with an update number which should be higher than the previous update numbers /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with an update number which should be higher than the previous update numbers
/// </summary> /// </summary>
/// <param name="updateId">The sequence number</param> /// <param name="sequenceNumber">The sequence number</param>
/// <param name="bids">List of updated/new bids</param> /// <param name="bids">List of updated/new bids</param>
/// <param name="asks">List of updated/new asks</param> /// <param name="asks">List of updated/new asks</param>
/// <param name="serverDataTime">Server data timestamp</param> /// <param name="serverDataTime">Server data timestamp</param>
/// <param name="localDataTime">local data timestamp</param> /// <param name="localDataTime">local data timestamp</param>
protected void UpdateOrderBook( protected void UpdateOrderBook(
long updateId, long sequenceNumber,
ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] bids,
ISymbolOrderBookEntry[] asks, ISymbolOrderBookEntry[] asks,
DateTime? serverDataTime = null, DateTime? serverDataTime = null,
DateTime? localDataTime = null) DateTime? localDataTime = null)
{ {
_processQueue.Enqueue( _processQueue.Enqueue(
new ProcessQueueItem new OrderBookUpdate
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
StartSequenceNumber = updateId, StartSequenceNumber = sequenceNumber,
EndSequenceNumber = updateId, EndSequenceNumber = sequenceNumber,
Asks = asks, Asks = asks,
Bids = bids Bids = bids
}); });
@ -492,7 +496,7 @@ namespace CryptoExchange.Net.OrderBook
DateTime? localDataTime = null) DateTime? localDataTime = null)
{ {
_processQueue.Enqueue( _processQueue.Enqueue(
new ProcessQueueItem new OrderBookUpdate
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
@ -521,7 +525,7 @@ namespace CryptoExchange.Net.OrderBook
var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue); var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue);
_processQueue.Enqueue( _processQueue.Enqueue(
new ProcessQueueItem new OrderBookUpdate
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
@ -537,10 +541,10 @@ namespace CryptoExchange.Net.OrderBook
/// Add a checksum value to the process queue /// Add a checksum value to the process queue
/// </summary> /// </summary>
/// <param name="checksum">The checksum value</param> /// <param name="checksum">The checksum value</param>
/// <param name="sequenceNumber">The id of the update</param> /// <param name="sequenceNumber">The sequence number of the message if it's a separate message with separate number</param>
protected void AddChecksum(int checksum, long? sequenceNumber = null) protected void AddChecksum(int checksum, long? sequenceNumber = null)
{ {
_processQueue.Enqueue(new ChecksumItem() { Checksum = checksum, SequenceNumber = sequenceNumber }); _processQueue.Enqueue(new OrderBookChecksum() { Checksum = checksum, SequenceNumber = sequenceNumber });
_queueEvent.Set(); _queueEvent.Set();
} }
@ -553,7 +557,7 @@ namespace CryptoExchange.Net.OrderBook
_logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count); _logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count);
foreach (var bufferEntry in _processBuffer) foreach (var bufferEntry in _processBuffer)
ProcessRangeUpdates(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks); ProcessUpdate(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks, true);
_processBuffer.Clear(); _processBuffer.Clear();
} }
@ -563,7 +567,7 @@ namespace CryptoExchange.Net.OrderBook
/// </summary> /// </summary>
/// <param name="type">Type of entry</param> /// <param name="type">Type of entry</param>
/// <param name="entry">The entry</param> /// <param name="entry">The entry</param>
protected virtual bool ProcessUpdate(OrderBookEntryType type, ISymbolOrderBookEntry entry) protected virtual bool UpdateValue(OrderBookEntryType type, ISymbolOrderBookEntry entry)
{ {
UpdateTime = DateTime.UtcNow; UpdateTime = DateTime.UtcNow;
var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids; var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids;
@ -703,6 +707,12 @@ namespace CryptoExchange.Net.OrderBook
}); });
} }
protected void TriggerResubscribe()
{
_stopProcessing = true;
Resubscribe();
}
private void CheckBestOffersChanged(ISymbolOrderBookEntry prevBestBid, ISymbolOrderBookEntry prevBestAsk) private void CheckBestOffersChanged(ISymbolOrderBookEntry prevBestBid, ISymbolOrderBookEntry prevBestAsk)
{ {
var (bestBid, bestAsk) = BestOffers; var (bestBid, bestAsk) = BestOffers;
@ -719,8 +729,11 @@ namespace CryptoExchange.Net.OrderBook
// Clear queue // Clear queue
while (_processQueue.TryDequeue(out _)) { } while (_processQueue.TryDequeue(out _)) { }
LastSequenceNumber = 0;
_processBuffer.Clear(); _processBuffer.Clear();
_bookSet = false; _bookSet = false;
_firstUpdateAfterSnapshotDone = false;
DoReset(); DoReset();
} }
@ -758,17 +771,17 @@ namespace CryptoExchange.Net.OrderBook
continue; continue;
} }
if (item is InitialOrderBookItem iobi) if (item is OrderBookSnapshot snapshot)
ProcessInitialOrderBookItem(iobi); ProcessOrderBookSnapshot(snapshot);
if (item is ProcessQueueItem pqi) if (item is OrderBookUpdate update)
ProcessQueueItem(pqi); ProcessQueueItem(update);
else if (item is ChecksumItem ci) else if (item is OrderBookChecksum checksum)
ProcessChecksum(ci); ProcessChecksum(checksum);
} }
} }
} }
private void ProcessInitialOrderBookItem(InitialOrderBookItem item) private void ProcessOrderBookSnapshot(OrderBookSnapshot item)
{ {
lock (_bookLock) lock (_bookLock)
{ {
@ -791,12 +804,13 @@ namespace CryptoExchange.Net.OrderBook
_logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.SequenceNumber); _logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.SequenceNumber);
CheckProcessBuffer(); CheckProcessBuffer();
OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray())); OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray()));
OnBestOffersChanged?.Invoke((BestBid, BestAsk)); OnBestOffersChanged?.Invoke((BestBid, BestAsk));
} }
} }
private void ProcessQueueItem(ProcessQueueItem item) private void ProcessQueueItem(OrderBookUpdate item)
{ {
lock (_bookLock) lock (_bookLock)
{ {
@ -818,7 +832,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
CheckProcessBuffer(); CheckProcessBuffer();
var (prevBestBid, prevBestAsk) = BestOffers; var (prevBestBid, prevBestAsk) = BestOffers;
ProcessRangeUpdates(item.StartSequenceNumber, item.EndSequenceNumber, item.Bids, item.Asks); ProcessUpdate(item.StartSequenceNumber, item.EndSequenceNumber, item.Bids, item.Asks, false);
if (_asks.Count == 0 || _bids.Count == 0) if (_asks.Count == 0 || _bids.Count == 0)
return; return;
@ -840,7 +854,7 @@ namespace CryptoExchange.Net.OrderBook
} }
} }
private void ProcessChecksum(ChecksumItem ci) private void ProcessChecksum(OrderBookChecksum ci)
{ {
lock (_bookLock) lock (_bookLock)
{ {
@ -900,33 +914,47 @@ namespace CryptoExchange.Net.OrderBook
}); });
} }
private void ProcessRangeUpdates( private void ProcessUpdate(
long updateSequenceNumberStart, long updateSequenceNumberStart,
long updateSequenceNumberEnd, long updateSequenceNumberEnd,
IEnumerable<ISymbolOrderBookEntry> bids, IEnumerable<ISymbolOrderBookEntry> bids,
IEnumerable<ISymbolOrderBookEntry> asks) IEnumerable<ISymbolOrderBookEntry> asks,
bool fromBuffer)
{ {
if (updateSequenceNumberEnd <= LastSequenceNumber) if (updateSequenceNumberEnd <= LastSequenceNumber)
{ {
// We're already past this update if (fromBuffer)
_logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberEnd, LastSequenceNumber); {
return; // We're already past this update, discard buffered update
_logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberEnd, LastSequenceNumber);
return;
}
else
{
// Somehow this sequence number is before the last sequence number
_stopProcessing = true;
Resubscribe();
return;
}
} }
if (_sequencesAreConsecutive && updateSequenceNumberStart != LastSequenceNumber + 1) if (_sequencesAreConsecutive && updateSequenceNumberStart != LastSequenceNumber + 1)
{ {
// Expected the start sequenceNumber to be LastSequenceNumber + 1, but wasn't if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet)
_logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberEnd); {
_stopProcessing = true; // Expected the start sequenceNumber to be LastSequenceNumber + 1, but wasn't
Resubscribe(); _logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberEnd);
return; _stopProcessing = true;
Resubscribe();
return;
}
} }
foreach (var entry in bids) foreach (var entry in bids)
ProcessUpdate(OrderBookEntryType.Bid, entry); UpdateValue(OrderBookEntryType.Bid, entry);
foreach (var entry in asks) foreach (var entry in asks)
ProcessUpdate(OrderBookEntryType.Ask, entry); UpdateValue(OrderBookEntryType.Ask, entry);
if (Levels.HasValue && _strictLevels) if (Levels.HasValue && _strictLevels)
{ {
@ -943,6 +971,7 @@ namespace CryptoExchange.Net.OrderBook
} }
} }
_firstUpdateAfterSnapshotDone = true;
LastSequenceNumber = updateSequenceNumberEnd; LastSequenceNumber = updateSequenceNumberEnd;
if (_logger.IsEnabled(LogLevel.Trace)) if (_logger.IsEnabled(LogLevel.Trace))