diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 6fd7ea1..b2ef32e 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -142,6 +142,10 @@ namespace CryptoExchange.Net.Clients /// public int? MaxIndividualSubscriptionsPerConnection { get; set; } + /// + /// Whether or not to enforce that sequence number updates are always (lastSequenceNumber + 1) + /// + public bool EnforceSequenceNumbers { get; set; } #endregion /// @@ -706,6 +710,10 @@ namespace CryptoExchange.Net.Clients if (connection != null && !connection.DedicatedRequestConnection.Authenticated) // Mark dedicated request connection as authenticated if the request is authenticated connection.DedicatedRequestConnection.Authenticated = authenticated; + + if (connection == null) + // Fall back to an existing connection if there is no dedicated request connection available + connection = socketQuery.OrderBy(s => s.UserSubscriptionCount).FirstOrDefault(); } bool maxConnectionsReached = _socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections); diff --git a/CryptoExchange.Net/Converters/SystemTextJson/IntBoolConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/IntBoolConverter.cs new file mode 100644 index 0000000..82f2b6f --- /dev/null +++ b/CryptoExchange.Net/Converters/SystemTextJson/IntBoolConverter.cs @@ -0,0 +1,36 @@ +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace CryptoExchange.Net.Converters.SystemTextJson +{ + /// + /// Bool converter + /// + public class IntBoolConverter : JsonConverter + { + private readonly int _trueValue; + + /// + /// ctor + /// + /// The int value representing the true value + public IntBoolConverter(int trueValue) + { + _trueValue = trueValue; + } + + public override bool Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.Number) + return false; + + return reader.GetDecimal() == _trueValue; + } + + public override void Write(Utf8JsonWriter writer, bool value, JsonSerializerOptions options) + { + writer.WriteNumberValue(_trueValue); + } + } +} diff --git a/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs b/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs index 2997974..379cd62 100644 --- a/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs +++ b/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs @@ -22,7 +22,7 @@ namespace CryptoExchange.Net.Logging.Extensions private static readonly Action _orderBookResyncing; private static readonly Action _orderBookResynced; private static readonly Action _orderBookMessageSkippedBecauseOfResubscribing; - private static readonly Action _orderBookDataSet; + private static readonly Action _orderBookDataSet; private static readonly Action _orderBookUpdateBuffered; private static readonly Action _orderBookOutOfSyncDetected; private static readonly Action _orderBookReconnectingSocket; @@ -30,6 +30,7 @@ namespace CryptoExchange.Net.Logging.Extensions private static readonly Action _orderBookProcessedMessage; private static readonly Action _orderBookProcessedMessageSingle; private static readonly Action _orderBookOutOfSync; + private static readonly Action _orderBookUpdateSkippedStartEnd; static SymbolOrderBookLoggingExtensions() { @@ -74,7 +75,7 @@ namespace CryptoExchange.Net.Logging.Extensions "{Api} order book {Symbol} Processing {NumberBufferedUpdated} buffered updates"); _orderBookUpdateSkipped = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(5008, "OrderBookUpdateSkipped"), "{Api} order book {Symbol} update skipped #{SequenceNumber}, currently at #{LastSequenceNumber}"); @@ -93,10 +94,10 @@ namespace CryptoExchange.Net.Logging.Extensions new EventId(5011, "OrderBookMessageSkippedResubscribing"), "{Api} order book {Symbol} Skipping message because of resubscribing"); - _orderBookDataSet = LoggerMessage.Define( - LogLevel.Debug, + _orderBookDataSet = LoggerMessage.Define( + LogLevel.Trace, new EventId(5012, "OrderBookDataSet"), - "{Api} order book {Symbol} data set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}"); + "{Api} order book {Symbol} snapshot set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}"); _orderBookUpdateBuffered = LoggerMessage.Define( LogLevel.Trace, @@ -142,6 +143,12 @@ namespace CryptoExchange.Net.Logging.Extensions LogLevel.Trace, new EventId(5021, "OrderBookProcessedMessage"), "{Api} order book {Symbol} update processed #{UpdateId}"); + + _orderBookUpdateSkippedStartEnd = LoggerMessage.Define( + LogLevel.Trace, + new EventId(5022, "OrderBookUpdateSkippedStartEnd"), + "{Api} order book {Symbol} update skipped #{SequenceStart}-#{SequenceEnd}, currently at #{LastSequenceNumber}"); + } public static void OrderBookStatusChanged(this ILogger logger, string api, string symbol, OrderBookStatus previousStatus, OrderBookStatus newStatus) @@ -200,7 +207,7 @@ namespace CryptoExchange.Net.Logging.Extensions { _orderBookMessageSkippedBecauseOfResubscribing(logger, api, symbol, null); } - public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long endUpdateId) + public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long? endUpdateId) { _orderBookDataSet(logger, api, symbol, bidCount, askCount, endUpdateId, null); } @@ -243,5 +250,10 @@ namespace CryptoExchange.Net.Logging.Extensions { _orderBookOutOfSyncChecksum(logger, api, symbol, null); } + + public static void OrderBookUpdateSkipped(this ILogger logger, string api, string symbol, long sequenceStart, long sequenceEnd, long lastSequenceNumber) + { + _orderBookUpdateSkippedStartEnd(logger, api, symbol, sequenceStart, sequenceEnd, lastSequenceNumber, null); + } } } diff --git a/CryptoExchange.Net/Objects/Sockets/DataEvent.cs b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs index cacf430..88a7d71 100644 --- a/CryptoExchange.Net/Objects/Sockets/DataEvent.cs +++ b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs @@ -53,6 +53,11 @@ namespace CryptoExchange.Net.Objects.Sockets /// public SocketUpdateType? UpdateType { get; set; } + /// + /// Sequence number of the update + /// + public long? SequenceNumber { get; set; } + /// /// ctor /// @@ -126,6 +131,15 @@ namespace CryptoExchange.Net.Objects.Sockets return this; } + /// + /// Specify the sequence number of the update + /// + public DataEvent WithSequenceNumber(long? sequenceNumber) + { + SequenceNumber = sequenceNumber; + return this; + } + /// /// Specify the data timestamp /// diff --git a/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs b/CryptoExchange.Net/OrderBook/OrderBookUpdate.cs similarity index 71% rename from CryptoExchange.Net/OrderBook/ProcessQueueItem.cs rename to CryptoExchange.Net/OrderBook/OrderBookUpdate.cs index 290d492..f8fae79 100644 --- a/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs +++ b/CryptoExchange.Net/OrderBook/OrderBookUpdate.cs @@ -3,28 +3,28 @@ using System; namespace CryptoExchange.Net.OrderBook { - internal class ProcessQueueItem + internal class OrderBookUpdate { public DateTime? LocalDataTime { get; set; } public DateTime? ServerDataTime { get; set; } - public long StartUpdateId { get; set; } - public long EndUpdateId { get; set; } + public long StartSequenceNumber { get; set; } + public long EndSequenceNumber { get; set; } public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty(); } - internal class InitialOrderBookItem + internal class OrderBookSnapshot { public DateTime? LocalDataTime { get; set; } public DateTime? ServerDataTime { get; set; } - public long StartUpdateId { get; set; } - public long EndUpdateId { get; set; } + public long? SequenceNumber { get; set; } public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty(); } - internal class ChecksumItem + internal class OrderBookChecksum { + public long? SequenceNumber { get; set; } public int Checksum { get; set; } } } diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index a3cf80a..3106381 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -39,6 +39,7 @@ namespace CryptoExchange.Net.OrderBook private readonly AsyncResetEvent _queueEvent; private readonly ConcurrentQueue _processQueue; private bool _validateChecksum; + private bool _firstUpdateAfterSnapshotDone; private class EmptySymbolOrderBookEntry : ISymbolOrderBookEntry { @@ -50,6 +51,13 @@ namespace CryptoExchange.Net.OrderBook private static readonly ISymbolOrderBookEntry _emptySymbolOrderBookEntry = new EmptySymbolOrderBookEntry(); + private enum SequenceNumberResult + { + Skip, + Ok, + OutOfSync + } + /// /// A buffer to store messages received before the initial book snapshot is processed. These messages /// will be processed after the book snapshot is set. Any messages in this buffer with sequence numbers lower @@ -77,7 +85,12 @@ namespace CryptoExchange.Net.OrderBook /// the book will resynchronize as it is deemed out of sync /// protected bool _sequencesAreConsecutive; - + + /// + /// Whether the first update message after a snapshot may have overlapping sequence numbers instead of the snapshot sequence number + 1 + /// + protected bool _skipSequenceCheckFirstUpdateAfterSnapshotSet; + /// /// Whether levels should be strictly enforced. For example, when an order book has 25 levels and a new update comes in which pushes /// the current level 25 ask out of the top 25, should the level 26 entry be removed from the book or does the server handle this @@ -267,6 +280,7 @@ namespace CryptoExchange.Net.OrderBook _processBuffer.Clear(); _bookSet = false; + _firstUpdateAfterSnapshotDone = false; Status = OrderBookStatus.Connecting; _processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning); @@ -419,28 +433,27 @@ namespace CryptoExchange.Net.OrderBook protected virtual bool DoChecksum(int checksum) => true; /// - /// Set the initial data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot - /// received from a socket subscription + /// Set snapshot data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot + /// received from a socket subscription. Will clear any previous data. /// /// The last update sequence number until which the snapshot is in sync /// List of asks /// List of bids /// Server data timestamp /// local data timestamp - protected void SetInitialOrderBook( - long orderBookSequenceNumber, + protected void SetSnapshot( + long? orderBookSequenceNumber, ISymbolOrderBookEntry[] bidList, ISymbolOrderBookEntry[] askList, DateTime? serverDataTime = null, DateTime? localDataTime = null) { _processQueue.Enqueue( - new InitialOrderBookItem + new OrderBookSnapshot { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, - StartUpdateId = orderBookSequenceNumber, - EndUpdateId = orderBookSequenceNumber, + SequenceNumber = orderBookSequenceNumber, Asks = askList, Bids = bidList }); @@ -450,25 +463,25 @@ namespace CryptoExchange.Net.OrderBook /// /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with an update number which should be higher than the previous update numbers /// - /// The sequence number + /// The sequence number /// List of updated/new bids /// List of updated/new asks /// Server data timestamp /// local data timestamp protected void UpdateOrderBook( - long updateId, + long sequenceNumber, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks, DateTime? serverDataTime = null, DateTime? localDataTime = null) { _processQueue.Enqueue( - new ProcessQueueItem + new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, - StartUpdateId = updateId, - EndUpdateId = updateId, + StartSequenceNumber = sequenceNumber, + EndSequenceNumber = sequenceNumber, Asks = asks, Bids = bids }); @@ -478,27 +491,27 @@ namespace CryptoExchange.Net.OrderBook /// /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with the first and last sequence number in the update /// - /// The sequence number of the first update - /// The sequence number of the last update + /// The sequence number of the first update + /// The sequence number of the last update /// List of updated/new bids /// List of updated/new asks /// Server data timestamp /// local data timestamp protected void UpdateOrderBook( - long firstUpdateId, - long lastUpdateId, + long firstSequenceNumber, + long lastSequenceNumber, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks, DateTime? serverDataTime = null, DateTime? localDataTime = null) { _processQueue.Enqueue( - new ProcessQueueItem + new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, - StartUpdateId = firstUpdateId, - EndUpdateId = lastUpdateId, + StartSequenceNumber = firstSequenceNumber, + EndSequenceNumber = lastSequenceNumber, Asks = asks, Bids = bids }); @@ -522,12 +535,12 @@ namespace CryptoExchange.Net.OrderBook var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue); _processQueue.Enqueue( - new ProcessQueueItem + new OrderBookUpdate { LocalDataTime = localDataTime, ServerDataTime = serverDataTime, - StartUpdateId = lowest, - EndUpdateId = highest, + StartSequenceNumber = lowest, + EndSequenceNumber = highest, Asks = asks, Bids = bids }); @@ -538,9 +551,10 @@ namespace CryptoExchange.Net.OrderBook /// Add a checksum value to the process queue /// /// The checksum value - protected void AddChecksum(int checksum) + /// The sequence number of the message if it's a separate message with separate number + protected void AddChecksum(int checksum, long? sequenceNumber = null) { - _processQueue.Enqueue(new ChecksumItem() { Checksum = checksum }); + _processQueue.Enqueue(new OrderBookChecksum() { Checksum = checksum, SequenceNumber = sequenceNumber }); _queueEvent.Set(); } @@ -553,7 +567,12 @@ namespace CryptoExchange.Net.OrderBook _logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count); foreach (var bufferEntry in _processBuffer) - ProcessRangeUpdates(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks); + { + if (_stopProcessing) + break; + + ProcessUpdate(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks, true); + } _processBuffer.Clear(); } @@ -561,26 +580,10 @@ namespace CryptoExchange.Net.OrderBook /// /// Update order book with an entry /// - /// Sequence number of the update /// Type of entry /// The entry - protected virtual bool ProcessUpdate(long sequence, OrderBookEntryType type, ISymbolOrderBookEntry entry) + protected virtual bool UpdateValue(OrderBookEntryType type, ISymbolOrderBookEntry entry) { - if (sequence <= LastSequenceNumber) - { - _logger.OrderBookSkippedMessage(Api, Symbol, sequence, LastSequenceNumber); - return false; - } - - if (_sequencesAreConsecutive && sequence > LastSequenceNumber + 1) - { - // Out of sync - _logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, sequence); - _stopProcessing = true; - Resubscribe(); - return false; - } - UpdateTime = DateTime.UtcNow; var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids; if (entry.Quantity == 0) @@ -735,8 +738,11 @@ namespace CryptoExchange.Net.OrderBook // Clear queue while (_processQueue.TryDequeue(out _)) { } + LastSequenceNumber = 0; _processBuffer.Clear(); _bookSet = false; + _firstUpdateAfterSnapshotDone = false; + DoReset(); } @@ -774,17 +780,17 @@ namespace CryptoExchange.Net.OrderBook continue; } - if (item is InitialOrderBookItem iobi) - ProcessInitialOrderBookItem(iobi); - if (item is ProcessQueueItem pqi) - ProcessQueueItem(pqi); - else if (item is ChecksumItem ci) - ProcessChecksum(ci); + if (item is OrderBookSnapshot snapshot) + ProcessOrderBookSnapshot(snapshot); + if (item is OrderBookUpdate update) + ProcessQueueItem(update); + else if (item is OrderBookChecksum checksum) + ProcessChecksum(checksum); } } } - private void ProcessInitialOrderBookItem(InitialOrderBookItem item) + private void ProcessOrderBookSnapshot(OrderBookSnapshot item) { lock (_bookLock) { @@ -796,7 +802,8 @@ namespace CryptoExchange.Net.OrderBook foreach (var bid in item.Bids) _bids.Add(bid.Price, bid); - LastSequenceNumber = item.EndUpdateId; + if (item.SequenceNumber != null) + LastSequenceNumber = item.SequenceNumber.Value; AskCount = _asks.Count; BidCount = _bids.Count; @@ -805,14 +812,15 @@ namespace CryptoExchange.Net.OrderBook UpdateServerTime = item.ServerDataTime; UpdateLocalTime = item.LocalDataTime; - _logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.EndUpdateId); + _logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.SequenceNumber); CheckProcessBuffer(); + OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray())); OnBestOffersChanged?.Invoke((BestBid, BestAsk)); } } - private void ProcessQueueItem(ProcessQueueItem item) + private void ProcessQueueItem(OrderBookUpdate item) { lock (_bookLock) { @@ -822,19 +830,19 @@ namespace CryptoExchange.Net.OrderBook { Asks = item.Asks, Bids = item.Bids, - FirstUpdateId = item.StartUpdateId, - LastUpdateId = item.EndUpdateId, + FirstUpdateId = item.StartSequenceNumber, + LastUpdateId = item.EndSequenceNumber, }); if (_logger.IsEnabled(LogLevel.Trace)) - _logger.OrderBookUpdateBuffered(Api, Symbol, item.StartUpdateId, item.EndUpdateId, item.Asks.Length, item.Bids.Length); + _logger.OrderBookUpdateBuffered(Api, Symbol, item.StartSequenceNumber, item.EndSequenceNumber, item.Asks.Length, item.Bids.Length); } else { CheckProcessBuffer(); var (prevBestBid, prevBestAsk) = BestOffers; - ProcessRangeUpdates(item.StartUpdateId, item.EndUpdateId, item.Bids, item.Asks); + ProcessUpdate(item.StartSequenceNumber, item.EndSequenceNumber, item.Bids, item.Asks, false); if (_asks.Count == 0 || _bids.Count == 0) return; @@ -856,7 +864,7 @@ namespace CryptoExchange.Net.OrderBook } } - private void ProcessChecksum(ChecksumItem ci) + private void ProcessChecksum(OrderBookChecksum ci) { lock (_bookLock) { @@ -876,6 +884,9 @@ namespace CryptoExchange.Net.OrderBook throw; } + if (ci.SequenceNumber != null) + LastSequenceNumber = ci.SequenceNumber.Value; + if (!checksumResult) { _logger.OrderBookOutOfSyncChecksum(Api, Symbol); @@ -913,23 +924,37 @@ namespace CryptoExchange.Net.OrderBook }); } - private void ProcessRangeUpdates( - long firstUpdateId, - long lastUpdateId, + private void ProcessUpdate( + long updateSequenceNumberStart, + long updateSequenceNumberEnd, IEnumerable bids, - IEnumerable asks) + IEnumerable asks, + bool fromBuffer) { - if (lastUpdateId <= LastSequenceNumber) + var sequenceResult = fromBuffer ? ValidateBufferSequenceNumber(updateSequenceNumberStart, updateSequenceNumberEnd) : ValidateLiveSequenceNumber(updateSequenceNumberStart); + if (sequenceResult == SequenceNumberResult.Skip) { - _logger.OrderBookUpdateSkipped(Api, Symbol, lastUpdateId, LastSequenceNumber); + if (updateSequenceNumberStart != updateSequenceNumberEnd) + _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd, LastSequenceNumber); + else + _logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, LastSequenceNumber); + + return; + } + + if (sequenceResult == SequenceNumberResult.OutOfSync) + { + _logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberStart); + _stopProcessing = true; + Resubscribe(); return; } foreach (var entry in bids) - ProcessUpdate(LastSequenceNumber + 1, OrderBookEntryType.Bid, entry); + UpdateValue(OrderBookEntryType.Bid, entry); foreach (var entry in asks) - ProcessUpdate(LastSequenceNumber + 1, OrderBookEntryType.Ask, entry); + UpdateValue(OrderBookEntryType.Ask, entry); if (Levels.HasValue && _strictLevels) { @@ -946,16 +971,51 @@ namespace CryptoExchange.Net.OrderBook } } - LastSequenceNumber = lastUpdateId; + _firstUpdateAfterSnapshotDone = true; + LastSequenceNumber = updateSequenceNumberEnd; if (_logger.IsEnabled(LogLevel.Trace)) { - if (firstUpdateId != lastUpdateId) - _logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId, lastUpdateId); + if (updateSequenceNumberStart != updateSequenceNumberEnd) + _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd); else - _logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId); + _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart); } - } + } + + private SequenceNumberResult ValidateBufferSequenceNumber(long startSequenceNumber, long endSequenceNumber) + { + if (endSequenceNumber <= LastSequenceNumber) + // Buffered update is from before the snapshot, ignore + return SequenceNumberResult.Skip; + + if (_sequencesAreConsecutive && startSequenceNumber != LastSequenceNumber + 1) + { + if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet) + // Buffered update is not the next sequence number when it was expected to be + return SequenceNumberResult.OutOfSync; + } + + // Buffered sequence number is larger than the last sequence number + return SequenceNumberResult.Ok; + } + + private SequenceNumberResult ValidateLiveSequenceNumber(long sequenceNumber) + { + if (sequenceNumber < LastSequenceNumber) + // Update is somehow from before the current state + return SequenceNumberResult.OutOfSync; + + if (_sequencesAreConsecutive + && LastSequenceNumber != 0 + && sequenceNumber != LastSequenceNumber + 1) + { + if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet) + return SequenceNumberResult.OutOfSync; + } + + return SequenceNumberResult.Ok; + } } internal class DescComparer : IComparer diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index 41855ea..7ed852f 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -257,6 +257,7 @@ namespace CryptoExchange.Net.Sockets.Default } } + private bool _pausedActivity; #if NET9_0_OR_GREATER private readonly Lock _listenersLock = new Lock(); @@ -274,6 +275,8 @@ namespace CryptoExchange.Net.Sockets.Default private ISocketMessageHandler? _byteMessageConverter; private ISocketMessageHandler? _textMessageConverter; + private long _lastSequenceNumber; + /// /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary. /// @@ -293,7 +296,7 @@ namespace CryptoExchange.Net.Sockets.Default /// Cache for deserialization, only caches for a single message /// private readonly Dictionary _deserializationCache = new Dictionary(); - + /// /// New socket connection /// @@ -340,6 +343,7 @@ namespace CryptoExchange.Net.Sockets.Default { Status = SocketStatus.Closed; Authenticated = false; + _lastSequenceNumber = 0; if (ApiClient._socketConnections.ContainsKey(SocketId)) ApiClient._socketConnections.TryRemove(SocketId, out _); @@ -371,6 +375,7 @@ namespace CryptoExchange.Net.Sockets.Default Status = SocketStatus.Reconnecting; DisconnectTime = DateTime.UtcNow; Authenticated = false; + _lastSequenceNumber = 0; lock (_listenersLock) { @@ -1280,6 +1285,23 @@ namespace CryptoExchange.Net.Sockets.Default return result; } + /// + /// Update the sequence number for this connection + /// + public void UpdateSequenceNumber(long sequenceNumber) + { + if (ApiClient.EnforceSequenceNumbers + && _lastSequenceNumber != 0 + && _lastSequenceNumber + 1 != sequenceNumber) + { + // Not sequential + _logger.LogWarning("[Sckt {SocketId}] update not in sequence. Last recorded sequence number: {LastSequence}, update sequence number: {UpdateSequence}. Reconnecting", SocketId, _lastSequenceNumber, sequenceNumber); + _ = TriggerReconnectAsync(); + } + + _lastSequenceNumber = sequenceNumber; + } + /// /// Periodically sends data over a socket connection ///