From d7ca1b69ded50c33f4fe3a1ea438e3d14fbde72e Mon Sep 17 00:00:00 2001 From: Jkorf Date: Fri, 9 Jan 2026 16:29:01 +0100 Subject: [PATCH] wip --- CryptoExchange.Net/Clients/SocketApiClient.cs | 4 + .../SymbolOrderBookLoggingExtensions.cs | 18 +++- .../OrderBook/OrderBookUpdate.cs | 2 +- .../OrderBook/SymbolOrderBook.cs | 98 ++++++++++++------- 4 files changed, 84 insertions(+), 38 deletions(-) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 14b6014..b2ef32e 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -710,6 +710,10 @@ namespace CryptoExchange.Net.Clients if (connection != null && !connection.DedicatedRequestConnection.Authenticated) // Mark dedicated request connection as authenticated if the request is authenticated connection.DedicatedRequestConnection.Authenticated = authenticated; + + if (connection == null) + // Fall back to an existing connection if there is no dedicated request connection available + connection = socketQuery.OrderBy(s => s.UserSubscriptionCount).FirstOrDefault(); } bool maxConnectionsReached = _socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections); diff --git a/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs b/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs index 85d2725..d34f669 100644 --- a/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs +++ b/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs @@ -22,7 +22,7 @@ namespace CryptoExchange.Net.Logging.Extensions private static readonly Action _orderBookResyncing; private static readonly Action _orderBookResynced; private static readonly Action _orderBookMessageSkippedBecauseOfResubscribing; - private static readonly Action _orderBookDataSet; + private static readonly Action _orderBookDataSet; private static readonly Action _orderBookUpdateBuffered; private static readonly Action _orderBookOutOfSyncDetected; private static readonly Action _orderBookReconnectingSocket; @@ -30,6 +30,7 @@ namespace CryptoExchange.Net.Logging.Extensions private static readonly Action _orderBookProcessedMessage; private static readonly Action _orderBookProcessedMessageSingle; private static readonly Action _orderBookOutOfSync; + private static readonly Action _orderBookUpdateSkippedStartEnd; static SymbolOrderBookLoggingExtensions() { @@ -93,7 +94,7 @@ namespace CryptoExchange.Net.Logging.Extensions new EventId(5011, "OrderBookMessageSkippedResubscribing"), "{Api} order book {Symbol} Skipping message because of resubscribing"); - _orderBookDataSet = LoggerMessage.Define( + _orderBookDataSet = LoggerMessage.Define( LogLevel.Debug, new EventId(5012, "OrderBookDataSet"), "{Api} order book {Symbol} snapshot set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}"); @@ -142,6 +143,12 @@ namespace CryptoExchange.Net.Logging.Extensions LogLevel.Trace, new EventId(5021, "OrderBookProcessedMessage"), "{Api} order book {Symbol} update processed #{UpdateId}"); + + _orderBookUpdateSkippedStartEnd = LoggerMessage.Define( + LogLevel.Debug, + new EventId(5022, "OrderBookUpdateSkippedStartEnd"), + "{Api} order book {Symbol} update skipped #{SequenceStart}-#{SequenceEnd}, currently at #{LastSequenceNumber}"); + } public static void OrderBookStatusChanged(this ILogger logger, string api, string symbol, OrderBookStatus previousStatus, OrderBookStatus newStatus) @@ -200,7 +207,7 @@ namespace CryptoExchange.Net.Logging.Extensions { _orderBookMessageSkippedBecauseOfResubscribing(logger, api, symbol, null); } - public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long endUpdateId) + public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long? endUpdateId) { _orderBookDataSet(logger, api, symbol, bidCount, askCount, endUpdateId, null); } @@ -243,5 +250,10 @@ namespace CryptoExchange.Net.Logging.Extensions { _orderBookOutOfSyncChecksum(logger, api, symbol, null); } + + public static void OrderBookUpdateSkipped(this ILogger logger, string api, string symbol, long sequenceStart, long sequenceEnd, long lastSequenceNumber) + { + _orderBookUpdateSkippedStartEnd(logger, api, symbol, sequenceStart, sequenceEnd, lastSequenceNumber, null); + } } } diff --git a/CryptoExchange.Net/OrderBook/OrderBookUpdate.cs b/CryptoExchange.Net/OrderBook/OrderBookUpdate.cs index ed5cc43..f8fae79 100644 --- a/CryptoExchange.Net/OrderBook/OrderBookUpdate.cs +++ b/CryptoExchange.Net/OrderBook/OrderBookUpdate.cs @@ -17,7 +17,7 @@ namespace CryptoExchange.Net.OrderBook { public DateTime? LocalDataTime { get; set; } public DateTime? ServerDataTime { get; set; } - public long SequenceNumber { get; set; } + public long? SequenceNumber { get; set; } public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty(); } diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index 4608176..6e9efd3 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -50,6 +50,13 @@ namespace CryptoExchange.Net.OrderBook private static readonly ISymbolOrderBookEntry _emptySymbolOrderBookEntry = new EmptySymbolOrderBookEntry(); + private enum SequenceNumberResult + { + Skip, + Ok, + OutOfSync + } + /// /// A buffer to store messages received before the initial book snapshot is processed. These messages /// will be processed after the book snapshot is set. Any messages in this buffer with sequence numbers lower @@ -423,16 +430,16 @@ namespace CryptoExchange.Net.OrderBook protected virtual bool DoChecksum(int checksum) => true; /// - /// Set the initial data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot - /// received from a socket subscription + /// Set snapshot data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot + /// received from a socket subscription. Will clear any previous data. /// /// The last update sequence number until which the snapshot is in sync /// List of asks /// List of bids /// Server data timestamp /// local data timestamp - protected void SetInitialOrderBook( - long orderBookSequenceNumber, + protected void SetSnapshot( + long? orderBookSequenceNumber, ISymbolOrderBookEntry[] bidList, ISymbolOrderBookEntry[] askList, DateTime? serverDataTime = null, @@ -557,7 +564,12 @@ namespace CryptoExchange.Net.OrderBook _logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count); foreach (var bufferEntry in _processBuffer) + { + if (_stopProcessing) + break; + ProcessUpdate(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks, true); + } _processBuffer.Clear(); } @@ -707,12 +719,6 @@ namespace CryptoExchange.Net.OrderBook }); } - protected void TriggerResubscribe() - { - _stopProcessing = true; - Resubscribe(); - } - private void CheckBestOffersChanged(ISymbolOrderBookEntry prevBestBid, ISymbolOrderBookEntry prevBestAsk) { var (bestBid, bestAsk) = BestOffers; @@ -793,7 +799,8 @@ namespace CryptoExchange.Net.OrderBook foreach (var bid in item.Bids) _bids.Add(bid.Price, bid); - LastSequenceNumber = item.SequenceNumber; + if (item.SequenceNumber != null) + LastSequenceNumber = item.SequenceNumber.Value; AskCount = _asks.Count; BidCount = _bids.Count; @@ -921,33 +928,23 @@ namespace CryptoExchange.Net.OrderBook IEnumerable asks, bool fromBuffer) { - if (updateSequenceNumberEnd <= LastSequenceNumber) + var sequenceResult = fromBuffer ? ValidateBufferSequenceNumber(updateSequenceNumberStart, updateSequenceNumberEnd) : ValidateLiveSequenceNumber(updateSequenceNumberStart); + if (sequenceResult == SequenceNumberResult.Skip) { - if (fromBuffer) - { - // We're already past this update, discard buffered update - _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberEnd, LastSequenceNumber); - return; - } + if (updateSequenceNumberStart != updateSequenceNumberEnd) + _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd, LastSequenceNumber); else - { - // Somehow this sequence number is before the last sequence number - _stopProcessing = true; - Resubscribe(); - return; - } + _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, LastSequenceNumber); + + return; } - if (_sequencesAreConsecutive && updateSequenceNumberStart != LastSequenceNumber + 1) + if (sequenceResult == SequenceNumberResult.OutOfSync) { - if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet) - { - // Expected the start sequenceNumber to be LastSequenceNumber + 1, but wasn't - _logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberEnd); - _stopProcessing = true; - Resubscribe(); - return; - } + _logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberStart); + _stopProcessing = true; + Resubscribe(); + return; } foreach (var entry in bids) @@ -981,7 +978,40 @@ namespace CryptoExchange.Net.OrderBook else _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart); } - } + } + + private SequenceNumberResult ValidateBufferSequenceNumber(long startSequenceNumber, long endSequenceNumber) + { + if (endSequenceNumber <= LastSequenceNumber) + // Buffered update is from before the snapshot, ignore + return SequenceNumberResult.Skip; + + if (_sequencesAreConsecutive && startSequenceNumber != LastSequenceNumber + 1) + { + if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet) + // Buffered update is not the next sequence number when it was expected + return SequenceNumberResult.OutOfSync; + } + + // Buffered sequence number is larger than the last sequence number + return SequenceNumberResult.Ok; + } + + private SequenceNumberResult ValidateLiveSequenceNumber(long sequenceNumber) + { + if (sequenceNumber < LastSequenceNumber) + return SequenceNumberResult.OutOfSync; + + if (_sequencesAreConsecutive + && LastSequenceNumber != 0 + && sequenceNumber != LastSequenceNumber + 1) + { + if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet) + return SequenceNumberResult.OutOfSync; + } + + return SequenceNumberResult.Ok; + } } internal class DescComparer : IComparer