diff --git a/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs b/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs index 2997974..85d2725 100644 --- a/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs +++ b/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs @@ -96,7 +96,7 @@ namespace CryptoExchange.Net.Logging.Extensions _orderBookDataSet = LoggerMessage.Define( LogLevel.Debug, new EventId(5012, "OrderBookDataSet"), - "{Api} order book {Symbol} data set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}"); + "{Api} order book {Symbol} snapshot set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}"); _orderBookUpdateBuffered = LoggerMessage.Define( LogLevel.Trace, diff --git a/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs b/CryptoExchange.Net/OrderBook/OrderBookUpdate.cs similarity index 90% rename from CryptoExchange.Net/OrderBook/ProcessQueueItem.cs rename to CryptoExchange.Net/OrderBook/OrderBookUpdate.cs index c50b026..ed5cc43 100644 --- a/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs +++ b/CryptoExchange.Net/OrderBook/OrderBookUpdate.cs @@ -3,7 +3,7 @@ using System; namespace CryptoExchange.Net.OrderBook { - internal class ProcessQueueItem + internal class OrderBookUpdate { public DateTime? LocalDataTime { get; set; } public DateTime? ServerDataTime { get; set; } @@ -13,7 +13,7 @@ namespace CryptoExchange.Net.OrderBook public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty(); } - internal class InitialOrderBookItem + internal class OrderBookSnapshot { public DateTime? LocalDataTime { get; set; } public DateTime? ServerDataTime { get; set; } @@ -22,7 +22,7 @@ namespace CryptoExchange.Net.OrderBook public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty(); } - internal class ChecksumItem + internal class OrderBookChecksum { public long? SequenceNumber { get; set; } public int Checksum { get; set; } diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index 5b39af2..4608176 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -77,7 +77,10 @@ namespace CryptoExchange.Net.OrderBook /// the book will resynchronize as it is deemed out of sync /// protected bool _sequencesAreConsecutive; - + + protected bool _skipSequenceCheckFirstUpdateAfterSnapshotSet; + private bool _firstUpdateAfterSnapshotDone; + /// /// Whether levels should be strictly enforced. For example, when an order book has 25 levels and a new update comes in which pushes /// the current level 25 ask out of the top 25, should the level 26 entry be removed from the book or does the server handle this @@ -267,6 +270,7 @@ namespace CryptoExchange.Net.OrderBook _processBuffer.Clear(); _bookSet = false; + _firstUpdateAfterSnapshotDone = false; Status = OrderBookStatus.Connecting; _processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning); @@ -435,7 +439,7 @@ namespace CryptoExchange.Net.OrderBook DateTime? localDataTime = null) { _processQueue.Enqueue( - new InitialOrderBookItem + new OrderBookSnapshot { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, @@ -449,25 +453,25 @@ namespace CryptoExchange.Net.OrderBook /// /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with an update number which should be higher than the previous update numbers /// - /// The sequence number + /// The sequence number /// List of updated/new bids /// List of updated/new asks /// Server data timestamp /// local data timestamp protected void UpdateOrderBook( - long updateId, + long sequenceNumber, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks, DateTime? serverDataTime = null, DateTime? localDataTime = null) { _processQueue.Enqueue( - new ProcessQueueItem + new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, - StartSequenceNumber = updateId, - EndSequenceNumber = updateId, + StartSequenceNumber = sequenceNumber, + EndSequenceNumber = sequenceNumber, Asks = asks, Bids = bids }); @@ -492,7 +496,7 @@ namespace CryptoExchange.Net.OrderBook DateTime? localDataTime = null) { _processQueue.Enqueue( - new ProcessQueueItem + new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, @@ -521,7 +525,7 @@ namespace CryptoExchange.Net.OrderBook var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue); _processQueue.Enqueue( - new ProcessQueueItem + new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, @@ -537,10 +541,10 @@ namespace CryptoExchange.Net.OrderBook /// Add a checksum value to the process queue /// /// The checksum value - /// The id of the update + /// The sequence number of the message if it's a separate message with separate number protected void AddChecksum(int checksum, long? sequenceNumber = null) { - _processQueue.Enqueue(new ChecksumItem() { Checksum = checksum, SequenceNumber = sequenceNumber }); + _processQueue.Enqueue(new OrderBookChecksum() { Checksum = checksum, SequenceNumber = sequenceNumber }); _queueEvent.Set(); } @@ -553,7 +557,7 @@ namespace CryptoExchange.Net.OrderBook _logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count); foreach (var bufferEntry in _processBuffer) - ProcessRangeUpdates(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks); + ProcessUpdate(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks, true); _processBuffer.Clear(); } @@ -563,7 +567,7 @@ namespace CryptoExchange.Net.OrderBook /// /// Type of entry /// The entry - protected virtual bool ProcessUpdate(OrderBookEntryType type, ISymbolOrderBookEntry entry) + protected virtual bool UpdateValue(OrderBookEntryType type, ISymbolOrderBookEntry entry) { UpdateTime = DateTime.UtcNow; var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids; @@ -703,6 +707,12 @@ namespace CryptoExchange.Net.OrderBook }); } + protected void TriggerResubscribe() + { + _stopProcessing = true; + Resubscribe(); + } + private void CheckBestOffersChanged(ISymbolOrderBookEntry prevBestBid, ISymbolOrderBookEntry prevBestAsk) { var (bestBid, bestAsk) = BestOffers; @@ -719,8 +729,11 @@ namespace CryptoExchange.Net.OrderBook // Clear queue while (_processQueue.TryDequeue(out _)) { } + LastSequenceNumber = 0; _processBuffer.Clear(); _bookSet = false; + _firstUpdateAfterSnapshotDone = false; + DoReset(); } @@ -758,17 +771,17 @@ namespace CryptoExchange.Net.OrderBook continue; } - if (item is InitialOrderBookItem iobi) - ProcessInitialOrderBookItem(iobi); - if (item is ProcessQueueItem pqi) - ProcessQueueItem(pqi); - else if (item is ChecksumItem ci) - ProcessChecksum(ci); + if (item is OrderBookSnapshot snapshot) + ProcessOrderBookSnapshot(snapshot); + if (item is OrderBookUpdate update) + ProcessQueueItem(update); + else if (item is OrderBookChecksum checksum) + ProcessChecksum(checksum); } } } - private void ProcessInitialOrderBookItem(InitialOrderBookItem item) + private void ProcessOrderBookSnapshot(OrderBookSnapshot item) { lock (_bookLock) { @@ -791,12 +804,13 @@ namespace CryptoExchange.Net.OrderBook _logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.SequenceNumber); CheckProcessBuffer(); + OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray())); OnBestOffersChanged?.Invoke((BestBid, BestAsk)); } } - private void ProcessQueueItem(ProcessQueueItem item) + private void ProcessQueueItem(OrderBookUpdate item) { lock (_bookLock) { @@ -818,7 +832,7 @@ namespace CryptoExchange.Net.OrderBook { CheckProcessBuffer(); var (prevBestBid, prevBestAsk) = BestOffers; - ProcessRangeUpdates(item.StartSequenceNumber, item.EndSequenceNumber, item.Bids, item.Asks); + ProcessUpdate(item.StartSequenceNumber, item.EndSequenceNumber, item.Bids, item.Asks, false); if (_asks.Count == 0 || _bids.Count == 0) return; @@ -840,7 +854,7 @@ namespace CryptoExchange.Net.OrderBook } } - private void ProcessChecksum(ChecksumItem ci) + private void ProcessChecksum(OrderBookChecksum ci) { lock (_bookLock) { @@ -900,33 +914,47 @@ namespace CryptoExchange.Net.OrderBook }); } - private void ProcessRangeUpdates( + private void ProcessUpdate( long updateSequenceNumberStart, long updateSequenceNumberEnd, IEnumerable bids, - IEnumerable asks) + IEnumerable asks, + bool fromBuffer) { if (updateSequenceNumberEnd <= LastSequenceNumber) { - // We're already past this update - _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberEnd, LastSequenceNumber); - return; + if (fromBuffer) + { + // We're already past this update, discard buffered update + _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberEnd, LastSequenceNumber); + return; + } + else + { + // Somehow this sequence number is before the last sequence number + _stopProcessing = true; + Resubscribe(); + return; + } } if (_sequencesAreConsecutive && updateSequenceNumberStart != LastSequenceNumber + 1) { - // Expected the start sequenceNumber to be LastSequenceNumber + 1, but wasn't - _logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberEnd); - _stopProcessing = true; - Resubscribe(); - return; + if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet) + { + // Expected the start sequenceNumber to be LastSequenceNumber + 1, but wasn't + _logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberEnd); + _stopProcessing = true; + Resubscribe(); + return; + } } foreach (var entry in bids) - ProcessUpdate(OrderBookEntryType.Bid, entry); + UpdateValue(OrderBookEntryType.Bid, entry); foreach (var entry in asks) - ProcessUpdate(OrderBookEntryType.Ask, entry); + UpdateValue(OrderBookEntryType.Ask, entry); if (Levels.HasValue && _strictLevels) { @@ -943,6 +971,7 @@ namespace CryptoExchange.Net.OrderBook } } + _firstUpdateAfterSnapshotDone = true; LastSequenceNumber = updateSequenceNumberEnd; if (_logger.IsEnabled(LogLevel.Trace))