using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Globalization; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Logging.Extensions; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Errors; using CryptoExchange.Net.Objects.Options; using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; namespace CryptoExchange.Net.OrderBook { /// /// Base for order book implementations /// public abstract class SymbolOrderBook : ISymbolOrderBook, IDisposable { #if NET9_0_OR_GREATER private readonly Lock _bookLock = new Lock(); #else private readonly object _bookLock = new object(); #endif private OrderBookStatus _status; private UpdateSubscription? _subscription; private bool _stopProcessing; private Task? _processTask; private CancellationTokenSource? _cts; private readonly AsyncResetEvent _queueEvent; private readonly ConcurrentQueue _processQueue; private bool _validateChecksum; private bool _firstUpdateAfterSnapshotDone; private class EmptySymbolOrderBookEntry : ISymbolOrderBookEntry { public decimal Quantity { get => 0m; set { } } public decimal Price { get => 0m; set { } } } private static readonly ISymbolOrderBookEntry _emptySymbolOrderBookEntry = new EmptySymbolOrderBookEntry(); private enum SequenceNumberResult { Skip, Ok, OutOfSync } /// /// A buffer to store messages received before the initial book snapshot is processed. These messages /// will be processed after the book snapshot is set. Any messages in this buffer with sequence numbers lower /// than the snapshot sequence number will be discarded /// protected readonly List _processBuffer; /// /// The ask list, should only be accessed using the bookLock /// protected SortedList _asks; /// /// The bid list, should only be accessed using the bookLock /// protected SortedList _bids; /// /// The log /// protected ILogger _logger; /// /// Whether update numbers are consecutive. If set to true and an update comes in which isn't the previous sequences number + 1 /// the book will resynchronize as it is deemed out of sync /// protected bool _sequencesAreConsecutive; /// /// Whether the first update message after a snapshot may have overlapping sequence numbers instead of the snapshot sequence number + 1 /// protected bool _skipSequenceCheckFirstUpdateAfterSnapshotSet; /// /// Whether levels should be strictly enforced. For example, when an order book has 25 levels and a new update comes in which pushes /// the current level 25 ask out of the top 25, should the level 26 entry be removed from the book or does the server handle this /// protected bool _strictLevels; /// /// If the initial snapshot of the book has been set /// protected bool _bookSet; /// /// The amount of levels for this book /// protected int? Levels { get; set; } = null; /// public string Exchange { get; } /// public string Api { get; } /// public OrderBookStatus Status { get => _status; set { if (value == _status) return; var old = _status; _status = value; _logger.OrderBookStatusChanged(Api, Symbol, old, value); OnStatusChange?.Invoke(old, _status); } } /// public long LastSequenceNumber { get; private set; } /// public string Symbol { get; } /// public event Action? OnStatusChange; /// public event Action<(ISymbolOrderBookEntry BestBid, ISymbolOrderBookEntry BestAsk)>? OnBestOffersChanged; /// public event Action<(ISymbolOrderBookEntry[] Bids, ISymbolOrderBookEntry[] Asks)>? OnOrderBookUpdate; /// public DateTime UpdateTime { get; private set; } /// public DateTime? UpdateServerTime { get; private set; } /// public DateTime? UpdateLocalTime { get; set; } /// public TimeSpan? DataAge => DateTime.UtcNow - UpdateLocalTime; /// public int AskCount { get; private set; } /// public int BidCount { get; private set; } /// public ISymbolOrderBookEntry[] Asks { get { lock (_bookLock) return _asks.Select(a => a.Value).ToArray(); } } /// public ISymbolOrderBookEntry[] Bids { get { lock (_bookLock) return _bids.Select(a => a.Value).ToArray(); } } /// public (ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks) Book { get { lock (_bookLock) return (Bids, Asks); } } /// public ISymbolOrderBookEntry BestBid { get { lock (_bookLock) return _bids.FirstOrDefault().Value ?? _emptySymbolOrderBookEntry; } } /// public ISymbolOrderBookEntry BestAsk { get { lock (_bookLock) return _asks.FirstOrDefault().Value ?? _emptySymbolOrderBookEntry; } } /// public (ISymbolOrderBookEntry Bid, ISymbolOrderBookEntry Ask) BestOffers { get { lock (_bookLock) return (BestBid,BestAsk); } } /// /// ctor /// /// Logger to use. If not provided will create a TraceLogger /// The exchange of the order book /// The API the book is for, for example Spot /// The symbol the order book is for protected SymbolOrderBook(ILoggerFactory? logger, string exchange, string api, string symbol) { if (symbol == null) throw new ArgumentNullException(nameof(symbol)); Exchange = exchange; Api = api; _processBuffer = new List(); _processQueue = new ConcurrentQueue(); _queueEvent = new AsyncResetEvent(false, true); Symbol = symbol; Status = OrderBookStatus.Disconnected; _asks = new SortedList(); _bids = new SortedList(new DescComparer()); _logger = logger?.CreateLogger(Exchange) ?? NullLoggerFactory.Instance.CreateLogger(Exchange); } /// /// Initialize the order book using the provided options /// /// The options /// protected void Initialize(OrderBookOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); _validateChecksum = options.ChecksumValidationEnabled; } /// public async Task> StartAsync(CancellationToken? ct = null) { if (Status != OrderBookStatus.Disconnected) throw new InvalidOperationException($"Can't start book unless state is {OrderBookStatus.Disconnected}. Current state: {Status}"); _logger.OrderBookStarting(Api, Symbol); _cts = new CancellationTokenSource(); ct?.Register(async () => { _cts.Cancel(); await StopAsync().ConfigureAwait(false); }, false); // Clear any previous messages while (_processQueue.TryDequeue(out _)) { } _processBuffer.Clear(); _bookSet = false; _firstUpdateAfterSnapshotDone = false; Status = OrderBookStatus.Connecting; _processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning); var startResult = await DoStartAsync(_cts.Token).ConfigureAwait(false); if (!startResult) { Status = OrderBookStatus.Disconnected; return new CallResult(startResult.Error!); } if (_cts.IsCancellationRequested) { _logger.OrderBookStoppedStarting(Api, Symbol); await startResult.Data.CloseAsync().ConfigureAwait(false); Status = OrderBookStatus.Disconnected; return new CallResult(new CancellationRequestedError()); } _subscription = startResult.Data; _subscription.ConnectionLost += HandleConnectionLost; _subscription.ConnectionClosed += HandleConnectionClosed; _subscription.ConnectionRestored += HandleConnectionRestored; Status = OrderBookStatus.Synced; return new CallResult(true); } private void HandleConnectionLost() { _logger.OrderBookConnectionLost(Api, Symbol); if (Status != OrderBookStatus.Disposed) { Status = OrderBookStatus.Reconnecting; Reset(); } } private void HandleConnectionClosed() { _logger.OrderBookDisconnected(Api, Symbol); Status = OrderBookStatus.Disconnected; _ = StopAsync(); } private async void HandleConnectionRestored(TimeSpan _) { await ResyncAsync().ConfigureAwait(false); } /// public async Task StopAsync() { _logger.OrderBookStopping(Api, Symbol); Status = OrderBookStatus.Disconnected; _cts?.Cancel(); _queueEvent.Set(); if (_processTask != null) await _processTask.ConfigureAwait(false); if (_subscription != null) { await _subscription.CloseAsync().ConfigureAwait(false); _subscription.ConnectionLost -= HandleConnectionLost; _subscription.ConnectionClosed -= HandleConnectionClosed; _subscription.ConnectionRestored -= HandleConnectionRestored; } _logger.OrderBookStopped(Api, Symbol); } /// public CallResult CalculateAverageFillPrice(decimal baseQuantity, OrderBookEntryType type) { if (Status != OrderBookStatus.Synced) return new CallResult(new InvalidOperationError($"{nameof(CalculateAverageFillPrice)} is not available when book is not in Synced state")); var totalCost = 0m; var totalAmount = 0m; var amountLeft = baseQuantity; lock (_bookLock) { var list = type == OrderBookEntryType.Ask ? _asks : _bids; var step = 0; while (amountLeft > 0) { if (step == list.Count) return new CallResult(new InvalidOperationError("Quantity is larger than order in the order book")); var element = list.ElementAt(step); var stepAmount = Math.Min(element.Value.Quantity, amountLeft); totalCost += stepAmount * element.Value.Price; totalAmount += stepAmount; amountLeft -= stepAmount; step++; } } return new CallResult(Math.Round(totalCost / totalAmount, 8)); } /// public CallResult CalculateTradableAmount(decimal quoteQuantity, OrderBookEntryType type) { if (Status != OrderBookStatus.Synced) return new CallResult(new InvalidOperationError($"{nameof(CalculateTradableAmount)} is not available when book is not in Synced state")); var quoteQuantityLeft = quoteQuantity; var totalBaseQuantity = 0m; lock (_bookLock) { var list = type == OrderBookEntryType.Ask ? _asks : _bids; var step = 0; while (quoteQuantityLeft > 0) { if (step == list.Count) return new CallResult(new InvalidOperationError("Quantity is larger than order in the order book")); var element = list.ElementAt(step); var stepAmount = Math.Min(element.Value.Quantity * element.Value.Price, quoteQuantityLeft); quoteQuantityLeft -= stepAmount; totalBaseQuantity += stepAmount / element.Value.Price; step++; } } return new CallResult(Math.Round(totalBaseQuantity, 8)); } /// /// Implementation for starting the order book. Should typically have logic for subscribing to the update stream and retrieving /// and setting the initial order book /// /// protected abstract Task> DoStartAsync(CancellationToken ct); /// /// Reset the order book /// protected virtual void DoReset() { } /// /// Resync the order book /// /// protected abstract Task> DoResyncAsync(CancellationToken ct); /// /// Implementation for validating a checksum value with the current order book. If checksum validation fails (returns false) /// the order book will be resynchronized /// protected virtual bool DoChecksum(int checksum) => true; /// /// Set snapshot data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot /// received from a socket subscription. Will clear any previous data. /// /// The last update sequence number until which the snapshot is in sync /// List of asks /// List of bids /// Server data timestamp /// local data timestamp protected void SetSnapshot( long? orderBookSequenceNumber, ISymbolOrderBookEntry[] bidList, ISymbolOrderBookEntry[] askList, DateTime? serverDataTime = null, DateTime? localDataTime = null) { _processQueue.Enqueue( new OrderBookSnapshot { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, SequenceNumber = orderBookSequenceNumber, Asks = askList, Bids = bidList }); _queueEvent.Set(); } /// /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with an update number which should be higher than the previous update numbers /// /// The sequence number /// List of updated/new bids /// List of updated/new asks /// Server data timestamp /// local data timestamp protected void UpdateOrderBook( long sequenceNumber, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks, DateTime? serverDataTime = null, DateTime? localDataTime = null) { _processQueue.Enqueue( new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, StartSequenceNumber = sequenceNumber, EndSequenceNumber = sequenceNumber, Asks = asks, Bids = bids }); _queueEvent.Set(); } /// /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with the first and last sequence number in the update /// /// The sequence number of the first update /// The sequence number of the last update /// List of updated/new bids /// List of updated/new asks /// Server data timestamp /// local data timestamp protected void UpdateOrderBook( long firstSequenceNumber, long lastSequenceNumber, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks, DateTime? serverDataTime = null, DateTime? localDataTime = null) { _processQueue.Enqueue( new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, StartSequenceNumber = firstSequenceNumber, EndSequenceNumber = lastSequenceNumber, Asks = asks, Bids = bids }); _queueEvent.Set(); } /// /// Add an update to the process queue. Updates the book by providing changed bids and asks, each with its own sequence number /// /// List of updated/new bids /// List of updated/new asks /// Server data timestamp /// local data timestamp protected void UpdateOrderBook( ISymbolOrderSequencedBookEntry[] bids, ISymbolOrderSequencedBookEntry[] asks, DateTime? serverDataTime = null, DateTime? localDataTime = null) { var highest = Math.Max(bids.Any() ? bids.Max(b => b.Sequence) : 0, asks.Any() ? asks.Max(a => a.Sequence) : 0); var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue); _processQueue.Enqueue( new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, StartSequenceNumber = lowest, EndSequenceNumber = highest, Asks = asks, Bids = bids }); _queueEvent.Set(); } /// /// Add a checksum value to the process queue /// /// The checksum value /// The sequence number of the message if it's a separate message with separate number protected void AddChecksum(int checksum, long? sequenceNumber = null) { _processQueue.Enqueue(new OrderBookChecksum() { Checksum = checksum, SequenceNumber = sequenceNumber }); _queueEvent.Set(); } /// /// Check and empty the process buffer; see what entries to update the book with /// protected void CheckProcessBuffer() { if (_processBuffer.Count > 0) _logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count); foreach (var bufferEntry in _processBuffer) { if (_stopProcessing) break; ProcessUpdate(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks, true); } _processBuffer.Clear(); } /// /// Update order book with an entry /// /// Type of entry /// The entry protected virtual bool UpdateValue(OrderBookEntryType type, ISymbolOrderBookEntry entry) { UpdateTime = DateTime.UtcNow; var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids; if (entry.Quantity == 0) { if (!listToChange.ContainsKey(entry.Price)) return true; listToChange.Remove(entry.Price); if (type == OrderBookEntryType.Ask) AskCount--; else BidCount--; } else { if (!listToChange.ContainsKey(entry.Price)) { listToChange.Add(entry.Price, entry); if (type == OrderBookEntryType.Ask) AskCount++; else BidCount++; } else { listToChange[entry.Price] = entry; } } return true; } /// /// Wait until the order book snapshot has been set /// /// Max wait time /// Cancellation token /// protected async Task> WaitForSetOrderBookAsync(TimeSpan timeout, CancellationToken ct) { var startWait = DateTime.UtcNow; while (!_bookSet && Status == OrderBookStatus.Syncing) { if(ct.IsCancellationRequested) return new CallResult(new CancellationRequestedError()); if (DateTime.UtcNow - startWait > timeout) return new CallResult(new ServerError(new ErrorInfo(ErrorType.OrderBookTimeout, "Timeout while waiting for data"))); try { await Task.Delay(50, ct).ConfigureAwait(false); } catch (OperationCanceledException) { } } return new CallResult(true); } /// /// Wait until an update has been buffered /// /// Max wait time /// Cancellation token /// protected async Task> WaitUntilFirstUpdateBufferedAsync(TimeSpan timeout, CancellationToken ct) { var startWait = DateTime.UtcNow; while (_processBuffer.Count == 0) { if (ct.IsCancellationRequested) return new CallResult(new CancellationRequestedError()); if (DateTime.UtcNow - startWait > timeout) return new CallResult(new ServerError(new ErrorInfo(ErrorType.OrderBookTimeout, "Timeout while waiting for data"))); try { await Task.Delay(20, ct).ConfigureAwait(false); } catch (OperationCanceledException) { } } return new CallResult(true); } /// /// IDisposable implementation for the order book /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// /// Dispose method /// /// protected virtual void Dispose(bool disposing) { Status = OrderBookStatus.Disposing; _cts?.Cancel(); _queueEvent.Set(); // Clear queue while (_processQueue.TryDequeue(out _)) { } _processBuffer.Clear(); _asks.Clear(); _bids.Clear(); AskCount = 0; BidCount = 0; Status = OrderBookStatus.Disposed; } /// /// String representation of the top 3 entries /// /// public override string ToString() { return ToString(3); } /// /// String representation of the top x entries /// /// public string ToString(int numberOfEntries) { var stringBuilder = new StringBuilder(); var book = Book; stringBuilder.AppendLine($"{Exchange} - {Symbol}"); stringBuilder.AppendLine($"Update time local: {UpdateTime:HH:mm:ss.fff} ({Math.Round((DateTime.UtcNow - UpdateTime).TotalMilliseconds)}ms ago)"); stringBuilder.AppendLine($"Data timestamp server: {UpdateServerTime:HH:mm:ss.fff}"); stringBuilder.AppendLine($"Data timestamp local: {UpdateLocalTime:HH:mm:ss.fff}"); stringBuilder.AppendLine($"Data age: {DataAge?.TotalMilliseconds}ms"); stringBuilder.AppendLine(); stringBuilder.AppendLine($" Ask quantity Ask price | Bid price Bid quantity"); for(var i = 0; i < numberOfEntries; i++) { var ask = book.asks.Count() > i ? book.asks.ElementAt(i): null; var bid = book.bids.Count() > i ? book.bids.ElementAt(i): null; stringBuilder.AppendLine($"[{ask?.Quantity.ToString(CultureInfo.InvariantCulture),14}] {ask?.Price.ToString(CultureInfo.InvariantCulture),14} | {bid?.Price.ToString(CultureInfo.InvariantCulture),-14} [{bid?.Quantity.ToString(CultureInfo.InvariantCulture),-14}]"); } return stringBuilder.ToString(); } /// public Task OutputToConsoleAsync(int numberOfEntries, TimeSpan refreshInterval, CancellationToken ct = default) { return Task.Run(async () => { var referenceTime = DateTime.UtcNow; while (!ct.IsCancellationRequested) { Console.Clear(); Console.WriteLine(ToString(numberOfEntries)); var delay = Math.Max(1, (DateTime.UtcNow - referenceTime).TotalMilliseconds % refreshInterval.TotalMilliseconds); try { await Task.Delay(refreshInterval.Add(TimeSpan.FromMilliseconds(-delay)), ct).ConfigureAwait(false); } catch { } } }); } private void CheckBestOffersChanged(ISymbolOrderBookEntry prevBestBid, ISymbolOrderBookEntry prevBestAsk) { var (bestBid, bestAsk) = BestOffers; if (bestBid.Price != prevBestBid.Price || bestBid.Quantity != prevBestBid.Quantity || bestAsk.Price != prevBestAsk.Price || bestAsk.Quantity != prevBestAsk.Quantity) { OnBestOffersChanged?.Invoke((bestBid, bestAsk)); } } private void Reset() { _queueEvent.Set(); // Clear queue while (_processQueue.TryDequeue(out _)) { } LastSequenceNumber = 0; _processBuffer.Clear(); _bookSet = false; _firstUpdateAfterSnapshotDone = false; DoReset(); } private async Task ResyncAsync() { Status = OrderBookStatus.Syncing; var success = false; while (!success) { if (Status != OrderBookStatus.Syncing) return; var resyncResult = await DoResyncAsync(_cts!.Token).ConfigureAwait(false); success = resyncResult; } _logger.OrderBookResynced(Api, Symbol); Status = OrderBookStatus.Synced; } private async Task ProcessQueue() { while (Status != OrderBookStatus.Disconnected && Status != OrderBookStatus.Disposed) { await _queueEvent.WaitAsync().ConfigureAwait(false); while (_processQueue.TryDequeue(out var item)) { if (Status == OrderBookStatus.Disconnected || Status == OrderBookStatus.Disposed) break; if (_stopProcessing) { _logger.OrderBookMessageSkippedResubscribing(Api, Symbol); continue; } if (item is OrderBookSnapshot snapshot) ProcessOrderBookSnapshot(snapshot); if (item is OrderBookUpdate update) ProcessQueueItem(update); else if (item is OrderBookChecksum checksum) ProcessChecksum(checksum); } } } private void ProcessOrderBookSnapshot(OrderBookSnapshot item) { lock (_bookLock) { _bookSet = true; _asks.Clear(); foreach (var ask in item.Asks) _asks.Add(ask.Price, ask); _bids.Clear(); foreach (var bid in item.Bids) _bids.Add(bid.Price, bid); if (item.SequenceNumber != null) LastSequenceNumber = item.SequenceNumber.Value; AskCount = _asks.Count; BidCount = _bids.Count; UpdateTime = DateTime.UtcNow; UpdateServerTime = item.ServerDataTime; UpdateLocalTime = item.LocalDataTime; _logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.SequenceNumber); CheckProcessBuffer(); OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray())); OnBestOffersChanged?.Invoke((BestBid, BestAsk)); } } private void ProcessQueueItem(OrderBookUpdate item) { lock (_bookLock) { if (!_bookSet) { _processBuffer.Add(new ProcessBufferRangeSequenceEntry() { Asks = item.Asks, Bids = item.Bids, FirstUpdateId = item.StartSequenceNumber, LastUpdateId = item.EndSequenceNumber, }); if (_logger.IsEnabled(LogLevel.Trace)) _logger.OrderBookUpdateBuffered(Api, Symbol, item.StartSequenceNumber, item.EndSequenceNumber, item.Asks.Length, item.Bids.Length); } else { CheckProcessBuffer(); var (prevBestBid, prevBestAsk) = BestOffers; ProcessUpdate(item.StartSequenceNumber, item.EndSequenceNumber, item.Bids, item.Asks, false); if (_asks.Count == 0 || _bids.Count == 0) return; if (_asks.First().Key < _bids.First().Key) { _logger.OrderBookOutOfSyncDetected(Api, Symbol, _asks.First().Key, _bids.First().Key); _stopProcessing = true; Resubscribe(); return; } UpdateServerTime = item.ServerDataTime; UpdateLocalTime = item.LocalDataTime; OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray())); CheckBestOffersChanged(prevBestBid, prevBestAsk); } } } private void ProcessChecksum(OrderBookChecksum ci) { lock (_bookLock) { if (!_validateChecksum) return; bool checksumResult = false; try { checksumResult = DoChecksum(ci.Checksum); } catch (Exception) { // If the status is not synced it can be expected a checksum is failing if (Status == OrderBookStatus.Synced) throw; } if (ci.SequenceNumber != null) LastSequenceNumber = ci.SequenceNumber.Value; if (!checksumResult) { _logger.OrderBookOutOfSyncChecksum(Api, Symbol); _stopProcessing = true; Resubscribe(); } } } private void Resubscribe() { Status = OrderBookStatus.Syncing; _ = Task.Run(async () => { if(_subscription == null) { Status = OrderBookStatus.Disconnected; return; } await _subscription!.UnsubscribeAsync().ConfigureAwait(false); Reset(); _stopProcessing = false; if (!await _subscription!.ResubscribeAsync().ConfigureAwait(false)) { // Resubscribing failed, reconnect the socket _logger.OrderBookResyncFailed(Api, Symbol); Status = OrderBookStatus.Reconnecting; _ = _subscription!.ReconnectAsync(); } else { await ResyncAsync().ConfigureAwait(false); } }); } private void ProcessUpdate( long updateSequenceNumberStart, long updateSequenceNumberEnd, IEnumerable bids, IEnumerable asks, bool fromBuffer) { var sequenceResult = fromBuffer ? ValidateBufferSequenceNumber(updateSequenceNumberStart, updateSequenceNumberEnd) : ValidateLiveSequenceNumber(updateSequenceNumberStart); if (sequenceResult == SequenceNumberResult.Skip) { if (updateSequenceNumberStart != updateSequenceNumberEnd) _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd, LastSequenceNumber); else _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, LastSequenceNumber); return; } if (sequenceResult == SequenceNumberResult.OutOfSync) { _logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberStart); _stopProcessing = true; Resubscribe(); return; } foreach (var entry in bids) UpdateValue(OrderBookEntryType.Bid, entry); foreach (var entry in asks) UpdateValue(OrderBookEntryType.Ask, entry); if (Levels.HasValue && _strictLevels) { while (_bids.Count > Levels.Value) { BidCount--; _bids.Remove(_bids.Last().Key); } while (_asks.Count > Levels.Value) { AskCount--; _asks.Remove(this._asks.Last().Key); } } _firstUpdateAfterSnapshotDone = true; LastSequenceNumber = updateSequenceNumberEnd; if (_logger.IsEnabled(LogLevel.Trace)) { if (updateSequenceNumberStart != updateSequenceNumberEnd) _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd); else _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart); } } private SequenceNumberResult ValidateBufferSequenceNumber(long startSequenceNumber, long endSequenceNumber) { if (endSequenceNumber <= LastSequenceNumber) // Buffered update is from before the snapshot, ignore return SequenceNumberResult.Skip; if (_sequencesAreConsecutive && startSequenceNumber != LastSequenceNumber + 1) { if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet) // Buffered update is not the next sequence number when it was expected to be return SequenceNumberResult.OutOfSync; } // Buffered sequence number is larger than the last sequence number return SequenceNumberResult.Ok; } private SequenceNumberResult ValidateLiveSequenceNumber(long sequenceNumber) { if (sequenceNumber < LastSequenceNumber && (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet)) // Update is somehow from before the current state return SequenceNumberResult.OutOfSync; if (_sequencesAreConsecutive && LastSequenceNumber != 0 && sequenceNumber != LastSequenceNumber + 1) { if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet) return SequenceNumberResult.OutOfSync; } return SequenceNumberResult.Ok; } } internal class DescComparer : IComparer { public int Compare(T? x, T? y) { return Comparer.Default.Compare(y!, x!); } } }