From 3b2a85d210cfebac57593d2c4a9acca19cf3b194 Mon Sep 17 00:00:00 2001 From: Jan Korf Date: Mon, 12 Jan 2026 14:26:50 +0100 Subject: [PATCH] Feature/websocket sequencing (#267) Added EnforceSequenceNumbers property on SocketApiClient to configure whether websocket message contain sequence numbers and if these should be checked to be sequential Added fallback to existing websocket connection if no dedicated request connection was found Added IntBoolConverter base class for arbitrary int value to bool mapping Added SequenceNumber property to DataEvent object Added _skipSequenceCheckFirstUpdateAfterSnapshotSet property for SymbolOrderBook implementations Updated SymbolOrderBook sequenceNumber validation Updated SymbolOrderBook log verbosities Renamed SetInitialOrderBook to SetSnapshot in SymbolOrderBook Renamed updateId references to sequenceNumber in SymbolOrderBook --- CryptoExchange.Net/Clients/SocketApiClient.cs | 8 + .../SystemTextJson/IntBoolConverter.cs | 36 ++++ .../SymbolOrderBookLoggingExtensions.cs | 24 ++- .../Objects/Sockets/DataEvent.cs | 14 ++ ...ProcessQueueItem.cs => OrderBookUpdate.cs} | 14 +- .../OrderBook/SymbolOrderBook.cs | 202 ++++++++++++------ .../Sockets/Default/SocketConnection.cs | 24 ++- 7 files changed, 237 insertions(+), 85 deletions(-) create mode 100644 CryptoExchange.Net/Converters/SystemTextJson/IntBoolConverter.cs rename CryptoExchange.Net/OrderBook/{ProcessQueueItem.cs => OrderBookUpdate.cs} (71%) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 6fd7ea1..b2ef32e 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -142,6 +142,10 @@ namespace CryptoExchange.Net.Clients /// public int? MaxIndividualSubscriptionsPerConnection { get; set; } + /// + /// Whether or not to enforce that sequence number updates are always (lastSequenceNumber + 1) + /// + public bool EnforceSequenceNumbers { get; set; } #endregion /// @@ -706,6 +710,10 @@ namespace CryptoExchange.Net.Clients if (connection != null && !connection.DedicatedRequestConnection.Authenticated) // Mark dedicated request connection as authenticated if the request is authenticated connection.DedicatedRequestConnection.Authenticated = authenticated; + + if (connection == null) + // Fall back to an existing connection if there is no dedicated request connection available + connection = socketQuery.OrderBy(s => s.UserSubscriptionCount).FirstOrDefault(); } bool maxConnectionsReached = _socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections); diff --git a/CryptoExchange.Net/Converters/SystemTextJson/IntBoolConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/IntBoolConverter.cs new file mode 100644 index 0000000..82f2b6f --- /dev/null +++ b/CryptoExchange.Net/Converters/SystemTextJson/IntBoolConverter.cs @@ -0,0 +1,36 @@ +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace CryptoExchange.Net.Converters.SystemTextJson +{ + /// + /// Bool converter + /// + public class IntBoolConverter : JsonConverter + { + private readonly int _trueValue; + + /// + /// ctor + /// + /// The int value representing the true value + public IntBoolConverter(int trueValue) + { + _trueValue = trueValue; + } + + public override bool Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.Number) + return false; + + return reader.GetDecimal() == _trueValue; + } + + public override void Write(Utf8JsonWriter writer, bool value, JsonSerializerOptions options) + { + writer.WriteNumberValue(_trueValue); + } + } +} diff --git a/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs b/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs index 2997974..379cd62 100644 --- a/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs +++ b/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs @@ -22,7 +22,7 @@ namespace CryptoExchange.Net.Logging.Extensions private static readonly Action _orderBookResyncing; private static readonly Action _orderBookResynced; private static readonly Action _orderBookMessageSkippedBecauseOfResubscribing; - private static readonly Action _orderBookDataSet; + private static readonly Action _orderBookDataSet; private static readonly Action _orderBookUpdateBuffered; private static readonly Action _orderBookOutOfSyncDetected; private static readonly Action _orderBookReconnectingSocket; @@ -30,6 +30,7 @@ namespace CryptoExchange.Net.Logging.Extensions private static readonly Action _orderBookProcessedMessage; private static readonly Action _orderBookProcessedMessageSingle; private static readonly Action _orderBookOutOfSync; + private static readonly Action _orderBookUpdateSkippedStartEnd; static SymbolOrderBookLoggingExtensions() { @@ -74,7 +75,7 @@ namespace CryptoExchange.Net.Logging.Extensions "{Api} order book {Symbol} Processing {NumberBufferedUpdated} buffered updates"); _orderBookUpdateSkipped = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(5008, "OrderBookUpdateSkipped"), "{Api} order book {Symbol} update skipped #{SequenceNumber}, currently at #{LastSequenceNumber}"); @@ -93,10 +94,10 @@ namespace CryptoExchange.Net.Logging.Extensions new EventId(5011, "OrderBookMessageSkippedResubscribing"), "{Api} order book {Symbol} Skipping message because of resubscribing"); - _orderBookDataSet = LoggerMessage.Define( - LogLevel.Debug, + _orderBookDataSet = LoggerMessage.Define( + LogLevel.Trace, new EventId(5012, "OrderBookDataSet"), - "{Api} order book {Symbol} data set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}"); + "{Api} order book {Symbol} snapshot set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}"); _orderBookUpdateBuffered = LoggerMessage.Define( LogLevel.Trace, @@ -142,6 +143,12 @@ namespace CryptoExchange.Net.Logging.Extensions LogLevel.Trace, new EventId(5021, "OrderBookProcessedMessage"), "{Api} order book {Symbol} update processed #{UpdateId}"); + + _orderBookUpdateSkippedStartEnd = LoggerMessage.Define( + LogLevel.Trace, + new EventId(5022, "OrderBookUpdateSkippedStartEnd"), + "{Api} order book {Symbol} update skipped #{SequenceStart}-#{SequenceEnd}, currently at #{LastSequenceNumber}"); + } public static void OrderBookStatusChanged(this ILogger logger, string api, string symbol, OrderBookStatus previousStatus, OrderBookStatus newStatus) @@ -200,7 +207,7 @@ namespace CryptoExchange.Net.Logging.Extensions { _orderBookMessageSkippedBecauseOfResubscribing(logger, api, symbol, null); } - public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long endUpdateId) + public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long? endUpdateId) { _orderBookDataSet(logger, api, symbol, bidCount, askCount, endUpdateId, null); } @@ -243,5 +250,10 @@ namespace CryptoExchange.Net.Logging.Extensions { _orderBookOutOfSyncChecksum(logger, api, symbol, null); } + + public static void OrderBookUpdateSkipped(this ILogger logger, string api, string symbol, long sequenceStart, long sequenceEnd, long lastSequenceNumber) + { + _orderBookUpdateSkippedStartEnd(logger, api, symbol, sequenceStart, sequenceEnd, lastSequenceNumber, null); + } } } diff --git a/CryptoExchange.Net/Objects/Sockets/DataEvent.cs b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs index cacf430..88a7d71 100644 --- a/CryptoExchange.Net/Objects/Sockets/DataEvent.cs +++ b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs @@ -53,6 +53,11 @@ namespace CryptoExchange.Net.Objects.Sockets /// public SocketUpdateType? UpdateType { get; set; } + /// + /// Sequence number of the update + /// + public long? SequenceNumber { get; set; } + /// /// ctor /// @@ -126,6 +131,15 @@ namespace CryptoExchange.Net.Objects.Sockets return this; } + /// + /// Specify the sequence number of the update + /// + public DataEvent WithSequenceNumber(long? sequenceNumber) + { + SequenceNumber = sequenceNumber; + return this; + } + /// /// Specify the data timestamp /// diff --git a/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs b/CryptoExchange.Net/OrderBook/OrderBookUpdate.cs similarity index 71% rename from CryptoExchange.Net/OrderBook/ProcessQueueItem.cs rename to CryptoExchange.Net/OrderBook/OrderBookUpdate.cs index 290d492..f8fae79 100644 --- a/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs +++ b/CryptoExchange.Net/OrderBook/OrderBookUpdate.cs @@ -3,28 +3,28 @@ using System; namespace CryptoExchange.Net.OrderBook { - internal class ProcessQueueItem + internal class OrderBookUpdate { public DateTime? LocalDataTime { get; set; } public DateTime? ServerDataTime { get; set; } - public long StartUpdateId { get; set; } - public long EndUpdateId { get; set; } + public long StartSequenceNumber { get; set; } + public long EndSequenceNumber { get; set; } public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty(); } - internal class InitialOrderBookItem + internal class OrderBookSnapshot { public DateTime? LocalDataTime { get; set; } public DateTime? ServerDataTime { get; set; } - public long StartUpdateId { get; set; } - public long EndUpdateId { get; set; } + public long? SequenceNumber { get; set; } public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty(); } - internal class ChecksumItem + internal class OrderBookChecksum { + public long? SequenceNumber { get; set; } public int Checksum { get; set; } } } diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index a3cf80a..3106381 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -39,6 +39,7 @@ namespace CryptoExchange.Net.OrderBook private readonly AsyncResetEvent _queueEvent; private readonly ConcurrentQueue _processQueue; private bool _validateChecksum; + private bool _firstUpdateAfterSnapshotDone; private class EmptySymbolOrderBookEntry : ISymbolOrderBookEntry { @@ -50,6 +51,13 @@ namespace CryptoExchange.Net.OrderBook private static readonly ISymbolOrderBookEntry _emptySymbolOrderBookEntry = new EmptySymbolOrderBookEntry(); + private enum SequenceNumberResult + { + Skip, + Ok, + OutOfSync + } + /// /// A buffer to store messages received before the initial book snapshot is processed. These messages /// will be processed after the book snapshot is set. Any messages in this buffer with sequence numbers lower @@ -77,7 +85,12 @@ namespace CryptoExchange.Net.OrderBook /// the book will resynchronize as it is deemed out of sync /// protected bool _sequencesAreConsecutive; - + + /// + /// Whether the first update message after a snapshot may have overlapping sequence numbers instead of the snapshot sequence number + 1 + /// + protected bool _skipSequenceCheckFirstUpdateAfterSnapshotSet; + /// /// Whether levels should be strictly enforced. For example, when an order book has 25 levels and a new update comes in which pushes /// the current level 25 ask out of the top 25, should the level 26 entry be removed from the book or does the server handle this @@ -267,6 +280,7 @@ namespace CryptoExchange.Net.OrderBook _processBuffer.Clear(); _bookSet = false; + _firstUpdateAfterSnapshotDone = false; Status = OrderBookStatus.Connecting; _processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning); @@ -419,28 +433,27 @@ namespace CryptoExchange.Net.OrderBook protected virtual bool DoChecksum(int checksum) => true; /// - /// Set the initial data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot - /// received from a socket subscription + /// Set snapshot data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot + /// received from a socket subscription. Will clear any previous data. /// /// The last update sequence number until which the snapshot is in sync /// List of asks /// List of bids /// Server data timestamp /// local data timestamp - protected void SetInitialOrderBook( - long orderBookSequenceNumber, + protected void SetSnapshot( + long? orderBookSequenceNumber, ISymbolOrderBookEntry[] bidList, ISymbolOrderBookEntry[] askList, DateTime? serverDataTime = null, DateTime? localDataTime = null) { _processQueue.Enqueue( - new InitialOrderBookItem + new OrderBookSnapshot { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, - StartUpdateId = orderBookSequenceNumber, - EndUpdateId = orderBookSequenceNumber, + SequenceNumber = orderBookSequenceNumber, Asks = askList, Bids = bidList }); @@ -450,25 +463,25 @@ namespace CryptoExchange.Net.OrderBook /// /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with an update number which should be higher than the previous update numbers /// - /// The sequence number + /// The sequence number /// List of updated/new bids /// List of updated/new asks /// Server data timestamp /// local data timestamp protected void UpdateOrderBook( - long updateId, + long sequenceNumber, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks, DateTime? serverDataTime = null, DateTime? localDataTime = null) { _processQueue.Enqueue( - new ProcessQueueItem + new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, - StartUpdateId = updateId, - EndUpdateId = updateId, + StartSequenceNumber = sequenceNumber, + EndSequenceNumber = sequenceNumber, Asks = asks, Bids = bids }); @@ -478,27 +491,27 @@ namespace CryptoExchange.Net.OrderBook /// /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with the first and last sequence number in the update /// - /// The sequence number of the first update - /// The sequence number of the last update + /// The sequence number of the first update + /// The sequence number of the last update /// List of updated/new bids /// List of updated/new asks /// Server data timestamp /// local data timestamp protected void UpdateOrderBook( - long firstUpdateId, - long lastUpdateId, + long firstSequenceNumber, + long lastSequenceNumber, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks, DateTime? serverDataTime = null, DateTime? localDataTime = null) { _processQueue.Enqueue( - new ProcessQueueItem + new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, - StartUpdateId = firstUpdateId, - EndUpdateId = lastUpdateId, + StartSequenceNumber = firstSequenceNumber, + EndSequenceNumber = lastSequenceNumber, Asks = asks, Bids = bids }); @@ -522,12 +535,12 @@ namespace CryptoExchange.Net.OrderBook var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue); _processQueue.Enqueue( - new ProcessQueueItem + new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, - StartUpdateId = lowest, - EndUpdateId = highest, + StartSequenceNumber = lowest, + EndSequenceNumber = highest, Asks = asks, Bids = bids }); @@ -538,9 +551,10 @@ namespace CryptoExchange.Net.OrderBook /// Add a checksum value to the process queue /// /// The checksum value - protected void AddChecksum(int checksum) + /// The sequence number of the message if it's a separate message with separate number + protected void AddChecksum(int checksum, long? sequenceNumber = null) { - _processQueue.Enqueue(new ChecksumItem() { Checksum = checksum }); + _processQueue.Enqueue(new OrderBookChecksum() { Checksum = checksum, SequenceNumber = sequenceNumber }); _queueEvent.Set(); } @@ -553,7 +567,12 @@ namespace CryptoExchange.Net.OrderBook _logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count); foreach (var bufferEntry in _processBuffer) - ProcessRangeUpdates(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks); + { + if (_stopProcessing) + break; + + ProcessUpdate(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks, true); + } _processBuffer.Clear(); } @@ -561,26 +580,10 @@ namespace CryptoExchange.Net.OrderBook /// /// Update order book with an entry /// - /// Sequence number of the update /// Type of entry /// The entry - protected virtual bool ProcessUpdate(long sequence, OrderBookEntryType type, ISymbolOrderBookEntry entry) + protected virtual bool UpdateValue(OrderBookEntryType type, ISymbolOrderBookEntry entry) { - if (sequence <= LastSequenceNumber) - { - _logger.OrderBookSkippedMessage(Api, Symbol, sequence, LastSequenceNumber); - return false; - } - - if (_sequencesAreConsecutive && sequence > LastSequenceNumber + 1) - { - // Out of sync - _logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, sequence); - _stopProcessing = true; - Resubscribe(); - return false; - } - UpdateTime = DateTime.UtcNow; var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids; if (entry.Quantity == 0) @@ -735,8 +738,11 @@ namespace CryptoExchange.Net.OrderBook // Clear queue while (_processQueue.TryDequeue(out _)) { } + LastSequenceNumber = 0; _processBuffer.Clear(); _bookSet = false; + _firstUpdateAfterSnapshotDone = false; + DoReset(); } @@ -774,17 +780,17 @@ namespace CryptoExchange.Net.OrderBook continue; } - if (item is InitialOrderBookItem iobi) - ProcessInitialOrderBookItem(iobi); - if (item is ProcessQueueItem pqi) - ProcessQueueItem(pqi); - else if (item is ChecksumItem ci) - ProcessChecksum(ci); + if (item is OrderBookSnapshot snapshot) + ProcessOrderBookSnapshot(snapshot); + if (item is OrderBookUpdate update) + ProcessQueueItem(update); + else if (item is OrderBookChecksum checksum) + ProcessChecksum(checksum); } } } - private void ProcessInitialOrderBookItem(InitialOrderBookItem item) + private void ProcessOrderBookSnapshot(OrderBookSnapshot item) { lock (_bookLock) { @@ -796,7 +802,8 @@ namespace CryptoExchange.Net.OrderBook foreach (var bid in item.Bids) _bids.Add(bid.Price, bid); - LastSequenceNumber = item.EndUpdateId; + if (item.SequenceNumber != null) + LastSequenceNumber = item.SequenceNumber.Value; AskCount = _asks.Count; BidCount = _bids.Count; @@ -805,14 +812,15 @@ namespace CryptoExchange.Net.OrderBook UpdateServerTime = item.ServerDataTime; UpdateLocalTime = item.LocalDataTime; - _logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.EndUpdateId); + _logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.SequenceNumber); CheckProcessBuffer(); + OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray())); OnBestOffersChanged?.Invoke((BestBid, BestAsk)); } } - private void ProcessQueueItem(ProcessQueueItem item) + private void ProcessQueueItem(OrderBookUpdate item) { lock (_bookLock) { @@ -822,19 +830,19 @@ namespace CryptoExchange.Net.OrderBook { Asks = item.Asks, Bids = item.Bids, - FirstUpdateId = item.StartUpdateId, - LastUpdateId = item.EndUpdateId, + FirstUpdateId = item.StartSequenceNumber, + LastUpdateId = item.EndSequenceNumber, }); if (_logger.IsEnabled(LogLevel.Trace)) - _logger.OrderBookUpdateBuffered(Api, Symbol, item.StartUpdateId, item.EndUpdateId, item.Asks.Length, item.Bids.Length); + _logger.OrderBookUpdateBuffered(Api, Symbol, item.StartSequenceNumber, item.EndSequenceNumber, item.Asks.Length, item.Bids.Length); } else { CheckProcessBuffer(); var (prevBestBid, prevBestAsk) = BestOffers; - ProcessRangeUpdates(item.StartUpdateId, item.EndUpdateId, item.Bids, item.Asks); + ProcessUpdate(item.StartSequenceNumber, item.EndSequenceNumber, item.Bids, item.Asks, false); if (_asks.Count == 0 || _bids.Count == 0) return; @@ -856,7 +864,7 @@ namespace CryptoExchange.Net.OrderBook } } - private void ProcessChecksum(ChecksumItem ci) + private void ProcessChecksum(OrderBookChecksum ci) { lock (_bookLock) { @@ -876,6 +884,9 @@ namespace CryptoExchange.Net.OrderBook throw; } + if (ci.SequenceNumber != null) + LastSequenceNumber = ci.SequenceNumber.Value; + if (!checksumResult) { _logger.OrderBookOutOfSyncChecksum(Api, Symbol); @@ -913,23 +924,37 @@ namespace CryptoExchange.Net.OrderBook }); } - private void ProcessRangeUpdates( - long firstUpdateId, - long lastUpdateId, + private void ProcessUpdate( + long updateSequenceNumberStart, + long updateSequenceNumberEnd, IEnumerable bids, - IEnumerable asks) + IEnumerable asks, + bool fromBuffer) { - if (lastUpdateId <= LastSequenceNumber) + var sequenceResult = fromBuffer ? ValidateBufferSequenceNumber(updateSequenceNumberStart, updateSequenceNumberEnd) : ValidateLiveSequenceNumber(updateSequenceNumberStart); + if (sequenceResult == SequenceNumberResult.Skip) { - _logger.OrderBookUpdateSkipped(Api, Symbol, lastUpdateId, LastSequenceNumber); + if (updateSequenceNumberStart != updateSequenceNumberEnd) + _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd, LastSequenceNumber); + else + _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, LastSequenceNumber); + + return; + } + + if (sequenceResult == SequenceNumberResult.OutOfSync) + { + _logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberStart); + _stopProcessing = true; + Resubscribe(); return; } foreach (var entry in bids) - ProcessUpdate(LastSequenceNumber + 1, OrderBookEntryType.Bid, entry); + UpdateValue(OrderBookEntryType.Bid, entry); foreach (var entry in asks) - ProcessUpdate(LastSequenceNumber + 1, OrderBookEntryType.Ask, entry); + UpdateValue(OrderBookEntryType.Ask, entry); if (Levels.HasValue && _strictLevels) { @@ -946,16 +971,51 @@ namespace CryptoExchange.Net.OrderBook } } - LastSequenceNumber = lastUpdateId; + _firstUpdateAfterSnapshotDone = true; + LastSequenceNumber = updateSequenceNumberEnd; if (_logger.IsEnabled(LogLevel.Trace)) { - if (firstUpdateId != lastUpdateId) - _logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId, lastUpdateId); + if (updateSequenceNumberStart != updateSequenceNumberEnd) + _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd); else - _logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId); + _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart); } - } + } + + private SequenceNumberResult ValidateBufferSequenceNumber(long startSequenceNumber, long endSequenceNumber) + { + if (endSequenceNumber <= LastSequenceNumber) + // Buffered update is from before the snapshot, ignore + return SequenceNumberResult.Skip; + + if (_sequencesAreConsecutive && startSequenceNumber != LastSequenceNumber + 1) + { + if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet) + // Buffered update is not the next sequence number when it was expected to be + return SequenceNumberResult.OutOfSync; + } + + // Buffered sequence number is larger than the last sequence number + return SequenceNumberResult.Ok; + } + + private SequenceNumberResult ValidateLiveSequenceNumber(long sequenceNumber) + { + if (sequenceNumber < LastSequenceNumber) + // Update is somehow from before the current state + return SequenceNumberResult.OutOfSync; + + if (_sequencesAreConsecutive + && LastSequenceNumber != 0 + && sequenceNumber != LastSequenceNumber + 1) + { + if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet) + return SequenceNumberResult.OutOfSync; + } + + return SequenceNumberResult.Ok; + } } internal class DescComparer : IComparer diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index 41855ea..7ed852f 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -257,6 +257,7 @@ namespace CryptoExchange.Net.Sockets.Default } } + private bool _pausedActivity; #if NET9_0_OR_GREATER private readonly Lock _listenersLock = new Lock(); @@ -274,6 +275,8 @@ namespace CryptoExchange.Net.Sockets.Default private ISocketMessageHandler? _byteMessageConverter; private ISocketMessageHandler? _textMessageConverter; + private long _lastSequenceNumber; + /// /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary. /// @@ -293,7 +296,7 @@ namespace CryptoExchange.Net.Sockets.Default /// Cache for deserialization, only caches for a single message /// private readonly Dictionary _deserializationCache = new Dictionary(); - + /// /// New socket connection /// @@ -340,6 +343,7 @@ namespace CryptoExchange.Net.Sockets.Default { Status = SocketStatus.Closed; Authenticated = false; + _lastSequenceNumber = 0; if (ApiClient._socketConnections.ContainsKey(SocketId)) ApiClient._socketConnections.TryRemove(SocketId, out _); @@ -371,6 +375,7 @@ namespace CryptoExchange.Net.Sockets.Default Status = SocketStatus.Reconnecting; DisconnectTime = DateTime.UtcNow; Authenticated = false; + _lastSequenceNumber = 0; lock (_listenersLock) { @@ -1280,6 +1285,23 @@ namespace CryptoExchange.Net.Sockets.Default return result; } + /// + /// Update the sequence number for this connection + /// + public void UpdateSequenceNumber(long sequenceNumber) + { + if (ApiClient.EnforceSequenceNumbers + && _lastSequenceNumber != 0 + && _lastSequenceNumber + 1 != sequenceNumber) + { + // Not sequential + _logger.LogWarning("[Sckt {SocketId}] update not in sequence. Last recorded sequence number: {LastSequence}, update sequence number: {UpdateSequence}. Reconnecting", SocketId, _lastSequenceNumber, sequenceNumber); + _ = TriggerReconnectAsync(); + } + + _lastSequenceNumber = sequenceNumber; + } + /// /// Periodically sends data over a socket connection ///