mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2026-02-16 22:23:54 +00:00
Compare commits
3 Commits
c512bee825
...
fc2d3fc2d2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fc2d3fc2d2 | ||
|
|
187ca6a4ef | ||
|
|
3b2a85d210 |
@ -142,6 +142,10 @@ namespace CryptoExchange.Net.Clients
|
||||
/// </summary>
|
||||
public int? MaxIndividualSubscriptionsPerConnection { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Whether or not to enforce that sequence number updates are always (lastSequenceNumber + 1)
|
||||
/// </summary>
|
||||
public bool EnforceSequenceNumbers { get; set; }
|
||||
#endregion
|
||||
|
||||
/// <summary>
|
||||
@ -706,6 +710,10 @@ namespace CryptoExchange.Net.Clients
|
||||
if (connection != null && !connection.DedicatedRequestConnection.Authenticated)
|
||||
// Mark dedicated request connection as authenticated if the request is authenticated
|
||||
connection.DedicatedRequestConnection.Authenticated = authenticated;
|
||||
|
||||
if (connection == null)
|
||||
// Fall back to an existing connection if there is no dedicated request connection available
|
||||
connection = socketQuery.OrderBy(s => s.UserSubscriptionCount).FirstOrDefault();
|
||||
}
|
||||
|
||||
bool maxConnectionsReached = _socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections);
|
||||
|
||||
@ -0,0 +1,38 @@
|
||||
using System;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
namespace CryptoExchange.Net.Converters.SystemTextJson
|
||||
{
|
||||
/// <summary>
|
||||
/// Bool converter
|
||||
/// </summary>
|
||||
public class IntBoolConverter : JsonConverter<bool>
|
||||
{
|
||||
private readonly int _trueValue;
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="trueValue">The int value representing the true value</param>
|
||||
public IntBoolConverter(int trueValue)
|
||||
{
|
||||
_trueValue = trueValue;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
|
||||
{
|
||||
if (reader.TokenType != JsonTokenType.Number)
|
||||
return false;
|
||||
|
||||
return reader.GetDecimal() == _trueValue;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Write(Utf8JsonWriter writer, bool value, JsonSerializerOptions options)
|
||||
{
|
||||
writer.WriteNumberValue(_trueValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -6,9 +6,9 @@
|
||||
<PackageId>CryptoExchange.Net</PackageId>
|
||||
<Authors>JKorf</Authors>
|
||||
<Description>CryptoExchange.Net is a base library which is used to implement different cryptocurrency (exchange) API's. It provides a standardized way of implementing different API's, which results in a very similar experience for users of the API implementations.</Description>
|
||||
<PackageVersion>10.1.0</PackageVersion>
|
||||
<AssemblyVersion>10.1.0</AssemblyVersion>
|
||||
<FileVersion>10.1.0</FileVersion>
|
||||
<PackageVersion>10.2.0</PackageVersion>
|
||||
<AssemblyVersion>10.2.0</AssemblyVersion>
|
||||
<FileVersion>10.2.0</FileVersion>
|
||||
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
|
||||
<PackageTags>OKX;OKX.Net;Mexc;Mexc.Net;Kucoin;Kucoin.Net;Kraken;Kraken.Net;Huobi;Huobi.Net;CoinEx;CoinEx.Net;Bybit;Bybit.Net;Bitget;Bitget.Net;Bitfinex;Bitfinex.Net;Binance;Binance.Net;CryptoCurrency;CryptoCurrency Exchange;CryptoExchange.Net</PackageTags>
|
||||
<RepositoryType>git</RepositoryType>
|
||||
|
||||
@ -22,7 +22,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
private static readonly Action<ILogger, string, string, Exception?> _orderBookResyncing;
|
||||
private static readonly Action<ILogger, string, string, Exception?> _orderBookResynced;
|
||||
private static readonly Action<ILogger, string, string, Exception?> _orderBookMessageSkippedBecauseOfResubscribing;
|
||||
private static readonly Action<ILogger, string, string, long, long, long, Exception?> _orderBookDataSet;
|
||||
private static readonly Action<ILogger, string, string, long, long, long?, Exception?> _orderBookDataSet;
|
||||
private static readonly Action<ILogger, string, string, long, long, long, long, Exception?> _orderBookUpdateBuffered;
|
||||
private static readonly Action<ILogger, string, string, decimal, decimal, Exception?> _orderBookOutOfSyncDetected;
|
||||
private static readonly Action<ILogger, string, string, Exception?> _orderBookReconnectingSocket;
|
||||
@ -30,6 +30,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookProcessedMessage;
|
||||
private static readonly Action<ILogger, string, string, long, Exception?> _orderBookProcessedMessageSingle;
|
||||
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookOutOfSync;
|
||||
private static readonly Action<ILogger, string, string, long, long, long, Exception?> _orderBookUpdateSkippedStartEnd;
|
||||
|
||||
static SymbolOrderBookLoggingExtensions()
|
||||
{
|
||||
@ -74,7 +75,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
"{Api} order book {Symbol} Processing {NumberBufferedUpdated} buffered updates");
|
||||
|
||||
_orderBookUpdateSkipped = LoggerMessage.Define<string, string, long, long>(
|
||||
LogLevel.Debug,
|
||||
LogLevel.Trace,
|
||||
new EventId(5008, "OrderBookUpdateSkipped"),
|
||||
"{Api} order book {Symbol} update skipped #{SequenceNumber}, currently at #{LastSequenceNumber}");
|
||||
|
||||
@ -93,10 +94,10 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
new EventId(5011, "OrderBookMessageSkippedResubscribing"),
|
||||
"{Api} order book {Symbol} Skipping message because of resubscribing");
|
||||
|
||||
_orderBookDataSet = LoggerMessage.Define<string, string, long, long, long>(
|
||||
LogLevel.Debug,
|
||||
_orderBookDataSet = LoggerMessage.Define<string, string, long, long, long?>(
|
||||
LogLevel.Trace,
|
||||
new EventId(5012, "OrderBookDataSet"),
|
||||
"{Api} order book {Symbol} data set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}");
|
||||
"{Api} order book {Symbol} snapshot set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}");
|
||||
|
||||
_orderBookUpdateBuffered = LoggerMessage.Define<string, string, long, long, long, long>(
|
||||
LogLevel.Trace,
|
||||
@ -142,6 +143,12 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
LogLevel.Trace,
|
||||
new EventId(5021, "OrderBookProcessedMessage"),
|
||||
"{Api} order book {Symbol} update processed #{UpdateId}");
|
||||
|
||||
_orderBookUpdateSkippedStartEnd = LoggerMessage.Define<string, string, long, long, long>(
|
||||
LogLevel.Trace,
|
||||
new EventId(5022, "OrderBookUpdateSkippedStartEnd"),
|
||||
"{Api} order book {Symbol} update skipped #{SequenceStart}-#{SequenceEnd}, currently at #{LastSequenceNumber}");
|
||||
|
||||
}
|
||||
|
||||
public static void OrderBookStatusChanged(this ILogger logger, string api, string symbol, OrderBookStatus previousStatus, OrderBookStatus newStatus)
|
||||
@ -200,7 +207,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
{
|
||||
_orderBookMessageSkippedBecauseOfResubscribing(logger, api, symbol, null);
|
||||
}
|
||||
public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long endUpdateId)
|
||||
public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long? endUpdateId)
|
||||
{
|
||||
_orderBookDataSet(logger, api, symbol, bidCount, askCount, endUpdateId, null);
|
||||
}
|
||||
@ -243,5 +250,10 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
{
|
||||
_orderBookOutOfSyncChecksum(logger, api, symbol, null);
|
||||
}
|
||||
|
||||
public static void OrderBookUpdateSkipped(this ILogger logger, string api, string symbol, long sequenceStart, long sequenceEnd, long lastSequenceNumber)
|
||||
{
|
||||
_orderBookUpdateSkippedStartEnd(logger, api, symbol, sequenceStart, sequenceEnd, lastSequenceNumber, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,6 +53,11 @@ namespace CryptoExchange.Net.Objects.Sockets
|
||||
/// </summary>
|
||||
public SocketUpdateType? UpdateType { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Sequence number of the update
|
||||
/// </summary>
|
||||
public long? SequenceNumber { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
@ -126,6 +131,15 @@ namespace CryptoExchange.Net.Objects.Sockets
|
||||
return this;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Specify the sequence number of the update
|
||||
/// </summary>
|
||||
public DataEvent<T> WithSequenceNumber(long? sequenceNumber)
|
||||
{
|
||||
SequenceNumber = sequenceNumber;
|
||||
return this;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Specify the data timestamp
|
||||
/// </summary>
|
||||
|
||||
@ -3,28 +3,28 @@ using System;
|
||||
|
||||
namespace CryptoExchange.Net.OrderBook
|
||||
{
|
||||
internal class ProcessQueueItem
|
||||
internal class OrderBookUpdate
|
||||
{
|
||||
public DateTime? LocalDataTime { get; set; }
|
||||
public DateTime? ServerDataTime { get; set; }
|
||||
public long StartUpdateId { get; set; }
|
||||
public long EndUpdateId { get; set; }
|
||||
public long StartSequenceNumber { get; set; }
|
||||
public long EndSequenceNumber { get; set; }
|
||||
public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
|
||||
public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
|
||||
}
|
||||
|
||||
internal class InitialOrderBookItem
|
||||
internal class OrderBookSnapshot
|
||||
{
|
||||
public DateTime? LocalDataTime { get; set; }
|
||||
public DateTime? ServerDataTime { get; set; }
|
||||
public long StartUpdateId { get; set; }
|
||||
public long EndUpdateId { get; set; }
|
||||
public long? SequenceNumber { get; set; }
|
||||
public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
|
||||
public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
|
||||
}
|
||||
|
||||
internal class ChecksumItem
|
||||
internal class OrderBookChecksum
|
||||
{
|
||||
public long? SequenceNumber { get; set; }
|
||||
public int Checksum { get; set; }
|
||||
}
|
||||
}
|
||||
@ -39,6 +39,7 @@ namespace CryptoExchange.Net.OrderBook
|
||||
private readonly AsyncResetEvent _queueEvent;
|
||||
private readonly ConcurrentQueue<object> _processQueue;
|
||||
private bool _validateChecksum;
|
||||
private bool _firstUpdateAfterSnapshotDone;
|
||||
|
||||
private class EmptySymbolOrderBookEntry : ISymbolOrderBookEntry
|
||||
{
|
||||
@ -50,6 +51,13 @@ namespace CryptoExchange.Net.OrderBook
|
||||
|
||||
private static readonly ISymbolOrderBookEntry _emptySymbolOrderBookEntry = new EmptySymbolOrderBookEntry();
|
||||
|
||||
private enum SequenceNumberResult
|
||||
{
|
||||
Skip,
|
||||
Ok,
|
||||
OutOfSync
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A buffer to store messages received before the initial book snapshot is processed. These messages
|
||||
/// will be processed after the book snapshot is set. Any messages in this buffer with sequence numbers lower
|
||||
@ -77,7 +85,12 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// the book will resynchronize as it is deemed out of sync
|
||||
/// </summary>
|
||||
protected bool _sequencesAreConsecutive;
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Whether the first update message after a snapshot may have overlapping sequence numbers instead of the snapshot sequence number + 1
|
||||
/// </summary>
|
||||
protected bool _skipSequenceCheckFirstUpdateAfterSnapshotSet;
|
||||
|
||||
/// <summary>
|
||||
/// Whether levels should be strictly enforced. For example, when an order book has 25 levels and a new update comes in which pushes
|
||||
/// the current level 25 ask out of the top 25, should the level 26 entry be removed from the book or does the server handle this
|
||||
@ -267,6 +280,7 @@ namespace CryptoExchange.Net.OrderBook
|
||||
|
||||
_processBuffer.Clear();
|
||||
_bookSet = false;
|
||||
_firstUpdateAfterSnapshotDone = false;
|
||||
|
||||
Status = OrderBookStatus.Connecting;
|
||||
_processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
|
||||
@ -419,28 +433,27 @@ namespace CryptoExchange.Net.OrderBook
|
||||
protected virtual bool DoChecksum(int checksum) => true;
|
||||
|
||||
/// <summary>
|
||||
/// Set the initial data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot
|
||||
/// received from a socket subscription
|
||||
/// Set snapshot data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot
|
||||
/// received from a socket subscription. Will clear any previous data.
|
||||
/// </summary>
|
||||
/// <param name="orderBookSequenceNumber">The last update sequence number until which the snapshot is in sync</param>
|
||||
/// <param name="askList">List of asks</param>
|
||||
/// <param name="bidList">List of bids</param>
|
||||
/// <param name="serverDataTime">Server data timestamp</param>
|
||||
/// <param name="localDataTime">local data timestamp</param>
|
||||
protected void SetInitialOrderBook(
|
||||
long orderBookSequenceNumber,
|
||||
protected void SetSnapshot(
|
||||
long? orderBookSequenceNumber,
|
||||
ISymbolOrderBookEntry[] bidList,
|
||||
ISymbolOrderBookEntry[] askList,
|
||||
DateTime? serverDataTime = null,
|
||||
DateTime? localDataTime = null)
|
||||
{
|
||||
_processQueue.Enqueue(
|
||||
new InitialOrderBookItem
|
||||
new OrderBookSnapshot
|
||||
{
|
||||
LocalDataTime = localDataTime,
|
||||
ServerDataTime = serverDataTime,
|
||||
StartUpdateId = orderBookSequenceNumber,
|
||||
EndUpdateId = orderBookSequenceNumber,
|
||||
SequenceNumber = orderBookSequenceNumber,
|
||||
Asks = askList,
|
||||
Bids = bidList
|
||||
});
|
||||
@ -450,25 +463,25 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// <summary>
|
||||
/// Add an update to the process queue. Updates the book by providing changed bids and asks, along with an update number which should be higher than the previous update numbers
|
||||
/// </summary>
|
||||
/// <param name="updateId">The sequence number</param>
|
||||
/// <param name="sequenceNumber">The sequence number</param>
|
||||
/// <param name="bids">List of updated/new bids</param>
|
||||
/// <param name="asks">List of updated/new asks</param>
|
||||
/// <param name="serverDataTime">Server data timestamp</param>
|
||||
/// <param name="localDataTime">local data timestamp</param>
|
||||
protected void UpdateOrderBook(
|
||||
long updateId,
|
||||
long sequenceNumber,
|
||||
ISymbolOrderBookEntry[] bids,
|
||||
ISymbolOrderBookEntry[] asks,
|
||||
DateTime? serverDataTime = null,
|
||||
DateTime? localDataTime = null)
|
||||
{
|
||||
_processQueue.Enqueue(
|
||||
new ProcessQueueItem
|
||||
new OrderBookUpdate
|
||||
{
|
||||
LocalDataTime = localDataTime,
|
||||
ServerDataTime = serverDataTime,
|
||||
StartUpdateId = updateId,
|
||||
EndUpdateId = updateId,
|
||||
StartSequenceNumber = sequenceNumber,
|
||||
EndSequenceNumber = sequenceNumber,
|
||||
Asks = asks,
|
||||
Bids = bids
|
||||
});
|
||||
@ -478,27 +491,27 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// <summary>
|
||||
/// Add an update to the process queue. Updates the book by providing changed bids and asks, along with the first and last sequence number in the update
|
||||
/// </summary>
|
||||
/// <param name="firstUpdateId">The sequence number of the first update</param>
|
||||
/// <param name="lastUpdateId">The sequence number of the last update</param>
|
||||
/// <param name="firstSequenceNumber">The sequence number of the first update</param>
|
||||
/// <param name="lastSequenceNumber">The sequence number of the last update</param>
|
||||
/// <param name="bids">List of updated/new bids</param>
|
||||
/// <param name="asks">List of updated/new asks</param>
|
||||
/// <param name="serverDataTime">Server data timestamp</param>
|
||||
/// <param name="localDataTime">local data timestamp</param>
|
||||
protected void UpdateOrderBook(
|
||||
long firstUpdateId,
|
||||
long lastUpdateId,
|
||||
long firstSequenceNumber,
|
||||
long lastSequenceNumber,
|
||||
ISymbolOrderBookEntry[] bids,
|
||||
ISymbolOrderBookEntry[] asks,
|
||||
DateTime? serverDataTime = null,
|
||||
DateTime? localDataTime = null)
|
||||
{
|
||||
_processQueue.Enqueue(
|
||||
new ProcessQueueItem
|
||||
new OrderBookUpdate
|
||||
{
|
||||
LocalDataTime = localDataTime,
|
||||
ServerDataTime = serverDataTime,
|
||||
StartUpdateId = firstUpdateId,
|
||||
EndUpdateId = lastUpdateId,
|
||||
StartSequenceNumber = firstSequenceNumber,
|
||||
EndSequenceNumber = lastSequenceNumber,
|
||||
Asks = asks,
|
||||
Bids = bids
|
||||
});
|
||||
@ -522,12 +535,12 @@ namespace CryptoExchange.Net.OrderBook
|
||||
var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue);
|
||||
|
||||
_processQueue.Enqueue(
|
||||
new ProcessQueueItem
|
||||
new OrderBookUpdate
|
||||
{
|
||||
LocalDataTime = localDataTime,
|
||||
ServerDataTime = serverDataTime,
|
||||
StartUpdateId = lowest,
|
||||
EndUpdateId = highest,
|
||||
StartSequenceNumber = lowest,
|
||||
EndSequenceNumber = highest,
|
||||
Asks = asks,
|
||||
Bids = bids
|
||||
});
|
||||
@ -538,9 +551,10 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// Add a checksum value to the process queue
|
||||
/// </summary>
|
||||
/// <param name="checksum">The checksum value</param>
|
||||
protected void AddChecksum(int checksum)
|
||||
/// <param name="sequenceNumber">The sequence number of the message if it's a separate message with separate number</param>
|
||||
protected void AddChecksum(int checksum, long? sequenceNumber = null)
|
||||
{
|
||||
_processQueue.Enqueue(new ChecksumItem() { Checksum = checksum });
|
||||
_processQueue.Enqueue(new OrderBookChecksum() { Checksum = checksum, SequenceNumber = sequenceNumber });
|
||||
_queueEvent.Set();
|
||||
}
|
||||
|
||||
@ -553,7 +567,12 @@ namespace CryptoExchange.Net.OrderBook
|
||||
_logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count);
|
||||
|
||||
foreach (var bufferEntry in _processBuffer)
|
||||
ProcessRangeUpdates(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks);
|
||||
{
|
||||
if (_stopProcessing)
|
||||
break;
|
||||
|
||||
ProcessUpdate(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks, true);
|
||||
}
|
||||
|
||||
_processBuffer.Clear();
|
||||
}
|
||||
@ -561,26 +580,10 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// <summary>
|
||||
/// Update order book with an entry
|
||||
/// </summary>
|
||||
/// <param name="sequence">Sequence number of the update</param>
|
||||
/// <param name="type">Type of entry</param>
|
||||
/// <param name="entry">The entry</param>
|
||||
protected virtual bool ProcessUpdate(long sequence, OrderBookEntryType type, ISymbolOrderBookEntry entry)
|
||||
protected virtual bool UpdateValue(OrderBookEntryType type, ISymbolOrderBookEntry entry)
|
||||
{
|
||||
if (sequence <= LastSequenceNumber)
|
||||
{
|
||||
_logger.OrderBookSkippedMessage(Api, Symbol, sequence, LastSequenceNumber);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (_sequencesAreConsecutive && sequence > LastSequenceNumber + 1)
|
||||
{
|
||||
// Out of sync
|
||||
_logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, sequence);
|
||||
_stopProcessing = true;
|
||||
Resubscribe();
|
||||
return false;
|
||||
}
|
||||
|
||||
UpdateTime = DateTime.UtcNow;
|
||||
var listToChange = type == OrderBookEntryType.Ask ? _asks : _bids;
|
||||
if (entry.Quantity == 0)
|
||||
@ -735,8 +738,11 @@ namespace CryptoExchange.Net.OrderBook
|
||||
// Clear queue
|
||||
while (_processQueue.TryDequeue(out _)) { }
|
||||
|
||||
LastSequenceNumber = 0;
|
||||
_processBuffer.Clear();
|
||||
_bookSet = false;
|
||||
_firstUpdateAfterSnapshotDone = false;
|
||||
|
||||
DoReset();
|
||||
}
|
||||
|
||||
@ -774,17 +780,17 @@ namespace CryptoExchange.Net.OrderBook
|
||||
continue;
|
||||
}
|
||||
|
||||
if (item is InitialOrderBookItem iobi)
|
||||
ProcessInitialOrderBookItem(iobi);
|
||||
if (item is ProcessQueueItem pqi)
|
||||
ProcessQueueItem(pqi);
|
||||
else if (item is ChecksumItem ci)
|
||||
ProcessChecksum(ci);
|
||||
if (item is OrderBookSnapshot snapshot)
|
||||
ProcessOrderBookSnapshot(snapshot);
|
||||
if (item is OrderBookUpdate update)
|
||||
ProcessQueueItem(update);
|
||||
else if (item is OrderBookChecksum checksum)
|
||||
ProcessChecksum(checksum);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ProcessInitialOrderBookItem(InitialOrderBookItem item)
|
||||
private void ProcessOrderBookSnapshot(OrderBookSnapshot item)
|
||||
{
|
||||
lock (_bookLock)
|
||||
{
|
||||
@ -796,7 +802,8 @@ namespace CryptoExchange.Net.OrderBook
|
||||
foreach (var bid in item.Bids)
|
||||
_bids.Add(bid.Price, bid);
|
||||
|
||||
LastSequenceNumber = item.EndUpdateId;
|
||||
if (item.SequenceNumber != null)
|
||||
LastSequenceNumber = item.SequenceNumber.Value;
|
||||
|
||||
AskCount = _asks.Count;
|
||||
BidCount = _bids.Count;
|
||||
@ -805,14 +812,15 @@ namespace CryptoExchange.Net.OrderBook
|
||||
UpdateServerTime = item.ServerDataTime;
|
||||
UpdateLocalTime = item.LocalDataTime;
|
||||
|
||||
_logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.EndUpdateId);
|
||||
_logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.SequenceNumber);
|
||||
CheckProcessBuffer();
|
||||
|
||||
OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray()));
|
||||
OnBestOffersChanged?.Invoke((BestBid, BestAsk));
|
||||
}
|
||||
}
|
||||
|
||||
private void ProcessQueueItem(ProcessQueueItem item)
|
||||
private void ProcessQueueItem(OrderBookUpdate item)
|
||||
{
|
||||
lock (_bookLock)
|
||||
{
|
||||
@ -822,19 +830,19 @@ namespace CryptoExchange.Net.OrderBook
|
||||
{
|
||||
Asks = item.Asks,
|
||||
Bids = item.Bids,
|
||||
FirstUpdateId = item.StartUpdateId,
|
||||
LastUpdateId = item.EndUpdateId,
|
||||
FirstUpdateId = item.StartSequenceNumber,
|
||||
LastUpdateId = item.EndSequenceNumber,
|
||||
});
|
||||
|
||||
|
||||
if (_logger.IsEnabled(LogLevel.Trace))
|
||||
_logger.OrderBookUpdateBuffered(Api, Symbol, item.StartUpdateId, item.EndUpdateId, item.Asks.Length, item.Bids.Length);
|
||||
_logger.OrderBookUpdateBuffered(Api, Symbol, item.StartSequenceNumber, item.EndSequenceNumber, item.Asks.Length, item.Bids.Length);
|
||||
}
|
||||
else
|
||||
{
|
||||
CheckProcessBuffer();
|
||||
var (prevBestBid, prevBestAsk) = BestOffers;
|
||||
ProcessRangeUpdates(item.StartUpdateId, item.EndUpdateId, item.Bids, item.Asks);
|
||||
ProcessUpdate(item.StartSequenceNumber, item.EndSequenceNumber, item.Bids, item.Asks, false);
|
||||
|
||||
if (_asks.Count == 0 || _bids.Count == 0)
|
||||
return;
|
||||
@ -856,7 +864,7 @@ namespace CryptoExchange.Net.OrderBook
|
||||
}
|
||||
}
|
||||
|
||||
private void ProcessChecksum(ChecksumItem ci)
|
||||
private void ProcessChecksum(OrderBookChecksum ci)
|
||||
{
|
||||
lock (_bookLock)
|
||||
{
|
||||
@ -876,6 +884,9 @@ namespace CryptoExchange.Net.OrderBook
|
||||
throw;
|
||||
}
|
||||
|
||||
if (ci.SequenceNumber != null)
|
||||
LastSequenceNumber = ci.SequenceNumber.Value;
|
||||
|
||||
if (!checksumResult)
|
||||
{
|
||||
_logger.OrderBookOutOfSyncChecksum(Api, Symbol);
|
||||
@ -913,23 +924,37 @@ namespace CryptoExchange.Net.OrderBook
|
||||
});
|
||||
}
|
||||
|
||||
private void ProcessRangeUpdates(
|
||||
long firstUpdateId,
|
||||
long lastUpdateId,
|
||||
private void ProcessUpdate(
|
||||
long updateSequenceNumberStart,
|
||||
long updateSequenceNumberEnd,
|
||||
IEnumerable<ISymbolOrderBookEntry> bids,
|
||||
IEnumerable<ISymbolOrderBookEntry> asks)
|
||||
IEnumerable<ISymbolOrderBookEntry> asks,
|
||||
bool fromBuffer)
|
||||
{
|
||||
if (lastUpdateId <= LastSequenceNumber)
|
||||
var sequenceResult = fromBuffer ? ValidateBufferSequenceNumber(updateSequenceNumberStart, updateSequenceNumberEnd) : ValidateLiveSequenceNumber(updateSequenceNumberStart);
|
||||
if (sequenceResult == SequenceNumberResult.Skip)
|
||||
{
|
||||
_logger.OrderBookUpdateSkipped(Api, Symbol, lastUpdateId, LastSequenceNumber);
|
||||
if (updateSequenceNumberStart != updateSequenceNumberEnd)
|
||||
_logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd, LastSequenceNumber);
|
||||
else
|
||||
_logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, LastSequenceNumber);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (sequenceResult == SequenceNumberResult.OutOfSync)
|
||||
{
|
||||
_logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberStart);
|
||||
_stopProcessing = true;
|
||||
Resubscribe();
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var entry in bids)
|
||||
ProcessUpdate(LastSequenceNumber + 1, OrderBookEntryType.Bid, entry);
|
||||
UpdateValue(OrderBookEntryType.Bid, entry);
|
||||
|
||||
foreach (var entry in asks)
|
||||
ProcessUpdate(LastSequenceNumber + 1, OrderBookEntryType.Ask, entry);
|
||||
UpdateValue(OrderBookEntryType.Ask, entry);
|
||||
|
||||
if (Levels.HasValue && _strictLevels)
|
||||
{
|
||||
@ -946,16 +971,51 @@ namespace CryptoExchange.Net.OrderBook
|
||||
}
|
||||
}
|
||||
|
||||
LastSequenceNumber = lastUpdateId;
|
||||
_firstUpdateAfterSnapshotDone = true;
|
||||
LastSequenceNumber = updateSequenceNumberEnd;
|
||||
|
||||
if (_logger.IsEnabled(LogLevel.Trace))
|
||||
{
|
||||
if (firstUpdateId != lastUpdateId)
|
||||
_logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId, lastUpdateId);
|
||||
if (updateSequenceNumberStart != updateSequenceNumberEnd)
|
||||
_logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd);
|
||||
else
|
||||
_logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId);
|
||||
_logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private SequenceNumberResult ValidateBufferSequenceNumber(long startSequenceNumber, long endSequenceNumber)
|
||||
{
|
||||
if (endSequenceNumber <= LastSequenceNumber)
|
||||
// Buffered update is from before the snapshot, ignore
|
||||
return SequenceNumberResult.Skip;
|
||||
|
||||
if (_sequencesAreConsecutive && startSequenceNumber != LastSequenceNumber + 1)
|
||||
{
|
||||
if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet)
|
||||
// Buffered update is not the next sequence number when it was expected to be
|
||||
return SequenceNumberResult.OutOfSync;
|
||||
}
|
||||
|
||||
// Buffered sequence number is larger than the last sequence number
|
||||
return SequenceNumberResult.Ok;
|
||||
}
|
||||
|
||||
private SequenceNumberResult ValidateLiveSequenceNumber(long sequenceNumber)
|
||||
{
|
||||
if (sequenceNumber < LastSequenceNumber)
|
||||
// Update is somehow from before the current state
|
||||
return SequenceNumberResult.OutOfSync;
|
||||
|
||||
if (_sequencesAreConsecutive
|
||||
&& LastSequenceNumber != 0
|
||||
&& sequenceNumber != LastSequenceNumber + 1)
|
||||
{
|
||||
if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet)
|
||||
return SequenceNumberResult.OutOfSync;
|
||||
}
|
||||
|
||||
return SequenceNumberResult.Ok;
|
||||
}
|
||||
}
|
||||
|
||||
internal class DescComparer<T> : IComparer<T>
|
||||
|
||||
@ -257,6 +257,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private bool _pausedActivity;
|
||||
#if NET9_0_OR_GREATER
|
||||
private readonly Lock _listenersLock = new Lock();
|
||||
@ -274,6 +275,8 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
private ISocketMessageHandler? _byteMessageConverter;
|
||||
private ISocketMessageHandler? _textMessageConverter;
|
||||
|
||||
private long _lastSequenceNumber;
|
||||
|
||||
/// <summary>
|
||||
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
|
||||
/// </summary>
|
||||
@ -293,7 +296,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// Cache for deserialization, only caches for a single message
|
||||
/// </summary>
|
||||
private readonly Dictionary<Type, object> _deserializationCache = new Dictionary<Type, object>();
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// New socket connection
|
||||
/// </summary>
|
||||
@ -340,6 +343,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
{
|
||||
Status = SocketStatus.Closed;
|
||||
Authenticated = false;
|
||||
_lastSequenceNumber = 0;
|
||||
|
||||
if (ApiClient._socketConnections.ContainsKey(SocketId))
|
||||
ApiClient._socketConnections.TryRemove(SocketId, out _);
|
||||
@ -371,6 +375,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
Status = SocketStatus.Reconnecting;
|
||||
DisconnectTime = DateTime.UtcNow;
|
||||
Authenticated = false;
|
||||
_lastSequenceNumber = 0;
|
||||
|
||||
lock (_listenersLock)
|
||||
{
|
||||
@ -1280,6 +1285,23 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Update the sequence number for this connection
|
||||
/// </summary>
|
||||
public void UpdateSequenceNumber(long sequenceNumber)
|
||||
{
|
||||
if (ApiClient.EnforceSequenceNumbers
|
||||
&& _lastSequenceNumber != 0
|
||||
&& _lastSequenceNumber + 1 != sequenceNumber)
|
||||
{
|
||||
// Not sequential
|
||||
_logger.LogWarning("[Sckt {SocketId}] update not in sequence. Last recorded sequence number: {LastSequence}, update sequence number: {UpdateSequence}. Reconnecting", SocketId, _lastSequenceNumber, sequenceNumber);
|
||||
_ = TriggerReconnectAsync();
|
||||
}
|
||||
|
||||
_lastSequenceNumber = sequenceNumber;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Periodically sends data over a socket connection
|
||||
/// </summary>
|
||||
|
||||
11
README.md
11
README.md
@ -66,6 +66,17 @@ Make a one time donation in a crypto currency of your choice. If you prefer to d
|
||||
Alternatively, sponsor me on Github using [Github Sponsors](https://github.com/sponsors/JKorf).
|
||||
|
||||
## Release notes
|
||||
* Version 10.2.0 - 12 Jan 2026
|
||||
* Added EnforceSequenceNumbers property on SocketApiClient to configure whether websocket message contain sequence numbers and if these should be checked to be sequential
|
||||
* Added fallback to existing websocket connection if no dedicated request connection was found
|
||||
* Added IntBoolConverter base class for arbitrary int value to bool mapping
|
||||
* Added SequenceNumber property to DataEvent object
|
||||
* Added _skipSequenceCheckFirstUpdateAfterSnapshotSet property for SymbolOrderBook implementations
|
||||
* Updated SymbolOrderBook sequenceNumber validation
|
||||
* Updated SymbolOrderBook log verbosities
|
||||
* Renamed SetInitialOrderBook to SetSnapshot in SymbolOrderBook
|
||||
* Renamed updateId references to sequenceNumber in SymbolOrderBook
|
||||
|
||||
* Version 10.1.0 - 07 Jan 2026
|
||||
* Updated time sync / time offset management for REST API's
|
||||
* Added time offset tracking for WebSocket API's
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user