diff --git a/CryptoExchange.Net/Converters/TimestampSecondsConverter.cs b/CryptoExchange.Net/Converters/TimestampSecondsConverter.cs index 63a83eb..aabd462 100644 --- a/CryptoExchange.Net/Converters/TimestampSecondsConverter.cs +++ b/CryptoExchange.Net/Converters/TimestampSecondsConverter.cs @@ -25,7 +25,8 @@ namespace CryptoExchange.Net.Converters return new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddSeconds(d); var t = double.Parse(reader.Value.ToString(), CultureInfo.InvariantCulture); - return new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddSeconds(t); + // Set ticks instead of seconds or milliseconds, because AddSeconds/AddMilliseconds rounds to nearest millisecond + return new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddTicks((long)(t * TimeSpan.TicksPerSecond)); } /// diff --git a/CryptoExchange.Net/CryptoExchange.Net.xml b/CryptoExchange.Net/CryptoExchange.Net.xml index f9a1a0f..5a2eaf2 100644 --- a/CryptoExchange.Net/CryptoExchange.Net.xml +++ b/CryptoExchange.Net/CryptoExchange.Net.xml @@ -1320,6 +1320,12 @@ + + + + + + Unsubscribe from a stream @@ -2219,6 +2225,11 @@ The name of the order book implementation + + + Whether or not checksum validation is enabled. Default is true, disabling will ignore checksum messages. + + Whether each update should have a consecutive id number. Used to identify and reconnect when numbers are skipped. @@ -2339,6 +2350,21 @@ Time to wait between reconnect attempts + + + The maximum number of times to try to reconnect + + + + + The maximum number of times to try to resubscribe after reconnecting + + + + + Max number of concurrent resubscription tasks per socket after reconnecting a socket + + The time to wait for a socket response before giving a timeout @@ -3000,6 +3026,15 @@ + + + + + + + + + Delegate used for processing byte data received from socket connections before it is processed by handlers @@ -3621,6 +3656,16 @@ If the socket should be reconnected upon closing + + + Current reconnect try + + + + + Current resubscribe try + + Time of disconnecting @@ -3807,6 +3852,18 @@ + + + Unsubscribe a subscription + + + + + + Resubscribe this subscription + + + Default weboscket factory implementation diff --git a/CryptoExchange.Net/Interfaces/ISocketClient.cs b/CryptoExchange.Net/Interfaces/ISocketClient.cs index b1828d2..fb1658a 100644 --- a/CryptoExchange.Net/Interfaces/ISocketClient.cs +++ b/CryptoExchange.Net/Interfaces/ISocketClient.cs @@ -43,6 +43,12 @@ namespace CryptoExchange.Net.Interfaces /// int SocketCombineTarget { get; } + /// + int? MaxReconnectTries { get; } + /// + int? MaxResubscribeTries { get; } + /// + int MaxConcurrentResubscriptionsPerSocket { get; } /// /// Unsubscribe from a stream diff --git a/CryptoExchange.Net/Objects/Options.cs b/CryptoExchange.Net/Objects/Options.cs index 3c93b7c..e452a05 100644 --- a/CryptoExchange.Net/Objects/Options.cs +++ b/CryptoExchange.Net/Objects/Options.cs @@ -45,6 +45,11 @@ namespace CryptoExchange.Net.Objects /// public string OrderBookName { get; } + /// + /// Whether or not checksum validation is enabled. Default is true, disabling will ignore checksum messages. + /// + public bool ChecksumValidationEnabled { get; set; } = true; + /// /// Whether each update should have a consecutive id number. Used to identify and reconnect when numbers are skipped. /// @@ -220,6 +225,21 @@ namespace CryptoExchange.Net.Objects /// public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5); + /// + /// The maximum number of times to try to reconnect + /// + public int? MaxReconnectTries { get; set; } + + /// + /// The maximum number of times to try to resubscribe after reconnecting + /// + public int? MaxResubscribeTries { get; set; } = 5; + + /// + /// Max number of concurrent resubscription tasks per socket after reconnecting a socket + /// + public int MaxConcurrentResubscriptionsPerSocket { get; set; } = 5; + /// /// The time to wait for a socket response before giving a timeout /// diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index 0d530bc..f3661b2 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -37,6 +37,7 @@ namespace CryptoExchange.Net.OrderBook private UpdateSubscription? subscription; private readonly bool sequencesAreConsecutive; private readonly bool strictLevels; + private readonly bool validateChecksum; private Task? _processTask; private readonly AutoResetEvent _queueEvent; @@ -214,6 +215,7 @@ namespace CryptoExchange.Net.OrderBook sequencesAreConsecutive = options.SequenceNumbersAreConsecutive; strictLevels = options.StrictLevels; + validateChecksum = options.ChecksumValidationEnabled; Symbol = symbol; Status = OrderBookStatus.Disconnected; @@ -233,7 +235,7 @@ namespace CryptoExchange.Net.OrderBook { log.Write(LogLevel.Debug, $"{Id} order book {Symbol} starting"); Status = OrderBookStatus.Connecting; - _processTask = Task.Run(ProcessQueue); + _processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning); var startResult = await DoStartAsync().ConfigureAwait(false); if (!startResult) @@ -243,7 +245,14 @@ namespace CryptoExchange.Net.OrderBook } subscription = startResult.Data; - subscription.ConnectionLost += Reset; + subscription.ConnectionLost += () => + { + + log.Write(LogLevel.Warning, $"{Id} order book {Symbol} connection lost"); + Status = OrderBookStatus.Reconnecting; + Reset(); + }; + subscription.ConnectionRestored += async time => await ResyncAsync().ConfigureAwait(false); Status = OrderBookStatus.Synced; return new CallResult(true, null); @@ -288,8 +297,6 @@ namespace CryptoExchange.Net.OrderBook private void Reset() { - log.Write(LogLevel.Warning, $"{Id} order book {Symbol} connection lost"); - Status = OrderBookStatus.Reconnecting; _queueEvent.Set(); // Clear queue while (_processQueue.TryDequeue(out _)) { } @@ -380,9 +387,7 @@ namespace CryptoExchange.Net.OrderBook { lock (bookLock) { - if (Status == OrderBookStatus.Connecting || Status == OrderBookStatus.Disconnected) - return; - + log.Write(LogLevel.Warning, $"{Symbol} bookSet"); bookSet = true; asks.Clear(); foreach (var ask in item.Asks) @@ -408,9 +413,6 @@ namespace CryptoExchange.Net.OrderBook { lock (bookLock) { - if (Status == OrderBookStatus.Connecting || Status == OrderBookStatus.Disconnected) - return; - if (!bookSet) { processBuffer.Add(new ProcessBufferRangeSequenceEntry() @@ -434,7 +436,7 @@ namespace CryptoExchange.Net.OrderBook if (asks.First().Key < bids.First().Key) { log.Write(LogLevel.Warning, $"{Id} order book {Symbol} detected out of sync order book. Resyncing"); - _ = subscription?.ReconnectAsync(); + Resubscribe(); return; } @@ -448,6 +450,9 @@ namespace CryptoExchange.Net.OrderBook { lock (bookLock) { + if (!validateChecksum) + return; + bool checksumResult = false; try { @@ -467,12 +472,31 @@ namespace CryptoExchange.Net.OrderBook // Should maybe only reconnect the specific subscription? log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync. Resyncing"); - _ = subscription?.ReconnectAsync(); + Resubscribe(); return; } } } + private void Resubscribe() + { + Status = OrderBookStatus.Syncing; + _ = Task.Run(async () => + { + await subscription!.UnsubscribeAsync().ConfigureAwait(false); + Reset(); + if (!await subscription!.ResubscribeAsync().ConfigureAwait(false)) + { + // Resubscribing failed, reconnect the socket + log.Write(LogLevel.Warning, $"{Id} order book {Symbol} resync failed, reconnecting socket"); + Status = OrderBookStatus.Reconnecting; + _ = subscription!.ReconnectAsync(); + } + else + await ResyncAsync().ConfigureAwait(false); + }); + } + /// /// Set the initial data for the order book /// @@ -591,9 +615,6 @@ namespace CryptoExchange.Net.OrderBook /// The entry protected virtual bool ProcessUpdate(long sequence, OrderBookEntryType type, ISymbolOrderBookEntry entry) { - if (Status != OrderBookStatus.Syncing && Status != OrderBookStatus.Synced) - return false; - if (sequence <= LastSequenceNumber) { log.Write(LogLevel.Debug, $"{Id} order book {Symbol} update skipped #{sequence}"); @@ -604,7 +625,7 @@ namespace CryptoExchange.Net.OrderBook { // Out of sync log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync (expected { LastSequenceNumber + 1}, was {sequence}), reconnecting"); - subscription?.ReconnectAsync(); + Resubscribe(); return false; } diff --git a/CryptoExchange.Net/SocketClient.cs b/CryptoExchange.Net/SocketClient.cs index ec0aaf7..dc7f270 100644 --- a/CryptoExchange.Net/SocketClient.cs +++ b/CryptoExchange.Net/SocketClient.cs @@ -49,6 +49,12 @@ namespace CryptoExchange.Net public int MaxSocketConnections { get; protected set; } = 9999; /// public int SocketCombineTarget { get; protected set; } + /// + public int? MaxReconnectTries { get; protected set; } + /// + public int? MaxResubscribeTries { get; protected set; } + /// + public int MaxConcurrentResubscriptionsPerSocket { get; protected set; } /// /// Delegate used for processing byte data received from socket connections before it is processed by handlers /// @@ -102,6 +108,9 @@ namespace CryptoExchange.Net ResponseTimeout = exchangeOptions.SocketResponseTimeout; SocketNoDataTimeout = exchangeOptions.SocketNoDataTimeout; SocketCombineTarget = exchangeOptions.SocketSubscriptionsCombineTarget ?? 1; + MaxReconnectTries = exchangeOptions.MaxReconnectTries; + MaxResubscribeTries = exchangeOptions.MaxResubscribeTries; + MaxConcurrentResubscriptionsPerSocket = exchangeOptions.MaxConcurrentResubscriptionsPerSocket; } /// diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index c16fa54..08e1e8a 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Net; using System.Net.WebSockets; @@ -215,14 +216,19 @@ namespace CryptoExchange.Net.Sockets return false; } - _sendTask = Task.Run(SendLoopAsync); - _receiveTask = Task.Run(ReceiveLoopAsync); + log.Write(LogLevel.Trace, $"Socket {Id} connection succeeded, starting communication"); + _sendTask = Task.Factory.StartNew(SendLoopAsync, TaskCreationOptions.LongRunning); + _receiveTask = Task.Factory.StartNew(ReceiveLoopAsync, TaskCreationOptions.LongRunning); if (Timeout != default) _timeoutTask = Task.Run(CheckTimeoutAsync); + var sw = Stopwatch.StartNew(); while (!_startedSent || !_startedReceive) // Wait for the tasks to have actually started await Task.Delay(10).ConfigureAwait(false); + + log.Write(LogLevel.Warning, $"Socket {Id} waited for {sw.ElapsedMilliseconds}ms for tasks to start"); + log.Write(LogLevel.Debug, $"Socket {Id} connected"); return true; } @@ -237,6 +243,7 @@ namespace CryptoExchange.Net.Sockets throw new InvalidOperationException("Can't send data when socket is not connected"); var bytes = _encoding.GetBytes(data); + log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer"); _sendBuffer.Enqueue(bytes); _sendEvent.Set(); } @@ -278,6 +285,7 @@ namespace CryptoExchange.Net.Sockets if (_timeoutTask != null) tasksToAwait.Add(_timeoutTask); + log.Write(LogLevel.Trace, $"Socket {Id} waiting for communication loops to finish"); await Task.WhenAll(tasksToAwait).ConfigureAwait(false); log.Write(LogLevel.Debug, $"Socket {Id} closed"); Handle(closeHandlers); @@ -296,6 +304,7 @@ namespace CryptoExchange.Net.Sockets openHandlers.Clear(); closeHandlers.Clear(); messageHandlers.Clear(); + log.Write(LogLevel.Trace, $"Socket {Id} disposed"); } /// diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index d50303e..ea44585 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -9,6 +9,7 @@ using CryptoExchange.Net.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using Microsoft.Extensions.Logging; +using CryptoExchange.Net.Objects; namespace CryptoExchange.Net.Sockets { @@ -68,7 +69,14 @@ namespace CryptoExchange.Net.Sockets /// If the socket should be reconnected upon closing /// public bool ShouldReconnect { get; set; } - + /// + /// Current reconnect try + /// + public int ReconnectTry { get; set; } + /// + /// Current resubscribe try + /// + public int ResubscribeTry { get; set; } /// /// Time of disconnecting /// @@ -133,6 +141,7 @@ namespace CryptoExchange.Net.Sockets Socket.OnClose += SocketOnClose; Socket.OnOpen += () => { + ReconnectTry = 0; PausedActivity = false; Connected = true; }; @@ -325,7 +334,21 @@ namespace CryptoExchange.Net.Sockets Socket.Reset(); if (!await Socket.ConnectAsync().ConfigureAwait(false)) { - log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect"); + ReconnectTry++; + if(socketClient.MaxReconnectTries != null + && ReconnectTry >= socketClient.MaxReconnectTries) + { + log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing"); + ShouldReconnect = false; + + if (socketClient.sockets.ContainsKey(Socket.Id)) + socketClient.sockets.TryRemove(Socket.Id, out _); + + Closed?.Invoke(); + break; + } + + log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect{(socketClient.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.MaxReconnectTries}": "")}"); continue; } @@ -337,9 +360,28 @@ namespace CryptoExchange.Net.Sockets var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false); if (!reconnectResult) + { + ResubscribeTry++; + + if (socketClient.MaxResubscribeTries != null && + ResubscribeTry >= socketClient.MaxResubscribeTries) + { + log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to resubscribe after {ResubscribeTry} tries, closing"); + ShouldReconnect = false; + + if (socketClient.sockets.ContainsKey(Socket.Id)) + socketClient.sockets.TryRemove(Socket.Id, out _); + + Closed?.Invoke(); + } + else + log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.MaxResubscribeTries}" : "")}. Disconnecting and reconnecting."); + await Socket.CloseAsync().ConfigureAwait(false); + } else { + ResubscribeTry = 0; if (lostTriggered) { lostTriggered = false; @@ -389,29 +431,39 @@ namespace CryptoExchange.Net.Sockets lock (subscriptionLock) subscriptionList = subscriptions.Where(h => h.Request != null).ToList(); - var success = true; - var taskList = new List(); // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe - foreach (var subscription in subscriptionList) + for (var i = 0; i < subscriptionList.Count; i += socketClient.MaxConcurrentResubscriptionsPerSocket) { - var task = socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription).ContinueWith(t => + var success = true; + var taskList = new List(); + foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.MaxConcurrentResubscriptionsPerSocket)) { - if (!t.Result) - success = false; - }); - taskList.Add(task); - } + var task = socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription).ContinueWith(t => + { + if (!t.Result) + success = false; + }); + taskList.Add(task); + } - await Task.WhenAll(taskList).ConfigureAwait(false); - if (!success) - { - log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket. Disconnecting and reconnecting."); - return false; - } + await Task.WhenAll(taskList).ConfigureAwait(false); + if (!success) + return false; + } log.Write(LogLevel.Debug, $"Socket {Socket.Id} all subscription successfully resubscribed on reconnected socket."); return true; } + + internal async Task UnsubscribeAsync(SocketSubscription socketSubscription) + { + await socketClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false); + } + + internal async Task> ResubscribeAsync(SocketSubscription socketSubscription) + { + return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); + } /// /// Close the connection diff --git a/CryptoExchange.Net/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Sockets/UpdateSubscription.cs index c6e7b7a..0d352ab 100644 --- a/CryptoExchange.Net/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Sockets/UpdateSubscription.cs @@ -1,4 +1,5 @@ -using System; +using CryptoExchange.Net.Objects; +using System; using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets @@ -91,5 +92,23 @@ namespace CryptoExchange.Net.Sockets { return connection.Socket.CloseAsync(); } + + /// + /// Unsubscribe a subscription + /// + /// + internal async Task UnsubscribeAsync() + { + await connection.UnsubscribeAsync(subscription).ConfigureAwait(false); + } + + /// + /// Resubscribe this subscription + /// + /// + internal async Task> ResubscribeAsync() + { + return await connection.ResubscribeAsync(subscription).ConfigureAwait(false); + } } }