diff --git a/CryptoExchange.Net/CryptoExchange.Net.xml b/CryptoExchange.Net/CryptoExchange.Net.xml index aa6e171..b225bc8 100644 --- a/CryptoExchange.Net/CryptoExchange.Net.xml +++ b/CryptoExchange.Net/CryptoExchange.Net.xml @@ -2122,6 +2122,11 @@ How to serialize array parameters + + + What request body should be when no data is send + + Timeout for requests @@ -2939,7 +2944,9 @@ - + + +System.Diagnostics.CodeAnalysis.AllowNullAttribute"> Specifies that is allowed as an input even if the corresponding type disallows it. diff --git a/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs b/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs new file mode 100644 index 0000000..996e6d4 --- /dev/null +++ b/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs @@ -0,0 +1,13 @@ +using CryptoExchange.Net.Interfaces; +using System.Collections.Generic; + +namespace CryptoExchange.Net.OrderBook +{ + internal class ProcessQueueItem + { + public long StartUpdateId { get; set; } + public long EndUpdateId { get; set; } + public IEnumerable Bids { get; set; } = new List(); + public IEnumerable Asks { get; set; } = new List(); + } +} diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index ea35512..24adc47 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -1,8 +1,10 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Globalization; using System.IO; using System.Linq; +using System.Threading; using System.Threading.Tasks; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Logging; @@ -19,7 +21,7 @@ namespace CryptoExchange.Net.OrderBook /// /// The process buffer, used while syncing /// - protected readonly List processBuffer; + protected readonly List processBuffer; private readonly object bookLock = new object(); /// /// The ask list @@ -34,6 +36,10 @@ namespace CryptoExchange.Net.OrderBook private UpdateSubscription? subscription; private readonly bool sequencesAreConsecutive; + private Task _processTask; + private AutoResetEvent _queueEvent; + private ConcurrentQueue _processQueue; + /// /// Order book implementation id /// @@ -183,7 +189,10 @@ namespace CryptoExchange.Net.OrderBook throw new ArgumentNullException(nameof(options)); Id = options.OrderBookName; - processBuffer = new List(); + processBuffer = new List(); + _processQueue = new ConcurrentQueue(); + _queueEvent = new AutoResetEvent(false); + sequencesAreConsecutive = options.SequenceNumbersAreConsecutive; Symbol = symbol; Status = OrderBookStatus.Disconnected; @@ -209,6 +218,8 @@ namespace CryptoExchange.Net.OrderBook public async Task> StartAsync() { Status = OrderBookStatus.Connecting; + _processTask = Task.Run(ProcessQueue); + var startResult = await DoStart().ConfigureAwait(false); if (!startResult) return new CallResult(false, startResult.Error); @@ -224,6 +235,7 @@ namespace CryptoExchange.Net.OrderBook { log.Write(LogVerbosity.Warning, $"{Id} order book {Symbol} connection lost"); Status = OrderBookStatus.Connecting; + _queueEvent.Set(); processBuffer.Clear(); bookSet = false; DoReset(); @@ -259,6 +271,8 @@ namespace CryptoExchange.Net.OrderBook public async Task StopAsync() { Status = OrderBookStatus.Disconnected; + _queueEvent.Set(); + _processTask.Wait(); if(subscription != null) await subscription.Close().ConfigureAwait(false); } @@ -280,6 +294,46 @@ namespace CryptoExchange.Net.OrderBook /// protected abstract Task> DoResync(); + private void ProcessQueue() + { + while(Status != OrderBookStatus.Disconnected) + { + _queueEvent.WaitOne(); + + while (_processQueue.TryDequeue(out var item)) + ProcessQueueItem(item); + } + } + + private void ProcessQueueItem(ProcessQueueItem item) + { + lock (bookLock) + { + if (Status == OrderBookStatus.Connecting || Status == OrderBookStatus.Disconnected) + return; + + if (!bookSet) + { + processBuffer.Add(new ProcessBufferRangeSequenceEntry() + { + Asks = item.Asks, + Bids = item.Bids, + FirstUpdateId = item.StartUpdateId, + LastUpdateId = item.EndUpdateId, + }); + log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update buffered #{item.StartUpdateId}-#{item.EndUpdateId} [{Asks.Count()} asks, {Bids.Count()} bids]"); + } + else + { + CheckProcessBuffer(); + var (prevBestBid, prevBestAsk) = BestOffers; + ProcessRangeUpdates(item.StartUpdateId, item.EndUpdateId, item.Bids, item.Asks); + OnOrderBookUpdate?.Invoke(item.Bids, item.Asks); + CheckBestOffersChanged(prevBestBid, prevBestAsk); + } + } + } + /// /// Set the initial data for the order book /// @@ -330,30 +384,8 @@ namespace CryptoExchange.Net.OrderBook /// protected void UpdateOrderBook(long rangeUpdateId, IEnumerable bids, IEnumerable asks) { - lock (bookLock) - { - if (Status == OrderBookStatus.Connecting || Status == OrderBookStatus.Disconnected) - return; - - if (!bookSet) - { - processBuffer.Add(new ProcessBufferSingleSequenceEntry() - { - UpdateId = rangeUpdateId, - Asks = asks, - Bids = bids - }); - log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update buffered #{rangeUpdateId}"); - } - else - { - CheckProcessBuffer(); - var (prevBestBid, prevBestAsk) = BestOffers; - ProcessSingleSequenceUpdates(rangeUpdateId, bids, asks); - OnOrderBookUpdate?.Invoke(bids, asks); - CheckBestOffersChanged(prevBestBid, prevBestAsk); - } - } + _processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = rangeUpdateId, EndUpdateId = rangeUpdateId, Asks = asks, Bids = bids }); + _queueEvent.Set(); } /// @@ -365,31 +397,8 @@ namespace CryptoExchange.Net.OrderBook /// protected void UpdateOrderBook(long firstUpdateId, long lastUpdateId, IEnumerable bids, IEnumerable asks) { - lock (bookLock) - { - if (Status == OrderBookStatus.Connecting || Status == OrderBookStatus.Disconnected) - return; - - if (!bookSet) - { - processBuffer.Add(new ProcessBufferRangeSequenceEntry() - { - Asks = asks, - Bids = bids, - FirstUpdateId = firstUpdateId, - LastUpdateId = lastUpdateId - }); - log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update buffered #{firstUpdateId}-{lastUpdateId}"); - } - else - { - CheckProcessBuffer(); - var (prevBestBid, prevBestAsk) = BestOffers; - ProcessRangeUpdates(firstUpdateId, lastUpdateId, bids, asks); - OnOrderBookUpdate?.Invoke(bids, asks); - CheckBestOffersChanged(prevBestBid, prevBestAsk); - } - } + _processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = firstUpdateId, EndUpdateId = lastUpdateId, Asks = asks, Bids = bids }); + _queueEvent.Set(); } /// @@ -399,42 +408,11 @@ namespace CryptoExchange.Net.OrderBook /// List of asks protected void UpdateOrderBook(IEnumerable bids, IEnumerable asks) { - lock (bookLock) - { - if (!bookSet) - { - processBuffer.Add(new ProcessBufferEntry - { - Asks = asks, - Bids = bids - }); - log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update buffered #{Math.Min(bids.Min(b => b.Sequence), asks.Min(a => a.Sequence))}-{Math.Max(bids.Max(b => b.Sequence), asks.Max(a => a.Sequence))}"); - } - else - { - CheckProcessBuffer(); - var (prevBestBid, prevBestAsk) = BestOffers; - ProcessUpdates(bids, asks); - OnOrderBookUpdate?.Invoke(bids, asks); - CheckBestOffersChanged(prevBestBid, prevBestAsk); - } - } - } + var highest = Math.Max(bids.Any() ? bids.Max(b => b.Sequence) : 0, asks.Any() ? asks.Max(a => a.Sequence) : 0); + var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue); - private void ProcessUpdates(IEnumerable bids, IEnumerable asks) - { - var entries = new Dictionary(); - foreach (var entry in asks.OrderBy(a => a.Sequence)) - entries.Add(entry, OrderBookEntryType.Ask); - foreach (var entry in bids.OrderBy(a => a.Sequence)) - entries.Add(entry, OrderBookEntryType.Bid); - - foreach (var entry in entries.OrderBy(e => e.Key.Sequence)) - { - if(ProcessUpdate(entry.Key.Sequence, entry.Value, entry.Key)) - LastSequenceNumber = entry.Key.Sequence; - log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update #{LastSequenceNumber}"); - } + _processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = lowest, EndUpdateId = highest , Asks = asks, Bids = bids }); + _queueEvent.Set(); } private void ProcessRangeUpdates(long firstUpdateId, long lastUpdateId, IEnumerable bids, IEnumerable asks) @@ -455,24 +433,6 @@ namespace CryptoExchange.Net.OrderBook log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update processed #{firstUpdateId}-{lastUpdateId}"); } - private void ProcessSingleSequenceUpdates(long updateId, IEnumerable bids, IEnumerable asks) - { - foreach (var entry in bids) - { - if (!ProcessUpdate(updateId, OrderBookEntryType.Bid, entry)) - return; - } - - foreach (var entry in asks) - { - if (!ProcessUpdate(updateId, OrderBookEntryType.Ask, entry)) - return; - } - - LastSequenceNumber = updateId; - log.Write(LogVerbosity.Debug, $"{Id} order book {Symbol} update processed #{LastSequenceNumber}"); - } - /// /// Check and empty the process buffer; see what entries to update the book with /// @@ -484,13 +444,7 @@ namespace CryptoExchange.Net.OrderBook foreach (var bufferEntry in pbList) { - if (bufferEntry is ProcessBufferEntry pbe) - ProcessUpdates(pbe.Bids, pbe.Asks); - else if(bufferEntry is ProcessBufferRangeSequenceEntry pbrse) - ProcessRangeUpdates(pbrse.FirstUpdateId, pbrse.LastUpdateId, pbrse.Bids, pbrse.Asks); - else if (bufferEntry is ProcessBufferSingleSequenceEntry pbsse) - ProcessSingleSequenceUpdates(pbsse.UpdateId, pbsse.Bids, pbsse.Asks); - + ProcessRangeUpdates(bufferEntry.FirstUpdateId, bufferEntry.LastUpdateId, bufferEntry.Bids, bufferEntry.Asks); processBuffer.Remove(bufferEntry); } }