1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 22:23:54 +00:00
This commit is contained in:
Jkorf 2026-01-08 16:42:05 +01:00
parent c512bee825
commit 77abdfa81c
5 changed files with 89 additions and 52 deletions

View File

@ -142,6 +142,10 @@ namespace CryptoExchange.Net.Clients
/// </summary> /// </summary>
public int? MaxIndividualSubscriptionsPerConnection { get; set; } public int? MaxIndividualSubscriptionsPerConnection { get; set; }
/// <summary>
/// Whether or not to enforce that sequence number updates are always (lastSequenceNumber + 1)
/// </summary>
public bool EnforceSequenceNumbers { get; set; }
#endregion #endregion
/// <summary> /// <summary>

View File

@ -53,6 +53,11 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public SocketUpdateType? UpdateType { get; set; } public SocketUpdateType? UpdateType { get; set; }
/// <summary>
/// Sequence number of the update
/// </summary>
public long? SequenceNumber { get; set; }
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
@ -126,6 +131,15 @@ namespace CryptoExchange.Net.Objects.Sockets
return this; return this;
} }
/// <summary>
/// Specify the sequence number of the update
/// </summary>
public DataEvent<T> WithSequenceNumber(long? sequenceNumber)
{
SequenceNumber = sequenceNumber;
return this;
}
/// <summary> /// <summary>
/// Specify the data timestamp /// Specify the data timestamp
/// </summary> /// </summary>

View File

@ -7,8 +7,8 @@ namespace CryptoExchange.Net.OrderBook
{ {
public DateTime? LocalDataTime { get; set; } public DateTime? LocalDataTime { get; set; }
public DateTime? ServerDataTime { get; set; } public DateTime? ServerDataTime { get; set; }
public long StartUpdateId { get; set; } public long StartSequenceNumber { get; set; }
public long EndUpdateId { get; set; } public long EndSequenceNumber { get; set; }
public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>(); public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
} }
@ -17,14 +17,14 @@ namespace CryptoExchange.Net.OrderBook
{ {
public DateTime? LocalDataTime { get; set; } public DateTime? LocalDataTime { get; set; }
public DateTime? ServerDataTime { get; set; } public DateTime? ServerDataTime { get; set; }
public long StartUpdateId { get; set; } public long SequenceNumber { get; set; }
public long EndUpdateId { get; set; }
public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>(); public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>(); public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
} }
internal class ChecksumItem internal class ChecksumItem
{ {
public long? SequenceNumber { get; set; }
public int Checksum { get; set; } public int Checksum { get; set; }
} }
} }

View File

@ -439,8 +439,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
StartUpdateId = orderBookSequenceNumber, SequenceNumber = orderBookSequenceNumber,
EndUpdateId = orderBookSequenceNumber,
Asks = askList, Asks = askList,
Bids = bidList Bids = bidList
}); });
@ -467,8 +466,8 @@ namespace CryptoExchange.Net.OrderBook
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
StartUpdateId = updateId, StartSequenceNumber = updateId,
EndUpdateId = updateId, EndSequenceNumber = updateId,
Asks = asks, Asks = asks,
Bids = bids Bids = bids
}); });
@ -478,15 +477,15 @@ namespace CryptoExchange.Net.OrderBook
/// <summary> /// <summary>
/// Add an update to the process queue. Updates the book by providing changed bids and asks, along with the first and last sequence number in the update /// Add an update to the process queue. Updates the book by providing changed bids and asks, along with the first and last sequence number in the update
/// </summary> /// </summary>
/// <param name="firstUpdateId">The sequence number of the first update</param> /// <param name="firstSequenceNumber">The sequence number of the first update</param>
/// <param name="lastUpdateId">The sequence number of the last update</param> /// <param name="lastSequenceNumber">The sequence number of the last update</param>
/// <param name="bids">List of updated/new bids</param> /// <param name="bids">List of updated/new bids</param>
/// <param name="asks">List of updated/new asks</param> /// <param name="asks">List of updated/new asks</param>
/// <param name="serverDataTime">Server data timestamp</param> /// <param name="serverDataTime">Server data timestamp</param>
/// <param name="localDataTime">local data timestamp</param> /// <param name="localDataTime">local data timestamp</param>
protected void UpdateOrderBook( protected void UpdateOrderBook(
long firstUpdateId, long firstSequenceNumber,
long lastUpdateId, long lastSequenceNumber,
ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] bids,
ISymbolOrderBookEntry[] asks, ISymbolOrderBookEntry[] asks,
DateTime? serverDataTime = null, DateTime? serverDataTime = null,
@ -497,8 +496,8 @@ namespace CryptoExchange.Net.OrderBook
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
StartUpdateId = firstUpdateId, StartSequenceNumber = firstSequenceNumber,
EndUpdateId = lastUpdateId, EndSequenceNumber = lastSequenceNumber,
Asks = asks, Asks = asks,
Bids = bids Bids = bids
}); });
@ -526,8 +525,8 @@ namespace CryptoExchange.Net.OrderBook
{ {
LocalDataTime = localDataTime, LocalDataTime = localDataTime,
ServerDataTime = serverDataTime, ServerDataTime = serverDataTime,
StartUpdateId = lowest, StartSequenceNumber = lowest,
EndUpdateId = highest, EndSequenceNumber = highest,
Asks = asks, Asks = asks,
Bids = bids Bids = bids
}); });
@ -538,9 +537,10 @@ namespace CryptoExchange.Net.OrderBook
/// Add a checksum value to the process queue /// Add a checksum value to the process queue
/// </summary> /// </summary>
/// <param name="checksum">The checksum value</param> /// <param name="checksum">The checksum value</param>
protected void AddChecksum(int checksum) /// <param name="sequenceNumber">The id of the update</param>
protected void AddChecksum(int checksum, long? sequenceNumber = null)
{ {
_processQueue.Enqueue(new ChecksumItem() { Checksum = checksum }); _processQueue.Enqueue(new ChecksumItem() { Checksum = checksum, SequenceNumber = sequenceNumber });
_queueEvent.Set(); _queueEvent.Set();
} }
@ -561,26 +561,10 @@ namespace CryptoExchange.Net.OrderBook
/// <summary> /// <summary>
/// Update order book with an entry /// Update order book with an entry
/// </summary> /// </summary>
/// <param name="sequence">Sequence number of the update</param>
/// <param name="type">Type of entry</param> /// <param name="type">Type of entry</param>
/// <param name="entry">The entry</param> /// <param name="entry">The entry</param>
protected virtual bool ProcessUpdate(long sequence, OrderBookEntryType type, ISymbolOrderBookEntry entry) protected virtual bool ProcessUpdate(OrderBookEntryType type, ISymbolOrderBookEntry entry)
{ {
if (sequence <= LastSequenceNumber)
{
_logger.OrderBookSkippedMessage(Api, Symbol, sequence, LastSequenceNumber);
return false;
}
if (_sequencesAreConsecutive && sequence > LastSequenceNumber + 1)
{
// Out of sync
_logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, sequence);
_stopProcessing = true;
Resubscribe();
return false;
}
UpdateTime = DateTime.UtcNow; UpdateTime = DateTime.UtcNow;
var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids; var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids;
if (entry.Quantity == 0) if (entry.Quantity == 0)
@ -796,7 +780,7 @@ namespace CryptoExchange.Net.OrderBook
foreach (var bid in item.Bids) foreach (var bid in item.Bids)
_bids.Add(bid.Price, bid); _bids.Add(bid.Price, bid);
LastSequenceNumber = item.EndUpdateId; LastSequenceNumber = item.SequenceNumber;
AskCount = _asks.Count; AskCount = _asks.Count;
BidCount = _bids.Count; BidCount = _bids.Count;
@ -805,7 +789,7 @@ namespace CryptoExchange.Net.OrderBook
UpdateServerTime = item.ServerDataTime; UpdateServerTime = item.ServerDataTime;
UpdateLocalTime = item.LocalDataTime; UpdateLocalTime = item.LocalDataTime;
_logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.EndUpdateId); _logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.SequenceNumber);
CheckProcessBuffer(); CheckProcessBuffer();
OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray())); OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray()));
OnBestOffersChanged?.Invoke((BestBid, BestAsk)); OnBestOffersChanged?.Invoke((BestBid, BestAsk));
@ -822,19 +806,19 @@ namespace CryptoExchange.Net.OrderBook
{ {
Asks = item.Asks, Asks = item.Asks,
Bids = item.Bids, Bids = item.Bids,
FirstUpdateId = item.StartUpdateId, FirstUpdateId = item.StartSequenceNumber,
LastUpdateId = item.EndUpdateId, LastUpdateId = item.EndSequenceNumber,
}); });
if (_logger.IsEnabled(LogLevel.Trace)) if (_logger.IsEnabled(LogLevel.Trace))
_logger.OrderBookUpdateBuffered(Api, Symbol, item.StartUpdateId, item.EndUpdateId, item.Asks.Length, item.Bids.Length); _logger.OrderBookUpdateBuffered(Api, Symbol, item.StartSequenceNumber, item.EndSequenceNumber, item.Asks.Length, item.Bids.Length);
} }
else else
{ {
CheckProcessBuffer(); CheckProcessBuffer();
var (prevBestBid, prevBestAsk) = BestOffers; var (prevBestBid, prevBestAsk) = BestOffers;
ProcessRangeUpdates(item.StartUpdateId, item.EndUpdateId, item.Bids, item.Asks); ProcessRangeUpdates(item.StartSequenceNumber, item.EndSequenceNumber, item.Bids, item.Asks);
if (_asks.Count == 0 || _bids.Count == 0) if (_asks.Count == 0 || _bids.Count == 0)
return; return;
@ -876,6 +860,9 @@ namespace CryptoExchange.Net.OrderBook
throw; throw;
} }
if (ci.SequenceNumber != null)
LastSequenceNumber = ci.SequenceNumber.Value;
if (!checksumResult) if (!checksumResult)
{ {
_logger.OrderBookOutOfSyncChecksum(Api, Symbol); _logger.OrderBookOutOfSyncChecksum(Api, Symbol);
@ -914,22 +901,32 @@ namespace CryptoExchange.Net.OrderBook
} }
private void ProcessRangeUpdates( private void ProcessRangeUpdates(
long firstUpdateId, long updateSequenceNumberStart,
long lastUpdateId, long updateSequenceNumberEnd,
IEnumerable<ISymbolOrderBookEntry> bids, IEnumerable<ISymbolOrderBookEntry> bids,
IEnumerable<ISymbolOrderBookEntry> asks) IEnumerable<ISymbolOrderBookEntry> asks)
{ {
if (lastUpdateId <= LastSequenceNumber) if (updateSequenceNumberEnd <= LastSequenceNumber)
{ {
_logger.OrderBookUpdateSkipped(Api, Symbol, lastUpdateId, LastSequenceNumber); // We're already past this update
_logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberEnd, LastSequenceNumber);
return;
}
if (_sequencesAreConsecutive && updateSequenceNumberStart != LastSequenceNumber + 1)
{
// Expected the start sequenceNumber to be LastSequenceNumber + 1, but wasn't
_logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberEnd);
_stopProcessing = true;
Resubscribe();
return; return;
} }
foreach (var entry in bids) foreach (var entry in bids)
ProcessUpdate(LastSequenceNumber + 1, OrderBookEntryType.Bid, entry); ProcessUpdate(OrderBookEntryType.Bid, entry);
foreach (var entry in asks) foreach (var entry in asks)
ProcessUpdate(LastSequenceNumber + 1, OrderBookEntryType.Ask, entry); ProcessUpdate(OrderBookEntryType.Ask, entry);
if (Levels.HasValue && _strictLevels) if (Levels.HasValue && _strictLevels)
{ {
@ -946,14 +943,14 @@ namespace CryptoExchange.Net.OrderBook
} }
} }
LastSequenceNumber = lastUpdateId; LastSequenceNumber = updateSequenceNumberEnd;
if (_logger.IsEnabled(LogLevel.Trace)) if (_logger.IsEnabled(LogLevel.Trace))
{ {
if (firstUpdateId != lastUpdateId) if (updateSequenceNumberStart != updateSequenceNumberEnd)
_logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId, lastUpdateId); _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd);
else else
_logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId); _logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart);
} }
} }
} }

View File

@ -257,6 +257,7 @@ namespace CryptoExchange.Net.Sockets.Default
} }
} }
private bool _pausedActivity; private bool _pausedActivity;
#if NET9_0_OR_GREATER #if NET9_0_OR_GREATER
private readonly Lock _listenersLock = new Lock(); private readonly Lock _listenersLock = new Lock();
@ -274,6 +275,8 @@ namespace CryptoExchange.Net.Sockets.Default
private ISocketMessageHandler? _byteMessageConverter; private ISocketMessageHandler? _byteMessageConverter;
private ISocketMessageHandler? _textMessageConverter; private ISocketMessageHandler? _textMessageConverter;
private long _lastSequenceNumber;
/// <summary> /// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary. /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
/// </summary> /// </summary>
@ -340,6 +343,7 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
Status = SocketStatus.Closed; Status = SocketStatus.Closed;
Authenticated = false; Authenticated = false;
_lastSequenceNumber = 0;
if (ApiClient._socketConnections.ContainsKey(SocketId)) if (ApiClient._socketConnections.ContainsKey(SocketId))
ApiClient._socketConnections.TryRemove(SocketId, out _); ApiClient._socketConnections.TryRemove(SocketId, out _);
@ -371,6 +375,7 @@ namespace CryptoExchange.Net.Sockets.Default
Status = SocketStatus.Reconnecting; Status = SocketStatus.Reconnecting;
DisconnectTime = DateTime.UtcNow; DisconnectTime = DateTime.UtcNow;
Authenticated = false; Authenticated = false;
_lastSequenceNumber = 0;
lock (_listenersLock) lock (_listenersLock)
{ {
@ -1280,6 +1285,23 @@ namespace CryptoExchange.Net.Sockets.Default
return result; return result;
} }
/// <summary>
/// Update the sequence number for this connection
/// </summary>
public void UpdateSequenceNumber(long sequenceNumber)
{
if (ApiClient.EnforceSequenceNumbers
&& _lastSequenceNumber != 0
&& _lastSequenceNumber + 1 != sequenceNumber)
{
// Not sequential
_logger.LogWarning("[Sckt {SocketId}] update not in sequence. Last recorded sequence number: {LastSequence}, update sequence number: {UpdateSequence}. Reconnecting", SocketId, _lastSequenceNumber, sequenceNumber);
_ = TriggerReconnectAsync();
}
_lastSequenceNumber = sequenceNumber;
}
/// <summary> /// <summary>
/// Periodically sends data over a socket connection /// Periodically sends data over a socket connection
/// </summary> /// </summary>