1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-10 09:26:22 +00:00

wip orderbook rework

This commit is contained in:
Jan Korf 2020-06-04 22:01:42 +02:00
parent b95215866b
commit c60131d464
3 changed files with 86 additions and 112 deletions

View File

@ -2122,6 +2122,11 @@
How to serialize array parameters
</summary>
</member>
<member name="F:CryptoExchange.Net.RestClient.requestBodyEmptyContent">
<summary>
What request body should be when no data is send
</summary>
</member>
<member name="P:CryptoExchange.Net.RestClient.RequestTimeout">
<summary>
Timeout for requests
@ -2939,7 +2944,9 @@
<member name="M:CryptoExchange.Net.Sockets.WebsocketFactory.CreateWebsocket(CryptoExchange.Net.Logging.Log,System.String,System.Collections.Generic.IDictionary{System.String,System.String},System.Collections.Generic.IDictionary{System.String,System.String})">
<inheritdoc />
</member>
<member name="T:System.Diagnostics.CodeAnalysis.AllowNullAttribute">
</members>
</doc>
System.Diagnostics.CodeAnalysis.AllowNullAttribute">
<summary>
Specifies that <see langword="null"/> is allowed as an input even if the
corresponding type disallows it.

View File

@ -0,0 +1,13 @@
using CryptoExchange.Net.Interfaces;
using System.Collections.Generic;
namespace CryptoExchange.Net.OrderBook
{
internal class ProcessQueueItem
{
public long StartUpdateId { get; set; }
public long EndUpdateId { get; set; }
public IEnumerable<ISymbolOrderBookEntry> Bids { get; set; } = new List<ISymbolOrderBookEntry>();
public IEnumerable<ISymbolOrderBookEntry> Asks { get; set; } = new List<ISymbolOrderBookEntry>();
}
}

View File

@ -1,8 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging;
@ -19,7 +21,7 @@ namespace CryptoExchange.Net.OrderBook
/// <summary>
/// The process buffer, used while syncing
/// </summary>
protected readonly List<object> processBuffer;
protected readonly List<ProcessBufferRangeSequenceEntry> processBuffer;
private readonly object bookLock = new object();
/// <summary>
/// The ask list
@ -34,6 +36,10 @@ namespace CryptoExchange.Net.OrderBook
private UpdateSubscription? subscription;
private readonly bool sequencesAreConsecutive;
private Task _processTask;
private AutoResetEvent _queueEvent;
private ConcurrentQueue<ProcessQueueItem> _processQueue;
/// <summary>
/// Order book implementation id
/// </summary>
@ -183,7 +189,10 @@ namespace CryptoExchange.Net.OrderBook
throw new ArgumentNullException(nameof(options));
Id = options.OrderBookName;
processBuffer = new List<object>();
processBuffer = new List<ProcessBufferRangeSequenceEntry>();
_processQueue = new ConcurrentQueue<ProcessQueueItem>();
_queueEvent = new AutoResetEvent(false);
sequencesAreConsecutive = options.SequenceNumbersAreConsecutive;
Symbol = symbol;
Status = OrderBookStatus.Disconnected;
@ -209,6 +218,8 @@ namespace CryptoExchange.Net.OrderBook
public async Task<CallResult<bool>> StartAsync()
{
Status = OrderBookStatus.Connecting;
_processTask = Task.Run(ProcessQueue);
var startResult = await DoStart().ConfigureAwait(false);
if (!startResult)
return new CallResult<bool>(false, startResult.Error);
@ -224,6 +235,7 @@ namespace CryptoExchange.Net.OrderBook
{
log.Write(LogVerbosity.Warning, $"{Id} order book {Symbol} connection lost");
Status = OrderBookStatus.Connecting;
_queueEvent.Set();
processBuffer.Clear();
bookSet = false;
DoReset();
@ -259,6 +271,8 @@ namespace CryptoExchange.Net.OrderBook
public async Task StopAsync()
{
Status = OrderBookStatus.Disconnected;
_queueEvent.Set();
_processTask.Wait();
if(subscription != null)
await subscription.Close().ConfigureAwait(false);
}
@ -280,6 +294,46 @@ namespace CryptoExchange.Net.OrderBook
/// <returns></returns>
protected abstract Task<CallResult<bool>> DoResync();
private void ProcessQueue()
{
while(Status != OrderBookStatus.Disconnected)
{
_queueEvent.WaitOne();
while (_processQueue.TryDequeue(out var item))
ProcessQueueItem(item);
}
}
private void ProcessQueueItem(ProcessQueueItem item)
{
lock (bookLock)
{
if (Status == OrderBookStatus.Connecting || Status == OrderBookStatus.Disconnected)
return;
if (!bookSet)
{
processBuffer.Add(new ProcessBufferRangeSequenceEntry()
{
Asks = item.Asks,
Bids = item.Bids,
FirstUpdateId = item.StartUpdateId,
LastUpdateId = item.EndUpdateId,
});
log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update buffered #{item.StartUpdateId}-#{item.EndUpdateId} [{Asks.Count()} asks, {Bids.Count()} bids]");
}
else
{
CheckProcessBuffer();
var (prevBestBid, prevBestAsk) = BestOffers;
ProcessRangeUpdates(item.StartUpdateId, item.EndUpdateId, item.Bids, item.Asks);
OnOrderBookUpdate?.Invoke(item.Bids, item.Asks);
CheckBestOffersChanged(prevBestBid, prevBestAsk);
}
}
}
/// <summary>
/// Set the initial data for the order book
/// </summary>
@ -330,30 +384,8 @@ namespace CryptoExchange.Net.OrderBook
/// <param name="asks"></param>
protected void UpdateOrderBook(long rangeUpdateId, IEnumerable<ISymbolOrderBookEntry> bids, IEnumerable<ISymbolOrderBookEntry> asks)
{
lock (bookLock)
{
if (Status == OrderBookStatus.Connecting || Status == OrderBookStatus.Disconnected)
return;
if (!bookSet)
{
processBuffer.Add(new ProcessBufferSingleSequenceEntry()
{
UpdateId = rangeUpdateId,
Asks = asks,
Bids = bids
});
log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update buffered #{rangeUpdateId}");
}
else
{
CheckProcessBuffer();
var (prevBestBid, prevBestAsk) = BestOffers;
ProcessSingleSequenceUpdates(rangeUpdateId, bids, asks);
OnOrderBookUpdate?.Invoke(bids, asks);
CheckBestOffersChanged(prevBestBid, prevBestAsk);
}
}
_processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = rangeUpdateId, EndUpdateId = rangeUpdateId, Asks = asks, Bids = bids });
_queueEvent.Set();
}
/// <summary>
@ -365,31 +397,8 @@ namespace CryptoExchange.Net.OrderBook
/// <param name="asks"></param>
protected void UpdateOrderBook(long firstUpdateId, long lastUpdateId, IEnumerable<ISymbolOrderBookEntry> bids, IEnumerable<ISymbolOrderBookEntry> asks)
{
lock (bookLock)
{
if (Status == OrderBookStatus.Connecting || Status == OrderBookStatus.Disconnected)
return;
if (!bookSet)
{
processBuffer.Add(new ProcessBufferRangeSequenceEntry()
{
Asks = asks,
Bids = bids,
FirstUpdateId = firstUpdateId,
LastUpdateId = lastUpdateId
});
log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update buffered #{firstUpdateId}-{lastUpdateId}");
}
else
{
CheckProcessBuffer();
var (prevBestBid, prevBestAsk) = BestOffers;
ProcessRangeUpdates(firstUpdateId, lastUpdateId, bids, asks);
OnOrderBookUpdate?.Invoke(bids, asks);
CheckBestOffersChanged(prevBestBid, prevBestAsk);
}
}
_processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = firstUpdateId, EndUpdateId = lastUpdateId, Asks = asks, Bids = bids });
_queueEvent.Set();
}
/// <summary>
@ -399,42 +408,11 @@ namespace CryptoExchange.Net.OrderBook
/// <param name="asks">List of asks</param>
protected void UpdateOrderBook(IEnumerable<ISymbolOrderSequencedBookEntry> bids, IEnumerable<ISymbolOrderSequencedBookEntry> asks)
{
lock (bookLock)
{
if (!bookSet)
{
processBuffer.Add(new ProcessBufferEntry
{
Asks = asks,
Bids = bids
});
log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update buffered #{Math.Min(bids.Min(b => b.Sequence), asks.Min(a => a.Sequence))}-{Math.Max(bids.Max(b => b.Sequence), asks.Max(a => a.Sequence))}");
}
else
{
CheckProcessBuffer();
var (prevBestBid, prevBestAsk) = BestOffers;
ProcessUpdates(bids, asks);
OnOrderBookUpdate?.Invoke(bids, asks);
CheckBestOffersChanged(prevBestBid, prevBestAsk);
}
}
}
var highest = Math.Max(bids.Any() ? bids.Max(b => b.Sequence) : 0, asks.Any() ? asks.Max(a => a.Sequence) : 0);
var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue);
private void ProcessUpdates(IEnumerable<ISymbolOrderSequencedBookEntry> bids, IEnumerable<ISymbolOrderSequencedBookEntry> asks)
{
var entries = new Dictionary<ISymbolOrderSequencedBookEntry, OrderBookEntryType>();
foreach (var entry in asks.OrderBy(a => a.Sequence))
entries.Add(entry, OrderBookEntryType.Ask);
foreach (var entry in bids.OrderBy(a => a.Sequence))
entries.Add(entry, OrderBookEntryType.Bid);
foreach (var entry in entries.OrderBy(e => e.Key.Sequence))
{
if(ProcessUpdate(entry.Key.Sequence, entry.Value, entry.Key))
LastSequenceNumber = entry.Key.Sequence;
log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update #{LastSequenceNumber}");
}
_processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = lowest, EndUpdateId = highest , Asks = asks, Bids = bids });
_queueEvent.Set();
}
private void ProcessRangeUpdates(long firstUpdateId, long lastUpdateId, IEnumerable<ISymbolOrderBookEntry> bids, IEnumerable<ISymbolOrderBookEntry> asks)
@ -455,24 +433,6 @@ namespace CryptoExchange.Net.OrderBook
log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update processed #{firstUpdateId}-{lastUpdateId}");
}
private void ProcessSingleSequenceUpdates(long updateId, IEnumerable<ISymbolOrderBookEntry> bids, IEnumerable<ISymbolOrderBookEntry> asks)
{
foreach (var entry in bids)
{
if (!ProcessUpdate(updateId, OrderBookEntryType.Bid, entry))
return;
}
foreach (var entry in asks)
{
if (!ProcessUpdate(updateId, OrderBookEntryType.Ask, entry))
return;
}
LastSequenceNumber = updateId;
log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update processed #{LastSequenceNumber}");
}
/// <summary>
/// Check and empty the process buffer; see what entries to update the book with
/// </summary>
@ -484,13 +444,7 @@ namespace CryptoExchange.Net.OrderBook
foreach (var bufferEntry in pbList)
{
if (bufferEntry is ProcessBufferEntry pbe)
ProcessUpdates(pbe.Bids, pbe.Asks);
else if(bufferEntry is ProcessBufferRangeSequenceEntry pbrse)
ProcessRangeUpdates(pbrse.FirstUpdateId, pbrse.LastUpdateId, pbrse.Bids, pbrse.Asks);
else if (bufferEntry is ProcessBufferSingleSequenceEntry pbsse)
ProcessSingleSequenceUpdates(pbsse.UpdateId, pbsse.Bids, pbsse.Asks);
ProcessRangeUpdates(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks);
processBuffer.Remove(bufferEntry);
}
}