mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2026-02-16 14:13:46 +00:00
wip
This commit is contained in:
parent
41b3decd9e
commit
d7ca1b69de
@ -710,6 +710,10 @@ namespace CryptoExchange.Net.Clients
|
||||
if (connection != null && !connection.DedicatedRequestConnection.Authenticated)
|
||||
// Mark dedicated request connection as authenticated if the request is authenticated
|
||||
connection.DedicatedRequestConnection.Authenticated = authenticated;
|
||||
|
||||
if (connection == null)
|
||||
// Fall back to an existing connection if there is no dedicated request connection available
|
||||
connection = socketQuery.OrderBy(s => s.UserSubscriptionCount).FirstOrDefault();
|
||||
}
|
||||
|
||||
bool maxConnectionsReached = _socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections);
|
||||
|
||||
@ -22,7 +22,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
private static readonly Action<ILogger, string, string, Exception?> _orderBookResyncing;
|
||||
private static readonly Action<ILogger, string, string, Exception?> _orderBookResynced;
|
||||
private static readonly Action<ILogger, string, string, Exception?> _orderBookMessageSkippedBecauseOfResubscribing;
|
||||
private static readonly Action<ILogger, string, string, long, long, long, Exception?> _orderBookDataSet;
|
||||
private static readonly Action<ILogger, string, string, long, long, long?, Exception?> _orderBookDataSet;
|
||||
private static readonly Action<ILogger, string, string, long, long, long, long, Exception?> _orderBookUpdateBuffered;
|
||||
private static readonly Action<ILogger, string, string, decimal, decimal, Exception?> _orderBookOutOfSyncDetected;
|
||||
private static readonly Action<ILogger, string, string, Exception?> _orderBookReconnectingSocket;
|
||||
@ -30,6 +30,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookProcessedMessage;
|
||||
private static readonly Action<ILogger, string, string, long, Exception?> _orderBookProcessedMessageSingle;
|
||||
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookOutOfSync;
|
||||
private static readonly Action<ILogger, string, string, long, long, long, Exception?> _orderBookUpdateSkippedStartEnd;
|
||||
|
||||
static SymbolOrderBookLoggingExtensions()
|
||||
{
|
||||
@ -93,7 +94,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
new EventId(5011, "OrderBookMessageSkippedResubscribing"),
|
||||
"{Api} order book {Symbol} Skipping message because of resubscribing");
|
||||
|
||||
_orderBookDataSet = LoggerMessage.Define<string, string, long, long, long>(
|
||||
_orderBookDataSet = LoggerMessage.Define<string, string, long, long, long?>(
|
||||
LogLevel.Debug,
|
||||
new EventId(5012, "OrderBookDataSet"),
|
||||
"{Api} order book {Symbol} snapshot set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}");
|
||||
@ -142,6 +143,12 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
LogLevel.Trace,
|
||||
new EventId(5021, "OrderBookProcessedMessage"),
|
||||
"{Api} order book {Symbol} update processed #{UpdateId}");
|
||||
|
||||
_orderBookUpdateSkippedStartEnd = LoggerMessage.Define<string, string, long, long, long>(
|
||||
LogLevel.Debug,
|
||||
new EventId(5022, "OrderBookUpdateSkippedStartEnd"),
|
||||
"{Api} order book {Symbol} update skipped #{SequenceStart}-#{SequenceEnd}, currently at #{LastSequenceNumber}");
|
||||
|
||||
}
|
||||
|
||||
public static void OrderBookStatusChanged(this ILogger logger, string api, string symbol, OrderBookStatus previousStatus, OrderBookStatus newStatus)
|
||||
@ -200,7 +207,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
{
|
||||
_orderBookMessageSkippedBecauseOfResubscribing(logger, api, symbol, null);
|
||||
}
|
||||
public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long endUpdateId)
|
||||
public static void OrderBookDataSet(this ILogger logger, string api, string symbol, long bidCount, long askCount, long? endUpdateId)
|
||||
{
|
||||
_orderBookDataSet(logger, api, symbol, bidCount, askCount, endUpdateId, null);
|
||||
}
|
||||
@ -243,5 +250,10 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
{
|
||||
_orderBookOutOfSyncChecksum(logger, api, symbol, null);
|
||||
}
|
||||
|
||||
public static void OrderBookUpdateSkipped(this ILogger logger, string api, string symbol, long sequenceStart, long sequenceEnd, long lastSequenceNumber)
|
||||
{
|
||||
_orderBookUpdateSkippedStartEnd(logger, api, symbol, sequenceStart, sequenceEnd, lastSequenceNumber, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,7 +17,7 @@ namespace CryptoExchange.Net.OrderBook
|
||||
{
|
||||
public DateTime? LocalDataTime { get; set; }
|
||||
public DateTime? ServerDataTime { get; set; }
|
||||
public long SequenceNumber { get; set; }
|
||||
public long? SequenceNumber { get; set; }
|
||||
public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
|
||||
public ISymbolOrderBookEntry[] Asks { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
|
||||
}
|
||||
|
||||
@ -50,6 +50,13 @@ namespace CryptoExchange.Net.OrderBook
|
||||
|
||||
private static readonly ISymbolOrderBookEntry _emptySymbolOrderBookEntry = new EmptySymbolOrderBookEntry();
|
||||
|
||||
private enum SequenceNumberResult
|
||||
{
|
||||
Skip,
|
||||
Ok,
|
||||
OutOfSync
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A buffer to store messages received before the initial book snapshot is processed. These messages
|
||||
/// will be processed after the book snapshot is set. Any messages in this buffer with sequence numbers lower
|
||||
@ -423,16 +430,16 @@ namespace CryptoExchange.Net.OrderBook
|
||||
protected virtual bool DoChecksum(int checksum) => true;
|
||||
|
||||
/// <summary>
|
||||
/// Set the initial data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot
|
||||
/// received from a socket subscription
|
||||
/// Set snapshot data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot
|
||||
/// received from a socket subscription. Will clear any previous data.
|
||||
/// </summary>
|
||||
/// <param name="orderBookSequenceNumber">The last update sequence number until which the snapshot is in sync</param>
|
||||
/// <param name="askList">List of asks</param>
|
||||
/// <param name="bidList">List of bids</param>
|
||||
/// <param name="serverDataTime">Server data timestamp</param>
|
||||
/// <param name="localDataTime">local data timestamp</param>
|
||||
protected void SetInitialOrderBook(
|
||||
long orderBookSequenceNumber,
|
||||
protected void SetSnapshot(
|
||||
long? orderBookSequenceNumber,
|
||||
ISymbolOrderBookEntry[] bidList,
|
||||
ISymbolOrderBookEntry[] askList,
|
||||
DateTime? serverDataTime = null,
|
||||
@ -557,7 +564,12 @@ namespace CryptoExchange.Net.OrderBook
|
||||
_logger.OrderBookProcessingBufferedUpdates(Api, Symbol, _processBuffer.Count);
|
||||
|
||||
foreach (var bufferEntry in _processBuffer)
|
||||
{
|
||||
if (_stopProcessing)
|
||||
break;
|
||||
|
||||
ProcessUpdate(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks, true);
|
||||
}
|
||||
|
||||
_processBuffer.Clear();
|
||||
}
|
||||
@ -707,12 +719,6 @@ namespace CryptoExchange.Net.OrderBook
|
||||
});
|
||||
}
|
||||
|
||||
protected void TriggerResubscribe()
|
||||
{
|
||||
_stopProcessing = true;
|
||||
Resubscribe();
|
||||
}
|
||||
|
||||
private void CheckBestOffersChanged(ISymbolOrderBookEntry prevBestBid, ISymbolOrderBookEntry prevBestAsk)
|
||||
{
|
||||
var (bestBid, bestAsk) = BestOffers;
|
||||
@ -793,7 +799,8 @@ namespace CryptoExchange.Net.OrderBook
|
||||
foreach (var bid in item.Bids)
|
||||
_bids.Add(bid.Price, bid);
|
||||
|
||||
LastSequenceNumber = item.SequenceNumber;
|
||||
if (item.SequenceNumber != null)
|
||||
LastSequenceNumber = item.SequenceNumber.Value;
|
||||
|
||||
AskCount = _asks.Count;
|
||||
BidCount = _bids.Count;
|
||||
@ -921,33 +928,23 @@ namespace CryptoExchange.Net.OrderBook
|
||||
IEnumerable<ISymbolOrderBookEntry> asks,
|
||||
bool fromBuffer)
|
||||
{
|
||||
if (updateSequenceNumberEnd <= LastSequenceNumber)
|
||||
var sequenceResult = fromBuffer ? ValidateBufferSequenceNumber(updateSequenceNumberStart, updateSequenceNumberEnd) : ValidateLiveSequenceNumber(updateSequenceNumberStart);
|
||||
if (sequenceResult == SequenceNumberResult.Skip)
|
||||
{
|
||||
if (fromBuffer)
|
||||
{
|
||||
// We're already past this update, discard buffered update
|
||||
_logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberEnd, LastSequenceNumber);
|
||||
return;
|
||||
}
|
||||
if (updateSequenceNumberStart != updateSequenceNumberEnd)
|
||||
_logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, updateSequenceNumberEnd, LastSequenceNumber);
|
||||
else
|
||||
{
|
||||
// Somehow this sequence number is before the last sequence number
|
||||
_stopProcessing = true;
|
||||
Resubscribe();
|
||||
return;
|
||||
}
|
||||
_logger.OrderBookUpdateSkipped(Api, Symbol, updateSequenceNumberStart, LastSequenceNumber);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (_sequencesAreConsecutive && updateSequenceNumberStart != LastSequenceNumber + 1)
|
||||
if (sequenceResult == SequenceNumberResult.OutOfSync)
|
||||
{
|
||||
if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet)
|
||||
{
|
||||
// Expected the start sequenceNumber to be LastSequenceNumber + 1, but wasn't
|
||||
_logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberEnd);
|
||||
_stopProcessing = true;
|
||||
Resubscribe();
|
||||
return;
|
||||
}
|
||||
_logger.OrderBookOutOfSync(Api, Symbol, LastSequenceNumber + 1, updateSequenceNumberStart);
|
||||
_stopProcessing = true;
|
||||
Resubscribe();
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var entry in bids)
|
||||
@ -981,7 +978,40 @@ namespace CryptoExchange.Net.OrderBook
|
||||
else
|
||||
_logger.OrderBookProcessedMessage(Api, Symbol, updateSequenceNumberStart);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private SequenceNumberResult ValidateBufferSequenceNumber(long startSequenceNumber, long endSequenceNumber)
|
||||
{
|
||||
if (endSequenceNumber <= LastSequenceNumber)
|
||||
// Buffered update is from before the snapshot, ignore
|
||||
return SequenceNumberResult.Skip;
|
||||
|
||||
if (_sequencesAreConsecutive && startSequenceNumber != LastSequenceNumber + 1)
|
||||
{
|
||||
if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet)
|
||||
// Buffered update is not the next sequence number when it was expected
|
||||
return SequenceNumberResult.OutOfSync;
|
||||
}
|
||||
|
||||
// Buffered sequence number is larger than the last sequence number
|
||||
return SequenceNumberResult.Ok;
|
||||
}
|
||||
|
||||
private SequenceNumberResult ValidateLiveSequenceNumber(long sequenceNumber)
|
||||
{
|
||||
if (sequenceNumber < LastSequenceNumber)
|
||||
return SequenceNumberResult.OutOfSync;
|
||||
|
||||
if (_sequencesAreConsecutive
|
||||
&& LastSequenceNumber != 0
|
||||
&& sequenceNumber != LastSequenceNumber + 1)
|
||||
{
|
||||
if (_firstUpdateAfterSnapshotDone || !_skipSequenceCheckFirstUpdateAfterSnapshotSet)
|
||||
return SequenceNumberResult.OutOfSync;
|
||||
}
|
||||
|
||||
return SequenceNumberResult.Ok;
|
||||
}
|
||||
}
|
||||
|
||||
internal class DescComparer<T> : IComparer<T>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user