diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs index a59b88a..431de97 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs @@ -13,11 +13,15 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations public bool Connected { get; set; } public event Action OnClose; + +#pragma warning disable 0067 public event Action OnReconnected; public event Action OnReconnecting; +#pragma warning restore 0067 public event Action OnMessage; public event Action OnError; public event Action OnOpen; + public Func> GetReconnectionUrl { get; set; } public int Id { get; } public bool ShouldReconnect { get; set; } diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs index e0c3fba..8d6c26f 100644 --- a/CryptoExchange.Net/Clients/BaseSocketClient.cs +++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs @@ -543,6 +543,17 @@ namespace CryptoExchange.Net return Task.FromResult(new CallResult(address)); } + /// + /// Get the url to reconnect to after losing a connection + /// + /// + /// + /// + public virtual Task GetReconnectUriAsync(SocketApiClient apiClient, SocketConnection connection) + { + return Task.FromResult(connection.ConnectionUri); + } + /// /// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one. /// diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index a28df4c..374a8d1 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -1,4 +1,5 @@ using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Sockets; using System; using System.Security.Authentication; using System.Text; @@ -35,6 +36,10 @@ namespace CryptoExchange.Net.Interfaces /// Websocket has reconnected to the server /// event Action OnReconnected; + /// + /// Get reconntion url + /// + Func> GetReconnectionUrl { get; set; } /// /// Unique id for this socket diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index a80854c..5248c5f 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -33,7 +33,6 @@ namespace CryptoExchange.Net.Sockets private readonly AsyncResetEvent _sendEvent; private readonly ConcurrentQueue _sendBuffer; private readonly SemaphoreSlim _closeSem; - private readonly WebSocketParameters _parameters; private readonly List _outgoingMessages; private ClientWebSocket _socket; @@ -64,13 +63,16 @@ namespace CryptoExchange.Net.Sockets /// public int Id { get; } + /// + public WebSocketParameters Parameters { get; } + /// /// The timestamp this socket has been active for the last time /// public DateTime LastActionTime { get; private set; } /// - public Uri Uri => _parameters.Uri; + public Uri Uri => Parameters.Uri; /// public bool IsClosed => _socket.State == WebSocketState.Closed; @@ -107,6 +109,8 @@ namespace CryptoExchange.Net.Sockets public event Action? OnReconnecting; /// public event Action? OnReconnected; + /// + public Func>? GetReconnectionUrl { get; set; } /// /// ctor @@ -118,7 +122,7 @@ namespace CryptoExchange.Net.Sockets Id = NextStreamId(); _log = log; - _parameters = websocketParameters; + Parameters = websocketParameters; _outgoingMessages = new List(); _receivedMessages = new List(); _sendEvent = new AsyncResetEvent(); @@ -147,17 +151,17 @@ namespace CryptoExchange.Net.Sockets private ClientWebSocket CreateSocket() { var cookieContainer = new CookieContainer(); - foreach (var cookie in _parameters.Cookies) + foreach (var cookie in Parameters.Cookies) cookieContainer.Add(new Cookie(cookie.Key, cookie.Value)); var socket = new ClientWebSocket(); socket.Options.Cookies = cookieContainer; - foreach (var header in _parameters.Headers) + foreach (var header in Parameters.Headers) socket.Options.SetRequestHeader(header.Key, header.Value); - socket.Options.KeepAliveInterval = _parameters.KeepAliveInterval ?? TimeSpan.Zero; + socket.Options.KeepAliveInterval = Parameters.KeepAliveInterval ?? TimeSpan.Zero; socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework - if (_parameters.Proxy != null) - SetProxy(_parameters.Proxy); + if (Parameters.Proxy != null) + SetProxy(Parameters.Proxy); return socket; } @@ -188,7 +192,7 @@ namespace CryptoExchange.Net.Sockets _processState = ProcessState.Processing; var sendTask = SendLoopAsync(); var receiveTask = ReceiveLoopAsync(); - var timeoutTask = _parameters.Timeout != null && _parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask; + var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask; await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); _log.Write(LogLevel.Debug, $"Socket {Id} processing tasks finished"); @@ -199,7 +203,7 @@ namespace CryptoExchange.Net.Sockets await _closeTask.ConfigureAwait(false); _closeTask = null; - if (!_parameters.AutoReconnect) + if (!Parameters.AutoReconnect) { _processState = ProcessState.Idle; OnClose?.Invoke(); @@ -209,12 +213,23 @@ namespace CryptoExchange.Net.Sockets if (!_stopRequested) { _processState = ProcessState.Reconnecting; - OnReconnecting?.Invoke(); + OnReconnecting?.Invoke(); } while (!_stopRequested) { _log.Write(LogLevel.Debug, $"Socket {Id} attempting to reconnect"); + var task = GetReconnectionUrl?.Invoke(); + if (task != null) + { + var reconnectUri = await task.ConfigureAwait(false); + if (reconnectUri != null && Parameters.Uri != reconnectUri) + { + _log.Write(LogLevel.Debug, $"Socket {Id} reconnect URI set to {reconnectUri}"); + Parameters.Uri = reconnectUri; + } + } + _socket = CreateSocket(); _ctsSource.Dispose(); _ctsSource = new CancellationTokenSource(); @@ -223,7 +238,7 @@ namespace CryptoExchange.Net.Sockets var connected = await ConnectInternalAsync().ConfigureAwait(false); if (!connected) { - await Task.Delay(_parameters.ReconnectInterval).ConfigureAwait(false); + await Task.Delay(Parameters.ReconnectInterval).ConfigureAwait(false); continue; } @@ -241,7 +256,7 @@ namespace CryptoExchange.Net.Sockets if (_ctsSource.IsCancellationRequested) return; - var bytes = _parameters.Encoding.GetBytes(data); + var bytes = Parameters.Encoding.GetBytes(data); _log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer"); _sendBuffer.Enqueue(bytes); _sendEvent.Set(); @@ -370,11 +385,11 @@ namespace CryptoExchange.Net.Sockets while (_sendBuffer.TryDequeue(out var data)) { - if (_parameters.RatelimitPerSecond != null) + if (Parameters.RatelimitPerSecond != null) { // Wait for rate limit DateTime? start = null; - while (MessagesSentLastSecond() >= _parameters.RatelimitPerSecond) + while (MessagesSentLastSecond() >= Parameters.RatelimitPerSecond) { start ??= DateTime.UtcNow; await Task.Delay(50).ConfigureAwait(false); @@ -549,14 +564,14 @@ namespace CryptoExchange.Net.Sockets string strData; if (messageType == WebSocketMessageType.Binary) { - if (_parameters.DataInterpreterBytes == null) + if (Parameters.DataInterpreterBytes == null) throw new Exception("Byte interpreter not set while receiving byte data"); try { var relevantData = new byte[count]; Array.Copy(data, offset, relevantData, 0, count); - strData = _parameters.DataInterpreterBytes(relevantData); + strData = Parameters.DataInterpreterBytes(relevantData); } catch(Exception e) { @@ -565,13 +580,13 @@ namespace CryptoExchange.Net.Sockets } } else - strData = _parameters.Encoding.GetString(data, offset, count); + strData = Parameters.Encoding.GetString(data, offset, count); - if (_parameters.DataInterpreterString != null) + if (Parameters.DataInterpreterString != null) { try { - strData = _parameters.DataInterpreterString(strData); + strData = Parameters.DataInterpreterString(strData); } catch(Exception e) { @@ -633,7 +648,7 @@ namespace CryptoExchange.Net.Sockets /// protected async Task CheckTimeoutAsync() { - _log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {_parameters.Timeout}"); + _log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Parameters.Timeout}"); LastActionTime = DateTime.UtcNow; try { @@ -642,9 +657,9 @@ namespace CryptoExchange.Net.Sockets if (_ctsSource.IsCancellationRequested) return; - if (DateTime.UtcNow - LastActionTime > _parameters.Timeout) + if (DateTime.UtcNow - LastActionTime > Parameters.Timeout) { - _log.Write(LogLevel.Warning, $"Socket {Id} No data received for {_parameters.Timeout}, reconnecting socket"); + _log.Write(LogLevel.Warning, $"Socket {Id} No data received for {Parameters.Timeout}, reconnecting socket"); _ = CloseAsync().ConfigureAwait(false); return; } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 46d597c..703d358 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -184,6 +184,7 @@ namespace CryptoExchange.Net.Sockets _socket.OnReconnecting += HandleReconnecting; _socket.OnReconnected += HandleReconnected; _socket.OnError += HandleError; + _socket.GetReconnectionUrl = GetReconnectionUrlAsync; } /// @@ -223,7 +224,17 @@ namespace CryptoExchange.Net.Sockets foreach (var sub in subscriptions) sub.Confirmed = false; } - Task.Run(() => ConnectionLost?.Invoke()); + + _ = Task.Run(() => ConnectionLost?.Invoke()); + } + + /// + /// Get the url to connect to when reconnecting + /// + /// + protected virtual async Task GetReconnectionUrlAsync() + { + return await socketClient.GetReconnectUriAsync(ApiClient, this).ConfigureAwait(false); } /// diff --git a/CryptoExchange.Net/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Sockets/WebSocketParameters.cs index 7fe0330..fd41fdb 100644 --- a/CryptoExchange.Net/Sockets/WebSocketParameters.cs +++ b/CryptoExchange.Net/Sockets/WebSocketParameters.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Text; +using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets {