diff --git a/CryptoExchange.Net/ExtensionMethods.cs b/CryptoExchange.Net/ExtensionMethods.cs index f1ae8a8..f47a6e7 100644 --- a/CryptoExchange.Net/ExtensionMethods.cs +++ b/CryptoExchange.Net/ExtensionMethods.cs @@ -114,7 +114,7 @@ namespace CryptoExchange.Net tokenRegistration = cancellationToken.Register( state => ((TaskCompletionSource)state).TrySetCanceled(), tcs); - return await tcs.Task; + return await tcs.Task.ConfigureAwait(false); } finally { diff --git a/CryptoExchange.Net/Objects/Enums.cs b/CryptoExchange.Net/Objects/Enums.cs index 825ca1d..53cff06 100644 --- a/CryptoExchange.Net/Objects/Enums.cs +++ b/CryptoExchange.Net/Objects/Enums.cs @@ -18,10 +18,17 @@ Json } - public enum SocketType + public enum OrderBookStatus { - Normal, - Background, - BackgroundAuthenticated + Disconnected, + Connecting, + Syncing, + Synced, + } + + public enum OrderBookEntryType + { + Ask, + Bid } } diff --git a/CryptoExchange.Net/OrderBook/ISymbolOrderBookEntry.cs b/CryptoExchange.Net/OrderBook/ISymbolOrderBookEntry.cs new file mode 100644 index 0000000..9354ad1 --- /dev/null +++ b/CryptoExchange.Net/OrderBook/ISymbolOrderBookEntry.cs @@ -0,0 +1,14 @@ +namespace CryptoExchange.Net.OrderBook +{ + public interface ISymbolOrderBookEntry + { + /// + /// The quantity of the entry + /// + decimal Quantity { get; set; } + /// + /// The price of the entry + /// + decimal Price { get; set; } + } +} diff --git a/CryptoExchange.Net/OrderBook/OrderBookEntry.cs b/CryptoExchange.Net/OrderBook/OrderBookEntry.cs new file mode 100644 index 0000000..606dccd --- /dev/null +++ b/CryptoExchange.Net/OrderBook/OrderBookEntry.cs @@ -0,0 +1,14 @@ +namespace CryptoExchange.Net.OrderBook +{ + public class OrderBookEntry : ISymbolOrderBookEntry + { + public decimal Quantity { get; set; } + public decimal Price { get; set; } + + public OrderBookEntry(decimal price, decimal quantity) + { + Quantity = quantity; + Price = price; + } + } +} diff --git a/CryptoExchange.Net/OrderBook/ProcessBufferEntry.cs b/CryptoExchange.Net/OrderBook/ProcessBufferEntry.cs new file mode 100644 index 0000000..e6bf3aa --- /dev/null +++ b/CryptoExchange.Net/OrderBook/ProcessBufferEntry.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; + +namespace CryptoExchange.Net.OrderBook +{ + public class ProcessBufferEntry + { + public long FirstSequence { get; set; } + public long LastSequence { get; set; } + public List Entries { get; set; } + + public ProcessBufferEntry() + { + Entries = new List(); + } + } +} diff --git a/CryptoExchange.Net/OrderBook/ProcessEntry.cs b/CryptoExchange.Net/OrderBook/ProcessEntry.cs new file mode 100644 index 0000000..4ddb93f --- /dev/null +++ b/CryptoExchange.Net/OrderBook/ProcessEntry.cs @@ -0,0 +1,16 @@ +using CryptoExchange.Net.Objects; + +namespace CryptoExchange.Net.OrderBook +{ + public class ProcessEntry + { + public ISymbolOrderBookEntry Entry { get; set; } + public OrderBookEntryType Type { get; set; } + + public ProcessEntry(OrderBookEntryType type, ISymbolOrderBookEntry entry) + { + Type = type; + Entry = entry; + } + } +} diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs new file mode 100644 index 0000000..5bd3e8d --- /dev/null +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -0,0 +1,272 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using CryptoExchange.Net.Logging; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Sockets; + +namespace CryptoExchange.Net.OrderBook +{ + public abstract class SymbolOrderBook: IDisposable + { + protected readonly List processBuffer; + private readonly object bookLock = new object(); + protected List asks; + protected List bids; + private OrderBookStatus status; + private UpdateSubscription subscription; + private readonly bool sequencesAreConsecutive; + private readonly string id; + protected Log log; + + private bool bookSet; + + /// + /// The status of the order book. Order book is up to date when the status is `Synced` + /// + public OrderBookStatus Status + { + get => status; + set + { + if (value == status) + return; + + var old = status; + status = value; + log.Write(LogVerbosity.Info, $"{id} order book {Symbol} status changed: {old} => {value}"); + OnStatusChange?.Invoke(old, status); + } + } + + /// + /// Last update identifier + /// + public long LastSequenceNumber { get; private set; } + /// + /// The symbol of the order book + /// + public string Symbol { get; } + + /// + /// Event when the state changes + /// + public event Action OnStatusChange; + + /// + /// The number of asks in the book + /// + public int AskCount { get; private set; } + /// + /// The number of bids in the book + /// + public int BidCount { get; private set; } + + /// + /// The list of asks + /// + public IEnumerable Asks + { + get + { + lock (bookLock) + return asks.OrderBy(a => a.Price).ToList(); + } + } + + /// + /// The list of bids + /// + public IEnumerable Bids + { + get + { + lock (bookLock) + return bids.OrderByDescending(a => a.Price).ToList(); + } + } + + protected SymbolOrderBook(string id, string symbol, bool sequencesAreConsecutive, LogVerbosity logVerbosity, IEnumerable logWriters) + { + this.id = id; + processBuffer = new List(); + this.sequencesAreConsecutive = sequencesAreConsecutive; + Symbol = symbol; + Status = OrderBookStatus.Disconnected; + + asks = new List(); + bids = new List(); + + log = new Log { Level = logVerbosity }; + if (logWriters == null) + logWriters = new List { new DebugTextWriter() }; + log.UpdateWriters(logWriters.ToList()); + } + + /// + /// Start connecting and synchronizing the order book + /// + /// + public async Task> Start() + { + Status = OrderBookStatus.Connecting; + var startResult = await DoStart().ConfigureAwait(false); + if(!startResult.Success) + return new CallResult(false, startResult.Error); + + subscription = startResult.Data; + subscription.ConnectionLost += Reset; + subscription.ConnectionRestored += (time) => Resync(); + Status = OrderBookStatus.Synced; + return new CallResult(true, null); + } + + private void Reset() + { + log.Write(LogVerbosity.Warning, $"{id} order book {Symbol} connection lost"); + Status = OrderBookStatus.Connecting; + processBuffer.Clear(); + bookSet = false; + DoReset(); + } + + private void Resync() + { + Status = OrderBookStatus.Syncing; + bool success = false; + while (!success) + { + if (Status != OrderBookStatus.Syncing) + return; + + var resyncResult = DoResync().Result; + success = resyncResult.Success; + } + + log.Write(LogVerbosity.Info, $"{id} order book {Symbol} successfully resynchronized"); + Status = OrderBookStatus.Synced; + } + + /// + /// Stop syncing the order book + /// + /// + public Task Stop() + { + Status = OrderBookStatus.Disconnected; + return subscription.Close(); + } + + protected abstract Task> DoStart(); + + protected virtual void DoReset() { } + + protected abstract Task> DoResync(); + + protected void SetInitialOrderBook(long orderBookSequenceNumber, IEnumerable askList, IEnumerable bidList) + { + lock (bookLock) + { + if (Status == OrderBookStatus.Connecting) + return; + + asks = askList.Select(a => new OrderBookEntry(a.Price, a.Quantity)).ToList(); + bids = bidList.Select(b => new OrderBookEntry(b.Price, b.Quantity)).ToList(); + LastSequenceNumber = orderBookSequenceNumber; + + AskCount = asks.Count; + BidCount = asks.Count; + + CheckProcessBuffer(); + bookSet = true; + log.Write(LogVerbosity.Debug, $"{id} order book {Symbol} initial order book set"); + } + } + + protected void UpdateOrderBook(long firstSequenceNumber, long lastSequenceNumber, List entries) + { + lock (bookLock) + { + if (lastSequenceNumber < LastSequenceNumber) + return; + + if (!bookSet) + { + var entry = new ProcessBufferEntry() + { + FirstSequence = firstSequenceNumber, + LastSequence = lastSequenceNumber, + Entries = entries + }; + processBuffer.Add(entry); + log.Write(LogVerbosity.Debug, $"{id} order book {Symbol} update before synced; buffering"); + } + else if (sequencesAreConsecutive && firstSequenceNumber > LastSequenceNumber + 1) + { + // Out of sync + log.Write(LogVerbosity.Warning, $"{id} order book {Symbol} out of sync, reconnecting"); + subscription.Reconnect().Wait(); + } + else + { + foreach(var entry in entries) + ProcessUpdate(entry.Type, entry.Entry); + LastSequenceNumber = lastSequenceNumber; + CheckProcessBuffer(); + log.Write(LogVerbosity.Debug, $"{id} order book {Symbol} update: {entries.Count} entries processed"); + } + } + } + + protected void CheckProcessBuffer() + { + foreach (var bufferEntry in processBuffer.OrderBy(b => b.FirstSequence).ToList()) + { + if(bufferEntry.LastSequence < LastSequenceNumber) + { + processBuffer.Remove(bufferEntry); + continue; + } + + if (bufferEntry.FirstSequence > LastSequenceNumber + 1) + break; + + foreach(var entry in bufferEntry.Entries) + ProcessUpdate(entry.Type, entry.Entry); + processBuffer.Remove(bufferEntry); + LastSequenceNumber = bufferEntry.LastSequence; + } + } + + protected virtual void ProcessUpdate(OrderBookEntryType type, ISymbolOrderBookEntry entry) + { + var listToChange = type == OrderBookEntryType.Ask ? asks : bids; + if (entry.Quantity == 0) + { + var bookEntry = listToChange.SingleOrDefault(i => i.Price == entry.Price); + if (bookEntry != null) + { + listToChange.Remove(bookEntry); + if (type == OrderBookEntryType.Ask) AskCount--; + else BidCount--; + } + } + else + { + var bookEntry = listToChange.SingleOrDefault(i => i.Price == entry.Price); + if (bookEntry == null) + { + listToChange.Add(new OrderBookEntry(entry.Price, entry.Quantity)); + if (type == OrderBookEntryType.Ask) AskCount++; + else BidCount++; + } + else + bookEntry.Quantity = entry.Quantity; + } + } + + public abstract void Dispose(); + } +} diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 233d219..450b576 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -69,11 +69,6 @@ namespace CryptoExchange.Net.Sockets { PausedActivity = false; Connected = true; - if (lostTriggered) - { - lostTriggered = false; - ConnectionRestored?.Invoke(DisconnectTime.HasValue ? DateTime.UtcNow - DisconnectTime.Value: TimeSpan.FromSeconds(0)); - } }; } @@ -230,7 +225,15 @@ namespace CryptoExchange.Net.Sockets if (!reconnectResult) await Socket.Close().ConfigureAwait(false); else + { + if (lostTriggered) + { + lostTriggered = false; + Task.Run(() => ConnectionRestored?.Invoke(DisconnectTime.HasValue ? DateTime.UtcNow - DisconnectTime.Value : TimeSpan.FromSeconds(0))); + } + break; + } } Socket.Reconnecting = false; @@ -274,7 +277,7 @@ namespace CryptoExchange.Net.Sockets log.Write(LogVerbosity.Debug, "All subscription successfully resubscribed on reconnected socket."); return true; } - + public async Task Close() { Connected = false; @@ -282,7 +285,6 @@ namespace CryptoExchange.Net.Sockets if (socketClient.sockets.ContainsKey(Socket.Id)) socketClient.sockets.TryRemove(Socket.Id, out _); - await Socket.Close().ConfigureAwait(false); Socket.Dispose(); } diff --git a/CryptoExchange.Net/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Sockets/UpdateSubscription.cs index dc43c4f..176d406 100644 --- a/CryptoExchange.Net/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Sockets/UpdateSubscription.cs @@ -45,7 +45,7 @@ namespace CryptoExchange.Net.Sockets this.connection = connection; this.subscription = subscription; } - + /// /// Close the subscription /// @@ -54,5 +54,14 @@ namespace CryptoExchange.Net.Sockets { await connection.Close(subscription).ConfigureAwait(false); } + + /// + /// Close the socket to cause a reconnect + /// + /// + internal Task Reconnect() + { + return connection.Socket.Close(); + } } }