1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 14:13:46 +00:00

Feature/websocket sequencing (#267)

Added EnforceSequenceNumbers property on SocketApiClient to configure whether websocket message contain sequence numbers and if these should be checked to be sequential
Added fallback to existing websocket connection if no dedicated request connection was found
Added IntBoolConverter base class for arbitrary int value to bool mapping
Added SequenceNumber property to DataEvent object
Added _skipSequenceCheckFirstUpdateAfterSnapshotSet property for SymbolOrderBook implementations
Updated SymbolOrderBook sequenceNumber validation
Updated SymbolOrderBook log verbosities
Renamed SetInitialOrderBook to SetSnapshot in SymbolOrderBook
Renamed updateId references to sequenceNumber in SymbolOrderBook
This commit is contained in:
Jan Korf 2026-01-12 14:26:50 +01:00 committed by GitHub
parent c512bee825
commit 3b2a85d210
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 237 additions and 85 deletions

View File

@ -142,6 +142,10 @@ namespace CryptoExchange.Net.Clients
/// </summary> /// </summary>
public int? MaxIndividualSubscriptionsPerConnection { get; set; } public int? MaxIndividualSubscriptionsPerConnection { get; set; }
/// <summary>
/// Whether or not to enforce that sequence number updates are always (lastSequenceNumber + 1)
/// </summary>
public bool EnforceSequenceNumbers { get; set; }
#endregion #endregion
/// <summary> /// <summary>
@ -706,6 +710,10 @@ namespace CryptoExchange.Net.Clients
if (connection != null && !connection.DedicatedRequestConnection.Authenticated) if (connection != null && !connection.DedicatedRequestConnection.Authenticated)
// Mark dedicated request connection as authenticated if the request is authenticated // Mark dedicated request connection as authenticated if the request is authenticated
connection.DedicatedRequestConnection.Authenticated = authenticated; connection.DedicatedRequestConnection.Authenticated = authenticated;
if (connection == null)
// Fall back to an existing connection if there is no dedicated request connection available
connection = socketQuery.OrderBy(s => s.UserSubscriptionCount).FirstOrDefault();
} }
bool maxConnectionsReached = _socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections); bool maxConnectionsReached = _socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections);

View File

@ -0,0 +1,36 @@
using System;
using System.Text.Json;
using System.Text.Json.Serialization;
namespace CryptoExchange.Net.Converters.SystemTextJson
{
/// <summary>
/// Bool converter
/// </summary>
public class IntBoolConverter : JsonConverter<bool>
{
private readonly int _trueValue;
/// <summary>
/// ctor
/// </summary>
/// <param name="trueValue">The int value representing the true value</param>
public IntBoolConverter(int trueValue)
{
_trueValue = trueValue;
}
public override bool Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.Number)
return false;
return reader.GetDecimal() == _trueValue;
}
public override void Write(Utf8JsonWriter writer, bool value, JsonSerializerOptions options)
{
writer.WriteNumberValue(_trueValue);
}
}
}

View File

@ -22,7 +22,7 @@ namespace CryptoExchange.Net.Logging.Extensions
private static readonly Action<ILogger, string, string, Exception?> _orderBookResyncing; private static readonly Action<ILogger, string, string, Exception?> _orderBookResyncing;
private static readonly Action<ILogger, string, string, Exception?> _orderBookResynced; private static readonly Action<ILogger, string, string, Exception?> _orderBookResynced;
private static readonly Action<ILogger, string, string, Exception?> _orderBookMessageSkippedBecauseOfResubscribing; private static readonly Action<ILogger, string, string, Exception?> _orderBookMessageSkippedBecauseOfResubscribing;
private static readonly Action<ILogger, string, string, long, long, long, Exception?> _orderBookDataSet; private static readonly Action<ILogger, string, string, long, long, long?, Exception?> _orderBookDataSet;
private static readonly Action<ILogger, string, string, long, long, long, long, Exception?> _orderBookUpdateBuffered; private static readonly Action<ILogger, string, string, long, long, long, long, Exception?> _orderBookUpdateBuffered;
private static readonly Action<ILogger, string, string, decimal, decimal, Exception?> _orderBookOutOfSyncDetected; private static readonly Action<ILogger, string, string, decimal, decimal, Exception?> _orderBookOutOfSyncDetected;
private static readonly Action<ILogger, string, string, Exception?> _orderBookReconnectingSocket; private static readonly Action<ILogger, string, string, Exception?> _orderBookReconnectingSocket;
@ -30,6 +30,7 @@ namespace CryptoExchange.Net.Logging.Extensions
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookProcessedMessage; private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookProcessedMessage;
private static readonly Action<ILogger, string, string, long, Exception?> _orderBookProcessedMessageSingle; private static readonly Action<ILogger, string, string, long, Exception?> _orderBookProcessedMessageSingle;
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookOutOfSync; private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookOutOfSync;
private static readonly Action<ILogger, string, string, long, long, long, Exception?> _orderBookUpdateSkippedStartEnd;
static SymbolOrderBookLoggingExtensions() static SymbolOrderBookLoggingExtensions()
{ {
@ -74,7 +75,7 @@ namespace CryptoExchange.Net.Logging.Extensions
"{Api} order book {Symbol} Processing {NumberBufferedUpdated} buffered updates"); "{Api} order book {Symbol} Processing {NumberBufferedUpdated} buffered updates");
_orderBookUpdateSkipped = LoggerMessage.Define<string, string, long, long>( _orderBookUpdateSkipped = LoggerMessage.Define<string, string, long, long>(
LogLevel.Debug, LogLevel.Trace,
new EventId(5008, "OrderBookUpdateSkipped"), new EventId(5008, "OrderBookUpdateSkipped"),
"{Api} order book {Symbol} update skipped #{SequenceNumber}, currently at #{LastSequenceNumber}"); "{Api} order book {Symbol} update skipped #{SequenceNumber}, currently at #{LastSequenceNumber}");
@ -93,10 +94,10 @@ namespace CryptoExchange.Net.Logging.Extensions
new EventId(5011, "OrderBookMessageSkippedResubscribing"), new EventId(5011, "OrderBookMessageSkippedResubscribing"),
"{Api} order book {Symbol} Skipping message because of resubscribing"); "{Api} order book {Symbol} Skipping message because of resubscribing");
_orderBookDataSet = LoggerMessage.Define<string, string, long, long, long>( _orderBookDataSet = LoggerMessage.Define<string, string, long, long, long?>(
LogLevel.Debug, LogLevel.Trace,
new EventId(5012, "OrderBookDataSet"), new EventId(5012, "OrderBookDataSet"),
"{Api} order book {Symbol} data set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}"); "{Api} order book {Symbol} snapshot set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}");
_orderBookUpdateBuffered = LoggerMessage.Define<string, string, long, long, long, long>( _orderBookUpdateBuffered = LoggerMessage.Define<string, string, long, long, long, long>(
LogLevel.Trace, LogLevel.Trace,
@ -142,6 +143,12 @@ namespace CryptoExchange.Net.Logging.Extensions
LogLevel.Trace, LogLevel.Trace,
new EventId(5021, "OrderBookProcessedMessage"), new EventId(5021, "OrderBookProcessedMessage"),
"{Api} order book {Symbol} update processed #{UpdateId}"); "{Api} order book {Symbol} update processed #{UpdateId}");
_orderBookUpdateSkippedStartEnd = LoggerMessage.Define<string, string, long, long, long>(
LogLevel.Trace,
new EventId(5022, "OrderBookUpdateSkippedStartEnd"),
"{Api} order book {Symbol} update skipped #{SequenceStart}-#{SequenceEnd}, currently at #{LastSequenceNumber}");
} }
public static void OrderBookStatusChanged(this ILogger logger, string api, string symbol, OrderBookStatus previousStatus, OrderBookStatus newStatus) public static void OrderBookStatusChanged(this ILogger logger, string api, string symbol, OrderBookStatus previousStatus, OrderBookStatus newStatus)
@ -200,7 +207,7 @@ namespace CryptoExchange.Net.Logging.Extensions
{ {
_orderBookMessageSkippedBecauseOfResubscribing(logger, api, symbol, null); _orderBookMessageSkippedBecauseOfResubscribing(logger, api, symbol, null);
} }
public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long endUpdateId) public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long? endUpdateId)
{ {
_orderBookDataSet(logger, api, symbol, bidCount, askCount, endUpdateId, null); _orderBookDataSet(logger, api, symbol, bidCount, askCount, endUpdateId, null);
} }
@ -243,5 +250,10 @@ namespace CryptoExchange.Net.Logging.Extensions
{ {
_orderBookOutOfSyncChecksum(logger, api, symbol, null); _orderBookOutOfSyncChecksum(logger, api, symbol, null);
} }
public static void OrderBookUpdateSkipped(this ILogger logger, string api, string symbol, long sequenceStart, long sequenceEnd, long lastSequenceNumber)
{
_orderBookUpdateSkippedStartEnd(logger, api, symbol, sequenceStart, sequenceEnd, lastSequenceNumber, null);
}
} }
} }

View File

@ -53,6 +53,11 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public SocketUpdateType? UpdateType { get; set; } public SocketUpdateType? UpdateType { get; set; }
/// <summary>
/// Sequence number of the update
/// </summary>
public long? SequenceNumber { get; set; }
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
@ -126,6 +131,15 @@ namespace CryptoExchange.Net.Objects.Sockets
return this; return this;
} }
/// <summary>
/// Specify the sequence number of the update
/// </summary>
public DataEvent<T> WithSequenceNumber(long? sequenceNumber)
{
SequenceNumber = sequenceNumber;
return this;
}
/// <summary> /// <summary>
/// Specify the data timestamp /// Specify the data timestamp
/// </summary> /// </summary>

View File

@ -3,28 +3,28 @@ using System;
namespace CryptoExchange.Net.OrderBook namespace CryptoExchange.Net.OrderBook
{ {
internal class ProcessQueueItem internal class OrderBookUpdate
{ {
public DateTime? LocalDataTime { get; set; } public DateTime? LocalDataTime { get; set; }
public DateTime? ServerDataTime { get; set; } public DateTime? ServerDataTime { get; set; }
public long StartUpdateId { get; set; } public long StartSequenceNumber { get; set; }
public long EndUpdateId { get; set; } public long EndSequenceNumber { get; set; }
public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>(); public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
} }
internal class InitialOrderBookItem internal class OrderBookSnapshot
{ {
public DateTime? LocalDataTime { get; set; } public DateTime? LocalDataTime { get; set; }
public DateTime? ServerDataTime { get; set; } public DateTime? ServerDataTime { get; set; }
public long StartUpdateId { get; set; } public long? SequenceNumber { get; set; }
public long EndUpdateId { get; set; }
public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>(); public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
} }
internal class ChecksumItem internal class OrderBookChecksum
{ {
public long? SequenceNumber { get; set; }
public int Checksum { get; set; } public int Checksum { get; set; }
} }
} }

View File

@ -39,6 +39,7 @@ namespace CryptoExchange.Net.OrderBook
private readonly AsyncResetEvent _queueEvent; private readonly AsyncResetEvent _queueEvent;
private readonly ConcurrentQueue<object> _processQueue; private readonly ConcurrentQueue<object> _processQueue;
private bool _validateChecksum; private bool _validateChecksum;
private bool _firstUpdateAfterSnapshotDone;
private class EmptySymbolOrderBookEntry : ISymbolOrderBookEntry private class EmptySymbolOrderBookEntry : ISymbolOrderBookEntry
{ {
@ -50,6 +51,13 @@ namespace CryptoExchange.Net.OrderBook
private static readonly ISymbolOrderBookEntry _emptySymbolOrderBookEntry = new EmptySymbolOrderBookEntry(); private static readonly ISymbolOrderBookEntry _emptySymbolOrderBookEntry = new EmptySymbolOrderBookEntry();
private enum SequenceNumberResult
{
Skip,
Ok,
OutOfSync
}
/// <summary> /// <summary>
/// A buffer to store messages received before the initial book snapshot is processed. These messages /// A buffer to store messages received before the initial book snapshot is processed. These messages
/// will be processed after the book snapshot is set. Any messages in this buffer with sequence numbers lower /// will be processed after the book snapshot is set. Any messages in this buffer with sequence numbers lower
@ -78,6 +86,11 @@ namespace CryptoExchange.Net.OrderBook
/// </summary> /// </summary>
protected bool _sequencesAreConsecutive; protected bool _sequencesAreConsecutive;
/// <summary>
/// Whether the first update message after a snapshot may have overlapping sequence numbers instead of the snapshot sequence number + 1
/// </summary>
protected bool _skipSequenceCheckFirstUpdateAfterSnapshotSet;
/// <summary> /// <summary>
/// Whether levels should be strictly enforced. For example, when an order book has 25 levels and a new update comes in which pushes /// Whether levels should be strictly enforced. For example, when an order book has 25 levels and a new update comes in which pushes
/// the current level 25 ask out of the top 25, should the level 26 entry be removed from the book or does the server handle this /// the current level 25 ask out of the top 25, should the level 26 entry be removed from the book or does the server handle this
@ -267,6 +280,7 @@ namespace CryptoExchange.Net.OrderBook
_processBuffer.Clear(); _processBuffer.Clear();
_bookSet = false; _bookSet = false;
_firstUpdateAfterSnapshotDone = false;
Status = OrderBookStatus.Connecting; Status = OrderBookStatus.Connecting;
_processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning); _processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
@ -419,28 +433,27 @@ namespace CryptoExchange.Net.OrderBook
protected virtual bool DoChecksum(int checksum) => true; protected virtual bool DoChecksum(int checksum) => true;
/// <summary> /// <summary>
/// Set the initial data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot /// Set snapshot data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot
/// received from a socket subscription /// received from a socket subscription. Will clear any previous data.
/// </summary> /// </summary>
/// <param name="orderBookSequenceNumber">The last update sequence number until which the snapshot is in sync</param> /// <param name="orderBookSequenceNumber">The last update sequence number until which the snapshot is in sync</param>
/// <param name="askList">List of asks</param> /// <param name="askList">List of asks</param>
/// <param name="bidList">List of bids</param> /// <param name="bidList">List of bids</param>
/// <param name="serverDataTime">Server data timestamp</param> /// <param name="serverDataTime">Server data timestamp</param>
/// <param name="localDataTime">local data timestamp</param> /// <param name="localDataTime">local data timestamp</param>
protected void SetInitialOrderBook( protected void SetSnapshot(
long orderBookSequenceNumber, long? orderBookSequenceNumber,
ISymbolOrderBookEntry[] bidList, ISymbolOrderBookEntry[] bidList,
ISymbolOrderBookEntry[] askList, ISymbolOrderBookEntry[] askList,
DateTime? serverDataTime = null, DateTime? serverDataTime = null,
DateTime? localDataTime = null) DateTime? localDataTime = null)
{ {
_processQueue.Enqueue( _processQueue.Enqueue(
new InitialOrderBookItem new OrderBookSnapshot
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
StartUpdateId = orderBookSequenceNumber, SequenceNumber = orderBookSequenceNumber,
EndUpdateId = orderBookSequenceNumber,
Asks = askList, Asks = askList,
Bids = bidList Bids = bidList
}); });
@ -450,25 +463,25 @@ namespace CryptoExchange.Net.OrderBook
/// <summary> /// <summary>
/// Add an update to the process queue. Updates the book by providing changed bids and asks, along with an update number which should be higher than the previous update numbers /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with an update number which should be higher than the previous update numbers
/// </summary> /// </summary>
/// <param name="updateId">The sequence number</param> /// <param name="sequenceNumber">The sequence number</param>
/// <param name="bids">List of updated/new bids</param> /// <param name="bids">List of updated/new bids</param>
/// <param name="asks">List of updated/new asks</param> /// <param name="asks">List of updated/new asks</param>
/// <param name="serverDataTime">Server data timestamp</param> /// <param name="serverDataTime">Server data timestamp</param>
/// <param name="localDataTime">local data timestamp</param> /// <param name="localDataTime">local data timestamp</param>
protected void UpdateOrderBook( protected void UpdateOrderBook(
long updateId, long sequenceNumber,
ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] bids,
ISymbolOrderBookEntry[] asks, ISymbolOrderBookEntry[] asks,
DateTime? serverDataTime = null, DateTime? serverDataTime = null,
DateTime? localDataTime = null) DateTime? localDataTime = null)
{ {
_processQueue.Enqueue( _processQueue.Enqueue(
new ProcessQueueItem new OrderBookUpdate
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
StartUpdateId = updateId, StartSequenceNumber = sequenceNumber,
EndUpdateId = updateId, EndSequenceNumber = sequenceNumber,
Asks = asks, Asks = asks,
Bids = bids Bids = bids
}); });
@ -478,27 +491,27 @@ namespace CryptoExchange.Net.OrderBook
/// <summary> /// <summary>
/// Add an update to the process queue. Updates the book by providing changed bids and asks, along with the first and last sequence number in the update /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with the first and last sequence number in the update
/// </summary> /// </summary>
/// <param name="firstUpdateId">The sequence number of the first update</param> /// <param name="firstSequenceNumber">The sequence number of the first update</param>
/// <param name="lastUpdateId">The sequence number of the last update</param> /// <param name="lastSequenceNumber">The sequence number of the last update</param>
/// <param name="bids">List of updated/new bids</param> /// <param name="bids">List of updated/new bids</param>
/// <param name="asks">List of updated/new asks</param> /// <param name="asks">List of updated/new asks</param>
/// <param name="serverDataTime">Server data timestamp</param> /// <param name="serverDataTime">Server data timestamp</param>
/// <param name="localDataTime">local data timestamp</param> /// <param name="localDataTime">local data timestamp</param>
protected void UpdateOrderBook( protected void UpdateOrderBook(
long firstUpdateId, long firstSequenceNumber,
long lastUpdateId, long lastSequenceNumber,
ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] bids,
ISymbolOrderBookEntry[] asks, ISymbolOrderBookEntry[] asks,
DateTime? serverDataTime = null, DateTime? serverDataTime = null,
DateTime? localDataTime = null) DateTime? localDataTime = null)
{ {
_processQueue.Enqueue( _processQueue.Enqueue(
new ProcessQueueItem new OrderBookUpdate
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
StartUpdateId = firstUpdateId, StartSequenceNumber = firstSequenceNumber,
EndUpdateId = lastUpdateId, EndSequenceNumber = lastSequenceNumber,
Asks = asks, Asks = asks,
Bids = bids Bids = bids
}); });
@ -522,12 +535,12 @@ namespace CryptoExchange.Net.OrderBook
var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue); var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue);
_processQueue.Enqueue( _processQueue.Enqueue(
new ProcessQueueItem new OrderBookUpdate
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
StartUpdateId = lowest, StartSequenceNumber = lowest,
EndUpdateId = highest, EndSequenceNumber = highest,
Asks = asks, Asks = asks,
Bids = bids Bids = bids
}); });
@ -538,9 +551,10 @@ namespace CryptoExchange.Net.OrderBook
/// Add a checksum value to the process queue /// Add a checksum value to the process queue
/// </summary> /// </summary>
/// <param name="checksum">The checksum value</param> /// <param name="checksum">The checksum value</param>
protected void AddChecksum(int checksum) /// <param name="sequenceNumber">The sequence number of the message if it's a separate message with separate number</param>
protected void AddChecksum(int checksum, long? sequenceNumber = null)
{ {
_processQueue.Enqueue(new ChecksumItem() { Checksum = checksum }); _processQueue.Enqueue(new OrderBookChecksum() { Checksum = checksum, SequenceNumber = sequenceNumber });
_queueEvent.Set(); _queueEvent.Set();
} }
@ -553,7 +567,12 @@ namespace CryptoExchange.Net.OrderBook
_logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count); _logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count);
foreach (var bufferEntry in _processBuffer) foreach (var bufferEntry in _processBuffer)
ProcessRangeUpdates(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks); {
if (_stopProcessing)
break;
ProcessUpdate(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks, true);
}
_processBuffer.Clear(); _processBuffer.Clear();
} }
@ -561,26 +580,10 @@ namespace CryptoExchange.Net.OrderBook
/// <summary> /// <summary>
/// Update order book with an entry /// Update order book with an entry
/// </summary> /// </summary>
/// <param name="sequence">Sequence number of the update</param>
/// <param name="type">Type of entry</param> /// <param name="type">Type of entry</param>
/// <param name="entry">The entry</param> /// <param name="entry">The entry</param>
protected virtual bool ProcessUpdate(long sequence, OrderBookEntryType type, ISymbolOrderBookEntry entry) protected virtual bool UpdateValue(OrderBookEntryType type, ISymbolOrderBookEntry entry)
{ {
if (sequence <= LastSequenceNumber)
{
_logger.OrderBookSkippedMessage(Api, Symbol, sequence, LastSequenceNumber);
return false;
}
if (_sequencesAreConsecutive && sequence > LastSequenceNumber + 1)
{
// Out of sync
_logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, sequence);
_stopProcessing = true;
Resubscribe();
return false;
}
UpdateTime = DateTime.UtcNow; UpdateTime = DateTime.UtcNow;
var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids; var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids;
if (entry.Quantity == 0) if (entry.Quantity == 0)
@ -735,8 +738,11 @@ namespace CryptoExchange.Net.OrderBook
// Clear queue // Clear queue
while (_processQueue.TryDequeue(out _)) { } while (_processQueue.TryDequeue(out _)) { }
LastSequenceNumber = 0;
_processBuffer.Clear(); _processBuffer.Clear();
_bookSet = false; _bookSet = false;
_firstUpdateAfterSnapshotDone = false;
DoReset(); DoReset();
} }
@ -774,17 +780,17 @@ namespace CryptoExchange.Net.OrderBook
continue; continue;
} }
if (item is InitialOrderBookItem iobi) if (item is OrderBookSnapshot snapshot)
ProcessInitialOrderBookItem(iobi); ProcessOrderBookSnapshot(snapshot);
if (item is ProcessQueueItem pqi) if (item is OrderBookUpdate update)
ProcessQueueItem(pqi); ProcessQueueItem(update);
else if (item is ChecksumItem ci) else if (item is OrderBookChecksum checksum)
ProcessChecksum(ci); ProcessChecksum(checksum);
} }
} }
} }
private void ProcessInitialOrderBookItem(InitialOrderBookItem item) private void ProcessOrderBookSnapshot(OrderBookSnapshot item)
{ {
lock (_bookLock) lock (_bookLock)
{ {
@ -796,7 +802,8 @@ namespace CryptoExchange.Net.OrderBook
foreach (var bid in item.Bids) foreach (var bid in item.Bids)
_bids.Add(bid.Price, bid); _bids.Add(bid.Price, bid);
LastSequenceNumber = item.EndUpdateId; if (item.SequenceNumber != null)
LastSequenceNumber = item.SequenceNumber.Value;
AskCount = _asks.Count; AskCount = _asks.Count;
BidCount = _bids.Count; BidCount = _bids.Count;
@ -805,14 +812,15 @@ namespace CryptoExchange.Net.OrderBook
UpdateServerTime = item.ServerDataTime; UpdateServerTime = item.ServerDataTime;
UpdateLocalTime = item.LocalDataTime; UpdateLocalTime = item.LocalDataTime;
_logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.EndUpdateId); _logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.SequenceNumber);
CheckProcessBuffer(); CheckProcessBuffer();
OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray())); OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray()));
OnBestOffersChanged?.Invoke((BestBid, BestAsk)); OnBestOffersChanged?.Invoke((BestBid, BestAsk));
} }
} }
private void ProcessQueueItem(ProcessQueueItem item) private void ProcessQueueItem(OrderBookUpdate item)
{ {
lock (_bookLock) lock (_bookLock)
{ {
@ -822,19 +830,19 @@ namespace CryptoExchange.Net.OrderBook
{ {
Asks = item.Asks, Asks = item.Asks,
Bids = item.Bids, Bids = item.Bids,
FirstUpdateId = item.StartUpdateId, FirstUpdateId = item.StartSequenceNumber,
LastUpdateId = item.EndUpdateId, LastUpdateId = item.EndSequenceNumber,
}); });
if (_logger.IsEnabled(LogLevel.Trace)) if (_logger.IsEnabled(LogLevel.Trace))
_logger.OrderBookUpdateBuffered(Api, Symbol, item.StartUpdateId, item.EndUpdateId, item.Asks.Length, item.Bids.Length); _logger.OrderBookUpdateBuffered(Api, Symbol, item.StartSequenceNumber, item.EndSequenceNumber, item.Asks.Length, item.Bids.Length);
} }
else else
{ {
CheckProcessBuffer(); CheckProcessBuffer();
var (prevBestBid, prevBestAsk) = BestOffers; var (prevBestBid, prevBestAsk) = BestOffers;
ProcessRangeUpdates(item.StartUpdateId, item.EndUpdateId, item.Bids, item.Asks); ProcessUpdate(item.StartSequenceNumber, item.EndSequenceNumber, item.Bids, item.Asks, false);
if (_asks.Count == 0 || _bids.Count == 0) if (_asks.Count == 0 || _bids.Count == 0)
return; return;
@ -856,7 +864,7 @@ namespace CryptoExchange.Net.OrderBook
} }
} }
private void ProcessChecksum(ChecksumItem ci) private void ProcessChecksum(OrderBookChecksum ci)
{ {
lock (_bookLock) lock (_bookLock)
{ {
@ -876,6 +884,9 @@ namespace CryptoExchange.Net.OrderBook
throw; throw;
} }
if (ci.SequenceNumber != null)
LastSequenceNumber = ci.SequenceNumber.Value;
if (!checksumResult) if (!checksumResult)
{ {
_logger.OrderBookOutOfSyncChecksum(Api, Symbol); _logger.OrderBookOutOfSyncChecksum(Api, Symbol);
@ -913,23 +924,37 @@ namespace CryptoExchange.Net.OrderBook
}); });
} }
private void ProcessRangeUpdates( private void ProcessUpdate(
long firstUpdateId, long updateSequenceNumberStart,
long lastUpdateId, long updateSequenceNumberEnd,
IEnumerable<ISymbolOrderBookEntry> bids, IEnumerable<ISymbolOrderBookEntry> bids,
IEnumerable<ISymbolOrderBookEntry> asks) IEnumerable<ISymbolOrderBookEntry> asks,
bool fromBuffer)
{ {
if (lastUpdateId <= LastSequenceNumber) var sequenceResult = fromBuffer ? ValidateBufferSequenceNumber(updateSequenceNumberStart, updateSequenceNumberEnd) : ValidateLiveSequenceNumber(updateSequenceNumberStart);
if (sequenceResult == SequenceNumberResult.Skip)
{ {
_logger.OrderBookUpdateSkipped(Api, Symbol, lastUpdateId, LastSequenceNumber); if (updateSequenceNumberStart != updateSequenceNumberEnd)
_logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd, LastSequenceNumber);
else
_logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, LastSequenceNumber);
return;
}
if (sequenceResult == SequenceNumberResult.OutOfSync)
{
_logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberStart);
_stopProcessing = true;
Resubscribe();
return; return;
} }
foreach (var entry in bids) foreach (var entry in bids)
ProcessUpdate(LastSequenceNumber + 1, OrderBookEntryType.Bid, entry); UpdateValue(OrderBookEntryType.Bid, entry);
foreach (var entry in asks) foreach (var entry in asks)
ProcessUpdate(LastSequenceNumber + 1, OrderBookEntryType.Ask, entry); UpdateValue(OrderBookEntryType.Ask, entry);
if (Levels.HasValue && _strictLevels) if (Levels.HasValue && _strictLevels)
{ {
@ -946,16 +971,51 @@ namespace CryptoExchange.Net.OrderBook
} }
} }
LastSequenceNumber = lastUpdateId; _firstUpdateAfterSnapshotDone = true;
LastSequenceNumber = updateSequenceNumberEnd;
if (_logger.IsEnabled(LogLevel.Trace)) if (_logger.IsEnabled(LogLevel.Trace))
{ {
if (firstUpdateId != lastUpdateId) if (updateSequenceNumberStart != updateSequenceNumberEnd)
_logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId, lastUpdateId); _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd);
else else
_logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId); _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart);
} }
} }
private SequenceNumberResult ValidateBufferSequenceNumber(long startSequenceNumber, long endSequenceNumber)
{
if (endSequenceNumber <= LastSequenceNumber)
// Buffered update is from before the snapshot, ignore
return SequenceNumberResult.Skip;
if (_sequencesAreConsecutive && startSequenceNumber != LastSequenceNumber + 1)
{
if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet)
// Buffered update is not the next sequence number when it was expected to be
return SequenceNumberResult.OutOfSync;
}
// Buffered sequence number is larger than the last sequence number
return SequenceNumberResult.Ok;
}
private SequenceNumberResult ValidateLiveSequenceNumber(long sequenceNumber)
{
if (sequenceNumber < LastSequenceNumber)
// Update is somehow from before the current state
return SequenceNumberResult.OutOfSync;
if (_sequencesAreConsecutive
&& LastSequenceNumber != 0
&& sequenceNumber != LastSequenceNumber + 1)
{
if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet)
return SequenceNumberResult.OutOfSync;
}
return SequenceNumberResult.Ok;
}
} }
internal class DescComparer<T> : IComparer<T> internal class DescComparer<T> : IComparer<T>

View File

@ -257,6 +257,7 @@ namespace CryptoExchange.Net.Sockets.Default
} }
} }
private bool _pausedActivity; private bool _pausedActivity;
#if NET9_0_OR_GREATER #if NET9_0_OR_GREATER
private readonly Lock _listenersLock = new Lock(); private readonly Lock _listenersLock = new Lock();
@ -274,6 +275,8 @@ namespace CryptoExchange.Net.Sockets.Default
private ISocketMessageHandler? _byteMessageConverter; private ISocketMessageHandler? _byteMessageConverter;
private ISocketMessageHandler? _textMessageConverter; private ISocketMessageHandler? _textMessageConverter;
private long _lastSequenceNumber;
/// <summary> /// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary. /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
/// </summary> /// </summary>
@ -340,6 +343,7 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
Status = SocketStatus.Closed; Status = SocketStatus.Closed;
Authenticated = false; Authenticated = false;
_lastSequenceNumber = 0;
if (ApiClient._socketConnections.ContainsKey(SocketId)) if (ApiClient._socketConnections.ContainsKey(SocketId))
ApiClient._socketConnections.TryRemove(SocketId, out _); ApiClient._socketConnections.TryRemove(SocketId, out _);
@ -371,6 +375,7 @@ namespace CryptoExchange.Net.Sockets.Default
Status = SocketStatus.Reconnecting; Status = SocketStatus.Reconnecting;
DisconnectTime = DateTime.UtcNow; DisconnectTime = DateTime.UtcNow;
Authenticated = false; Authenticated = false;
_lastSequenceNumber = 0;
lock (_listenersLock) lock (_listenersLock)
{ {
@ -1280,6 +1285,23 @@ namespace CryptoExchange.Net.Sockets.Default
return result; return result;
} }
/// <summary>
/// Update the sequence number for this connection
/// </summary>
public void UpdateSequenceNumber(long sequenceNumber)
{
if (ApiClient.EnforceSequenceNumbers
&& _lastSequenceNumber != 0
&& _lastSequenceNumber + 1 != sequenceNumber)
{
// Not sequential
_logger.LogWarning("[Sckt {SocketId}] update not in sequence. Last recorded sequence number: {LastSequence}, update sequence number: {UpdateSequence}. Reconnecting", SocketId, _lastSequenceNumber, sequenceNumber);
_ = TriggerReconnectAsync();
}
_lastSequenceNumber = sequenceNumber;
}
/// <summary> /// <summary>
/// Periodically sends data over a socket connection /// Periodically sends data over a socket connection
/// </summary> /// </summary>