diff --git a/CryptoExchange.Net.UnitTests/SocketClientTests.cs b/CryptoExchange.Net.UnitTests/SocketClientTests.cs index 374a943..44b9a5d 100644 --- a/CryptoExchange.Net.UnitTests/SocketClientTests.cs +++ b/CryptoExchange.Net.UnitTests/SocketClientTests.cs @@ -42,7 +42,7 @@ namespace CryptoExchange.Net.UnitTests socket.CanConnect = canConnect; //act - var connectResult = client.ConnectSocketSub(new SocketConnection(client, null, socket)); + var connectResult = client.ConnectSocketSub(new SocketConnection(client, null, socket, null)); //assert Assert.IsTrue(connectResult.Success == canConnect); @@ -57,10 +57,10 @@ namespace CryptoExchange.Net.UnitTests socket.ShouldReconnect = true; socket.CanConnect = true; socket.DisconnectTime = DateTime.UtcNow; - var sub = new SocketConnection(client, null, socket); + var sub = new SocketConnection(client, null, socket, null); var rstEvent = new ManualResetEvent(false); JToken result = null; - sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, (messageEvent) => + sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, false, (messageEvent) => { result = messageEvent.JsonData; rstEvent.Set(); @@ -85,10 +85,10 @@ namespace CryptoExchange.Net.UnitTests socket.ShouldReconnect = true; socket.CanConnect = true; socket.DisconnectTime = DateTime.UtcNow; - var sub = new SocketConnection(client, null, socket); + var sub = new SocketConnection(client, null, socket, null); var rstEvent = new ManualResetEvent(false); string original = null; - sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, (messageEvent) => + sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, false, (messageEvent) => { original = messageEvent.OriginalData; rstEvent.Set(); @@ -103,34 +103,6 @@ namespace CryptoExchange.Net.UnitTests Assert.IsTrue(original == (enabled ? "{\"property\": 123}" : null)); } - [TestCase] - public void DisconnectedSocket_Should_Reconnect() - { - // arrange - bool reconnected = false; - var client = new TestSocketClient(new TestOptions() { ReconnectInterval = TimeSpan.Zero, LogLevel = LogLevel.Debug }); - var socket = client.CreateSocket(); - socket.ShouldReconnect = true; - socket.CanConnect = true; - socket.DisconnectTime = DateTime.UtcNow; - var sub = new SocketConnection(client, null, socket); - sub.ShouldReconnect = true; - client.ConnectSocketSub(sub); - var rstEvent = new ManualResetEvent(false); - sub.ConnectionRestored += (a) => - { - reconnected = true; - rstEvent.Set(); - }; - - // act - socket.InvokeClose(); - rstEvent.WaitOne(1000); - - // assert - Assert.IsTrue(reconnected); - } - [TestCase()] public void UnsubscribingStream_Should_CloseTheSocket() { @@ -138,9 +110,11 @@ namespace CryptoExchange.Net.UnitTests var client = new TestSocketClient(new TestOptions() { ReconnectInterval = TimeSpan.Zero, LogLevel = LogLevel.Debug }); var socket = client.CreateSocket(); socket.CanConnect = true; - var sub = new SocketConnection(client, null, socket); + var sub = new SocketConnection(client, null, socket, null); client.ConnectSocketSub(sub); - var ups = new UpdateSubscription(sub, SocketSubscription.CreateForIdentifier(10, "Test", true, (e) => {})); + var us = SocketSubscription.CreateForIdentifier(10, "Test", true, false, (e) => { }); + var ups = new UpdateSubscription(sub, us); + sub.AddSubscription(us); // act client.UnsubscribeAsync(ups).Wait(); @@ -158,8 +132,8 @@ namespace CryptoExchange.Net.UnitTests var socket2 = client.CreateSocket(); socket1.CanConnect = true; socket2.CanConnect = true; - var sub1 = new SocketConnection(client, null, socket1); - var sub2 = new SocketConnection(client, null, socket2); + var sub1 = new SocketConnection(client, null, socket1, null); + var sub2 = new SocketConnection(client, null, socket2, null); client.ConnectSocketSub(sub1); client.ConnectSocketSub(sub2); @@ -178,7 +152,7 @@ namespace CryptoExchange.Net.UnitTests var client = new TestSocketClient(new TestOptions() { ReconnectInterval = TimeSpan.Zero, LogLevel = LogLevel.Debug }); var socket = client.CreateSocket(); socket.CanConnect = false; - var sub = new SocketConnection(client, null, socket); + var sub = new SocketConnection(client, null, socket, null); // act var connectResult = client.ConnectSocketSub(sub); diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs index 6eabc9a..a59b88a 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs @@ -13,6 +13,8 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations public bool Connected { get; set; } public event Action OnClose; + public event Action OnReconnected; + public event Action OnReconnecting; public event Action OnMessage; public event Action OnError; public event Action OnOpen; @@ -93,6 +95,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations { Connected = false; DisconnectTime = DateTime.UtcNow; + Reconnecting = true; OnClose?.Invoke(); } @@ -115,11 +118,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations { OnError?.Invoke(error); } - - public async Task ProcessAsync() - { - while (Connected) - await Task.Delay(50); - } + public Task ReconnectAsync() => Task.CompletedTask; } } diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs index 218264d..eb99274 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs @@ -22,13 +22,13 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations { SubClient = new TestSubSocketClient(exchangeOptions, exchangeOptions.SubOptions); SocketFactory = new Mock().Object; - Mock.Get(SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket()); + Mock.Get(SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket()); } public TestSocket CreateSocket() { - Mock.Get(SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket()); - return (TestSocket)CreateSocket("123"); + Mock.Get(SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket()); + return (TestSocket)CreateSocket("https://localhost:123/"); } public CallResult ConnectSocketSub(SocketConnection sub) diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs index a8cd86a..d67d435 100644 --- a/CryptoExchange.Net/Clients/BaseSocketClient.cs +++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs @@ -3,7 +3,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; -using System.Net.WebSockets; +using System.Text; using System.Threading; using System.Threading.Tasks; using CryptoExchange.Net.Authentication; @@ -11,7 +11,6 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Sockets; using Microsoft.Extensions.Logging; -using Newtonsoft.Json; using Newtonsoft.Json.Linq; namespace CryptoExchange.Net @@ -120,10 +119,7 @@ namespace CryptoExchange.Net /// The options for this client protected BaseSocketClient(string name, BaseSocketClientOptions options) : base(name, options) { - if (options == null) - throw new ArgumentNullException(nameof(options)); - - ClientOptions = options; + ClientOptions = options ?? throw new ArgumentNullException(nameof(options)); } /// @@ -196,10 +192,14 @@ namespace CryptoExchange.Net while (true) { // Get a new or existing socket connection - socketConnection = GetSocketConnection(apiClient, url, authenticated); + var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false); + if(!socketResult) + return socketResult.As(null); + + socketConnection = socketResult.Data; // Add a subscription on the socket connection - subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler); + subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated); if (subscription == null) { log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); @@ -240,6 +240,7 @@ namespace CryptoExchange.Net var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false); if (!subResult) { + log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}"); await socketConnection.CloseAsync(subscription).ConfigureAwait(false); return new CallResult(subResult.Error!); } @@ -250,7 +251,6 @@ namespace CryptoExchange.Net subscription.Confirmed = true; } - socketConnection.ShouldReconnect = true; if (ct != default) { subscription.CancellationTokenRegistration = ct.Register(async () => @@ -260,7 +260,7 @@ namespace CryptoExchange.Net }, false); } - log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription completed"); + log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully"); return new CallResult(new UpdateSubscription(socketConnection, subscription)); } @@ -320,7 +320,12 @@ namespace CryptoExchange.Net await semaphoreSlim.WaitAsync().ConfigureAwait(false); try { - socketConnection = GetSocketConnection(apiClient, url, authenticated); + var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false); + if (!socketResult) + return socketResult.As(default); + + socketConnection = socketResult.Data; + if (ClientOptions.SocketSubscriptionsCombineTarget == 1) { // Can release early when only a single sub per connection @@ -334,8 +339,6 @@ namespace CryptoExchange.Net } finally { - //When the task is ready, release the semaphore. It is vital to ALWAYS release the semaphore when we are ready, or else we will end up with a Semaphore that is forever locked. - //This is why it is important to do the Release within a try...finally clause; program execution may crash or take a different path, this way you are guaranteed execution if (!released) semaphoreSlim.Release(); } @@ -481,8 +484,9 @@ namespace CryptoExchange.Net /// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket) /// The socket connection the handler is on /// The handler of the data received + /// Whether the subscription needs authentication /// - protected virtual SocketSubscription? AddSubscription(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action> dataHandler) + protected virtual SocketSubscription? AddSubscription(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action> dataHandler, bool authenticated) { void InternalHandler(MessageEvent messageEvent) { @@ -504,8 +508,8 @@ namespace CryptoExchange.Net } var subscription = request == null - ? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, InternalHandler) - : SocketSubscription.CreateForRequest(NextId(), request, userSubscription, InternalHandler); + ? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, authenticated, InternalHandler) + : SocketSubscription.CreateForRequest(NextId(), request, userSubscription, authenticated, InternalHandler); if (!connection.AddSubscription(subscription)) return null; return subscription; @@ -519,11 +523,23 @@ namespace CryptoExchange.Net protected void AddGenericHandler(string identifier, Action action) { genericHandlers.Add(identifier, action); - var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, action); + var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, false, action); foreach (var connection in socketConnections.Values) connection.AddSubscription(subscription); } + /// + /// Get the url to connect to (defaults to BaseAddress form the client options) + /// + /// + /// + /// + /// + protected virtual Task> GetConnectionUrlAsync(SocketApiClient apiClient, string address, bool authentication) + { + return Task.FromResult(new CallResult(address)); + } + /// /// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one. /// @@ -531,10 +547,10 @@ namespace CryptoExchange.Net /// The address the socket is for /// Whether the socket should be authenticated /// - protected virtual SocketConnection GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated) + protected virtual async Task> GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated) { var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected) - && s.Value.Uri.ToString().TrimEnd('/') == address.TrimEnd('/') + && s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') && (s.Value.ApiClient.GetType() == apiClient.GetType()) && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault(); var result = socketResult.Equals(default(KeyValuePair)) ? null : socketResult.Value; @@ -543,21 +559,31 @@ namespace CryptoExchange.Net if (result.SubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= ClientOptions.MaxSocketConnections && socketConnections.All(s => s.Value.SubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) { // Use existing socket if it has less than target connections OR it has the least connections and we can't make new - return result; + return new CallResult(result); } } + var connectionAddress = await GetConnectionUrlAsync(apiClient, address, authenticated).ConfigureAwait(false); + if (!connectionAddress) + { + log.Write(LogLevel.Warning, $"Failed to determine connection url: " + connectionAddress.Error); + return connectionAddress.As(null); + } + + if (connectionAddress.Data != address) + log.Write(LogLevel.Debug, $"Connection address set to " + connectionAddress.Data); + // Create new socket - var socket = CreateSocket(address); - var socketConnection = new SocketConnection(this, apiClient, socket); + var socket = CreateSocket(connectionAddress.Data!); + var socketConnection = new SocketConnection(this, apiClient, socket, address); socketConnection.UnhandledMessage += HandleUnhandledMessage; foreach (var kvp in genericHandlers) { - var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, kvp.Value); + var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, false, kvp.Value); socketConnection.AddSubscription(handler); } - return socketConnection; + return new CallResult(socketConnection); } /// @@ -585,6 +611,23 @@ namespace CryptoExchange.Net return new CallResult(new CantConnectError()); } + /// + /// Get parameters for the websocket connection + /// + /// The address to connect to + /// + protected virtual WebSocketParameters GetWebSocketParameters(string address) + => new (new Uri(address), ClientOptions.AutoReconnect) + { + DataInterpreterBytes = dataInterpreterBytes, + DataInterpreterString = dataInterpreterString, + KeepAliveInterval = KeepAliveInterval, + ReconnectInterval = ClientOptions.ReconnectInterval, + RatelimitPerSecond = RateLimitPerSocketPerSecond, + Proxy = ClientOptions.Proxy, + Timeout = ClientOptions.SocketNoDataTimeout + }; + /// /// Create a socket for an address /// @@ -592,24 +635,8 @@ namespace CryptoExchange.Net /// protected virtual IWebsocket CreateSocket(string address) { - var socket = SocketFactory.CreateWebsocket(log, address); + var socket = SocketFactory.CreateWebsocket(log, GetWebSocketParameters(address)); log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address); - - if (ClientOptions.Proxy != null) - socket.SetProxy(ClientOptions.Proxy); - - socket.KeepAliveInterval = KeepAliveInterval; - socket.Timeout = ClientOptions.SocketNoDataTimeout; - socket.DataInterpreterBytes = dataInterpreterBytes; - socket.DataInterpreterString = dataInterpreterString; - socket.RatelimitPerSecond = RateLimitPerSocketPerSecond; - socket.OnError += e => - { - if(e is WebSocketException wse) - log.Write(LogLevel.Warning, $"Socket {socket.Id} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString()); - else - log.Write(LogLevel.Warning, $"Socket {socket.Id} error: " + e.ToLogString()); - }; return socket; } @@ -667,7 +694,6 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAsync(int subscriptionId) { - SocketSubscription? subscription = null; SocketConnection? connection = null; foreach(var socket in socketConnections.Values.ToList()) @@ -683,7 +709,7 @@ namespace CryptoExchange.Net if (subscription == null || connection == null) return; - log.Write(LogLevel.Information, "Closing subscription " + subscriptionId); + log.Write(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId); await connection.CloseAsync(subscription).ConfigureAwait(false); } @@ -697,7 +723,7 @@ namespace CryptoExchange.Net if (subscription == null) throw new ArgumentNullException(nameof(subscription)); - log.Write(LogLevel.Information, "Closing subscription " + subscription.Id); + log.Write(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id); await subscription.CloseAsync().ConfigureAwait(false); } @@ -707,7 +733,7 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAllAsync() { - log.Write(LogLevel.Information, $"Closing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions"); + log.Write(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions"); var tasks = new List(); { var socketList = socketConnections.Values; @@ -718,6 +744,39 @@ namespace CryptoExchange.Net await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); } + /// + /// Reconnect all connections + /// + /// + public virtual async Task ReconnectAsync() + { + log.Write(LogLevel.Information, $"Reconnecting all {socketConnections.Count} connections"); + var tasks = new List(); + { + var socketList = socketConnections.Values; + foreach (var sub in socketList) + tasks.Add(sub.TriggerReconnectAsync()); + } + + await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); + } + + /// + /// Log the current state of connections and subscriptions + /// + public string GetSubscriptionsState() + { + var sb = new StringBuilder(); + sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}"); + foreach(var connection in socketConnections) + { + sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); + foreach (var subscription in connection.Value.Subscriptions) + sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}"); + } + return sb.ToString(); + } + /// /// Dispose the client /// @@ -726,8 +785,11 @@ namespace CryptoExchange.Net disposing = true; periodicEvent?.Set(); periodicEvent?.Dispose(); - log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); - _ = UnsubscribeAllAsync(); + if (socketConnections.Sum(s => s.Value.SubscriptionCount) > 0) + { + log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); + _ = UnsubscribeAllAsync(); + } semaphoreSlim?.Dispose(); base.Dispose(); } diff --git a/CryptoExchange.Net/CryptoExchange.Net.csproj b/CryptoExchange.Net/CryptoExchange.Net.csproj index 8f41879..ee36363 100644 --- a/CryptoExchange.Net/CryptoExchange.Net.csproj +++ b/CryptoExchange.Net/CryptoExchange.Net.csproj @@ -6,16 +6,16 @@ CryptoExchange.Net JKorf A base package for implementing cryptocurrency API's - 5.1.12 - 5.1.12 - 5.1.12 + 5.2.0 + 5.2.0 + 5.2.0 false git https://github.com/JKorf/CryptoExchange.Net.git https://github.com/JKorf/CryptoExchange.Net en true - 5.1.12 - Changed time sync so requests no longer wait for it to complete unless it's the first time, Made log client options changable after client creation, Fixed proxy setting not used when reconnecting socket, Changed MaxSocketConnections to a client options, Updated socket reconnection logic + 5.2.0 - Refactored websocket code, removed some clutter and simplified, Added ReconnectAsync and GetSubscriptionsState methods on socket clients enable 9.0 MIT diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 9d40d86..a28df4c 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -27,36 +27,24 @@ namespace CryptoExchange.Net.Interfaces /// Websocket opened event /// event Action OnOpen; + /// + /// Websocket has lost connection to the server and is attempting to reconnect + /// + event Action OnReconnecting; + /// + /// Websocket has reconnected to the server + /// + event Action OnReconnected; /// /// Unique id for this socket /// int Id { get; } /// - /// Origin header - /// - string? Origin { get; set; } - /// - /// Encoding to use for sending/receiving string data - /// - Encoding? Encoding { get; set; } - /// - /// The max amount of outgoing messages per second - /// - int? RatelimitPerSecond { get; set; } - /// /// The current kilobytes per second of data being received, averaged over the last 3 seconds /// double IncomingKbps { get; } /// - /// Handler for byte data - /// - Func? DataInterpreterBytes { get; set; } - /// - /// Handler for string data - /// - Func? DataInterpreterString { get; set; } - /// /// The uri the socket connects to /// Uri Uri { get; } @@ -69,41 +57,20 @@ namespace CryptoExchange.Net.Interfaces /// bool IsOpen { get; } /// - /// Supported ssl protocols - /// - SslProtocols SSLProtocols { get; set; } - /// - /// The max time for no data being received before the connection is considered lost - /// - TimeSpan Timeout { get; set; } - /// - /// The interval at which to send a ping frame to the server - /// - TimeSpan KeepAliveInterval { get; set; } - /// - /// Set a proxy to use when connecting - /// - /// - void SetProxy(ApiProxy proxy); - /// /// Connect the socket /// /// - Task ConnectAsync(); - /// - /// Receive and send messages over the connection. Resulting task should complete when closing the socket. - /// - /// - Task ProcessAsync(); + Task ConnectAsync(); /// /// Send data /// /// void Send(string data); /// - /// Reset socket when a connection is lost to prepare for a new connection + /// Reconnect the socket /// - void Reset(); + /// + Task ReconnectAsync(); /// /// Close the connection /// diff --git a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs index 809c624..7d638a3 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs @@ -1,5 +1,5 @@ -using System.Collections.Generic; -using CryptoExchange.Net.Logging; +using CryptoExchange.Net.Logging; +using CryptoExchange.Net.Sockets; namespace CryptoExchange.Net.Interfaces { @@ -12,17 +12,8 @@ namespace CryptoExchange.Net.Interfaces /// Create a websocket for an url /// /// The logger - /// The url the socket is fo + /// The parameters to use for the connection /// - IWebsocket CreateWebsocket(Log log, string url); - /// - /// Create a websocket for an url - /// - /// The logger - /// The url the socket is fo - /// Cookies to be send in the initial request - /// Headers to be send in the initial request - /// - IWebsocket CreateWebsocket(Log log, string url, IDictionary cookies, IDictionary headers); + IWebsocket CreateWebsocket(Log log, WebSocketParameters parameters); } } diff --git a/CryptoExchange.Net/Objects/Options.cs b/CryptoExchange.Net/Objects/Options.cs index 0494e66..5c727cc 100644 --- a/CryptoExchange.Net/Objects/Options.cs +++ b/CryptoExchange.Net/Objects/Options.cs @@ -176,16 +176,6 @@ namespace CryptoExchange.Net.Objects /// public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5); - /// - /// The maximum number of times to try to reconnect, default null will retry indefinitely - /// - public int? MaxReconnectTries { get; set; } - - /// - /// The maximum number of times to try to resubscribe after reconnecting - /// - public int? MaxResubscribeTries { get; set; } - /// /// Max number of concurrent resubscription tasks per socket after reconnecting a socket /// @@ -232,8 +222,6 @@ namespace CryptoExchange.Net.Objects AutoReconnect = baseOptions.AutoReconnect; ReconnectInterval = baseOptions.ReconnectInterval; - MaxReconnectTries = baseOptions.MaxReconnectTries; - MaxResubscribeTries = baseOptions.MaxResubscribeTries; MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket; SocketResponseTimeout = baseOptions.SocketResponseTimeout; SocketNoDataTimeout = baseOptions.SocketNoDataTimeout; @@ -244,7 +232,7 @@ namespace CryptoExchange.Net.Objects /// public override string ToString() { - return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxReconnectTries: {MaxReconnectTries}, MaxResubscribeTries: {MaxResubscribeTries}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}"; + return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}"; } } diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 92f5670..dddaa2f 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -5,13 +5,10 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Net.WebSockets; -using System.Security.Authentication; -using System.Text; using System.Threading; using System.Threading.Tasks; @@ -22,21 +19,32 @@ namespace CryptoExchange.Net.Sockets /// public class CryptoExchangeWebSocketClient : IWebsocket { + enum ProcessState + { + Idle, + Processing, + WaitingForClose, + Reconnecting + } + internal static int lastStreamId; private static readonly object streamIdLock = new(); - private ClientWebSocket _socket; private readonly AsyncResetEvent _sendEvent; private readonly ConcurrentQueue _sendBuffer; - private readonly IDictionary cookies; - private readonly IDictionary headers; - private CancellationTokenSource _ctsSource; - private ApiProxy? _proxy; - + private readonly SemaphoreSlim _closeSem; + private readonly WebSocketParameters _parameters; private readonly List _outgoingMessages; + + private ClientWebSocket _socket; + private CancellationTokenSource _ctsSource; private DateTime _lastReceivedMessagesUpdate; - private bool _closed; + private Task? _processTask; + private Task? _closeTask; + private bool _stopRequested; private bool _disposed; + private ProcessState _processState; + /// /// Received messages, the size and the timstamp @@ -51,48 +59,18 @@ namespace CryptoExchange.Net.Sockets /// /// Log /// - protected Log log; - - /// - /// Handlers for when an error happens on the socket - /// - protected readonly List> errorHandlers = new(); - /// - /// Handlers for when the socket connection is opened - /// - protected readonly List openHandlers = new(); - /// - /// Handlers for when the connection is closed - /// - protected readonly List closeHandlers = new(); - /// - /// Handlers for when a message is received - /// - protected readonly List> messageHandlers = new(); + protected Log _log; /// public int Id { get; } - /// - public string? Origin { get; set; } - /// /// The timestamp this socket has been active for the last time /// public DateTime LastActionTime { get; private set; } - - /// - /// Delegate used for processing byte data received from socket connections before it is processed by handlers - /// - public Func? DataInterpreterBytes { get; set; } - - /// - /// Delegate used for processing string data received from socket connections before it is processed by handlers - /// - public Func? DataInterpreterString { get; set; } - + /// - public Uri Uri { get; } + public Uri Uri => _parameters.Uri; /// public bool IsClosed => _socket.State == WebSocketState.Closed; @@ -100,34 +78,6 @@ namespace CryptoExchange.Net.Sockets /// public bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested; - /// - /// Ssl protocols supported. NOT USED BY THIS IMPLEMENTATION - /// - public SslProtocols SSLProtocols { get; set; } - - private Encoding _encoding = Encoding.UTF8; - /// - public Encoding? Encoding - { - get => _encoding; - set - { - if(value != null) - _encoding = value; - } - } - - /// - /// The max amount of outgoing messages per second - /// - public int? RatelimitPerSecond { get; set; } - - /// - public TimeSpan Timeout { get; set; } - - /// - public TimeSpan KeepAliveInterval { get; set; } - /// public double IncomingKbps { @@ -146,57 +96,29 @@ namespace CryptoExchange.Net.Sockets } /// - public event Action OnClose - { - add => closeHandlers.Add(value); - remove => closeHandlers.Remove(value); - } - + public event Action? OnClose; /// - public event Action OnMessage - { - add => messageHandlers.Add(value); - remove => messageHandlers.Remove(value); - } - + public event Action? OnMessage; /// - public event Action OnError - { - add => errorHandlers.Add(value); - remove => errorHandlers.Remove(value); - } - + public event Action? OnError; /// - public event Action OnOpen - { - add => openHandlers.Add(value); - remove => openHandlers.Remove(value); - } + public event Action? OnOpen; + /// + public event Action? OnReconnecting; + /// + public event Action? OnReconnected; /// /// ctor /// /// The log object to use - /// The uri the socket should connect to - public CryptoExchangeWebSocketClient(Log log, Uri uri) : this(log, uri, new Dictionary(), new Dictionary()) - { - } - - /// - /// ctor - /// - /// The log object to use - /// The uri the socket should connect to - /// Cookies to sent in the socket connection request - /// Headers to sent in the socket connection request - public CryptoExchangeWebSocketClient(Log log, Uri uri, IDictionary cookies, IDictionary headers) + /// The parameters for this socket + public CryptoExchangeWebSocketClient(Log log, WebSocketParameters websocketParameters) { Id = NextStreamId(); - this.log = log; - Uri = uri; - this.cookies = cookies; - this.headers = headers; + _log = log; + _parameters = websocketParameters; _outgoingMessages = new List(); _receivedMessages = new List(); _sendEvent = new AsyncResetEvent(); @@ -204,90 +126,184 @@ namespace CryptoExchange.Net.Sockets _ctsSource = new CancellationTokenSource(); _receivedMessagesLock = new object(); + _closeSem = new SemaphoreSlim(1, 1); _socket = CreateSocket(); } - /// - public virtual void SetProxy(ApiProxy proxy) - { - _proxy = proxy; - - if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri)) - throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy)); - - _socket.Options.Proxy = uri?.Scheme == null - ? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port) - : _socket.Options.Proxy = new WebProxy - { - Address = uri - }; - - if (proxy.Login != null) - _socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password); - } - /// public virtual async Task ConnectAsync() { - log.Write(LogLevel.Debug, $"Socket {Id} connecting"); + if (!await ConnectInternalAsync().ConfigureAwait(false)) + return false; + + OnOpen?.Invoke(); + _processTask = ProcessAsync(); + return true; + } + + /// + /// Create the socket object + /// + private ClientWebSocket CreateSocket() + { + var cookieContainer = new CookieContainer(); + foreach (var cookie in _parameters.Cookies) + cookieContainer.Add(new Cookie(cookie.Key, cookie.Value)); + + var socket = new ClientWebSocket(); + socket.Options.Cookies = cookieContainer; + foreach (var header in _parameters.Headers) + socket.Options.SetRequestHeader(header.Key, header.Value); + socket.Options.KeepAliveInterval = _parameters.KeepAliveInterval ?? TimeSpan.Zero; + socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework + if (_parameters.Proxy != null) + SetProxy(_parameters.Proxy); + return socket; + } + + private async Task ConnectInternalAsync() + { + _log.Write(LogLevel.Debug, $"Socket {Id} connecting"); try { - using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10)); + using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10)); await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false); - - Handle(openHandlers); } catch (Exception e) { - log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString()); + _log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString()); return false; } - - log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}"); + + _log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}"); return true; } /// - public virtual async Task ProcessAsync() + private async Task ProcessAsync() { - log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started"); - var sendTask = SendLoopAsync(); - var receiveTask = ReceiveLoopAsync(); - var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask; - log.Write(LogLevel.Trace, $"Socket {Id} processing startup completed"); - await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); - log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished"); + while (!_stopRequested) + { + _log.Write(LogLevel.Debug, $"Socket {Id} starting processing tasks"); + _processState = ProcessState.Processing; + var sendTask = SendLoopAsync(); + var receiveTask = ReceiveLoopAsync(); + var timeoutTask = _parameters.Timeout != null && _parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask; + await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); + _log.Write(LogLevel.Debug, $"Socket {Id} processing tasks finished"); + + _processState = ProcessState.WaitingForClose; + while (_closeTask == null) + await Task.Delay(50).ConfigureAwait(false); + + await _closeTask.ConfigureAwait(false); + _closeTask = null; + + if (!_parameters.AutoReconnect) + { + _processState = ProcessState.Idle; + OnClose?.Invoke(); + return; + } + + if (!_stopRequested) + { + _processState = ProcessState.Reconnecting; + OnReconnecting?.Invoke(); + } + + while (!_stopRequested) + { + _log.Write(LogLevel.Debug, $"Socket {Id} attempting to reconnect"); + _socket = CreateSocket(); + _ctsSource.Dispose(); + _ctsSource = new CancellationTokenSource(); + while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer + + var connected = await ConnectInternalAsync().ConfigureAwait(false); + if (!connected) + { + await Task.Delay(_parameters.ReconnectInterval).ConfigureAwait(false); + continue; + } + + OnReconnected?.Invoke(); + break; + } + } + + _processState = ProcessState.Idle; } /// public virtual void Send(string data) { if (_ctsSource.IsCancellationRequested) - throw new InvalidOperationException($"Socket {Id} Can't send data when socket is not connected"); + return; - var bytes = _encoding.GetBytes(data); - log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer"); + var bytes = _parameters.Encoding.GetBytes(data); + _log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer"); _sendBuffer.Enqueue(bytes); _sendEvent.Set(); } + /// + public virtual async Task ReconnectAsync() + { + if (_processState != ProcessState.Processing) + return; + + _log.Write(LogLevel.Debug, $"Socket {Id} reconnect requested"); + _closeTask = CloseInternalAsync(); + await _closeTask.ConfigureAwait(false); + } + /// public virtual async Task CloseAsync() { - log.Write(LogLevel.Debug, $"Socket {Id} closing"); - await CloseInternalAsync().ConfigureAwait(false); + await _closeSem.WaitAsync().ConfigureAwait(false); + try + { + if (_closeTask != null && !_closeTask.IsCompleted) + { + _log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task"); + await _closeTask.ConfigureAwait(false); + return; + } + + _stopRequested = true; + + if (!IsOpen) + { + _log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open"); + return; + } + + _log.Write(LogLevel.Debug, $"Socket {Id} closing"); + _closeTask = CloseInternalAsync(); + } + finally + { + _closeSem.Release(); + } + + await _closeTask.ConfigureAwait(false); + if(_processTask != null) + await _processTask.ConfigureAwait(false); + OnClose?.Invoke(); + _log.Write(LogLevel.Debug, $"Socket {Id} closed"); } - + /// /// Internal close method /// /// private async Task CloseInternalAsync() { - if (_closed || _disposed) + if (_disposed) return; - _closed = true; + //_closeState = CloseState.Closing; _ctsSource.Cancel(); _sendEvent.Set(); @@ -297,8 +313,12 @@ namespace CryptoExchange.Net.Sockets { await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); } - catch(Exception) - { } // Can sometimes throw an exception when socket is in aborted state due to timing + catch (Exception) + { + // Can sometimes throw an exception when socket is in aborted state due to timing + // Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync + // So socket might go to aborted state, might still be open + } } else if(_socket.State == WebSocketState.CloseReceived) { @@ -307,10 +327,12 @@ namespace CryptoExchange.Net.Sockets await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); } catch (Exception) - { } // Can sometimes throw an exception when socket is in aborted state due to timing + { + // Can sometimes throw an exception when socket is in aborted state due to timing + // Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync + // So socket might go to aborted state, might still be open + } } - log.Write(LogLevel.Debug, $"Socket {Id} closed"); - Handle(closeHandlers); } /// @@ -321,48 +343,11 @@ namespace CryptoExchange.Net.Sockets if (_disposed) return; - log.Write(LogLevel.Debug, $"Socket {Id} disposing"); + _log.Write(LogLevel.Debug, $"Socket {Id} disposing"); _disposed = true; _socket.Dispose(); _ctsSource.Dispose(); - - errorHandlers.Clear(); - openHandlers.Clear(); - closeHandlers.Clear(); - messageHandlers.Clear(); - log.Write(LogLevel.Trace, $"Socket {Id} disposed"); - } - - /// - public void Reset() - { - log.Write(LogLevel.Debug, $"Socket {Id} resetting"); - _ctsSource = new CancellationTokenSource(); - - while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer - - _socket = CreateSocket(); - if (_proxy != null) - SetProxy(_proxy); - _closed = false; - } - - /// - /// Create the socket object - /// - private ClientWebSocket CreateSocket() - { - var cookieContainer = new CookieContainer(); - foreach (var cookie in cookies) - cookieContainer.Add(new Cookie(cookie.Key, cookie.Value)); - - var socket = new ClientWebSocket(); - socket.Options.Cookies = cookieContainer; - foreach (var header in headers) - socket.Options.SetRequestHeader(header.Key, header.Value); - socket.Options.KeepAliveInterval = KeepAliveInterval; - socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework - return socket; + _log.Write(LogLevel.Trace, $"Socket {Id} disposed"); } /// @@ -385,25 +370,25 @@ namespace CryptoExchange.Net.Sockets while (_sendBuffer.TryDequeue(out var data)) { - if (RatelimitPerSecond != null) + if (_parameters.RatelimitPerSecond != null) { // Wait for rate limit DateTime? start = null; - while (MessagesSentLastSecond() >= RatelimitPerSecond) + while (MessagesSentLastSecond() >= _parameters.RatelimitPerSecond) { start ??= DateTime.UtcNow; await Task.Delay(50).ConfigureAwait(false); } if (start != null) - log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit"); + _log.Write(LogLevel.Debug, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit"); } try { await _socket.SendAsync(new ArraySegment(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false); _outgoingMessages.Add(DateTime.UtcNow); - log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes"); + _log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes"); } catch (OperationCanceledException) { @@ -412,9 +397,9 @@ namespace CryptoExchange.Net.Sockets } catch (Exception ioe) { - // Connection closed unexpectedly, .NET framework - Handle(errorHandlers, ioe); - await CloseInternalAsync().ConfigureAwait(false); + // Connection closed unexpectedly, .NET framework + OnError?.Invoke(ioe); + _closeTask = CloseInternalAsync(); break; } } @@ -425,12 +410,12 @@ namespace CryptoExchange.Net.Sockets // Because this is running in a separate task and not awaited until the socket gets closed // any exception here will crash the send processing, but do so silently unless the socket get's stopped. // Make sure we at least let the owner know there was an error - Handle(errorHandlers, e); + OnError?.Invoke(e); throw; } finally { - log.Write(LogLevel.Trace, $"Socket {Id} Send loop finished"); + _log.Write(LogLevel.Debug, $"Socket {Id} Send loop finished"); } } @@ -468,17 +453,17 @@ namespace CryptoExchange.Net.Sockets } catch (Exception wse) { - // Connection closed unexpectedly - Handle(errorHandlers, wse); - await CloseInternalAsync().ConfigureAwait(false); + // Connection closed unexpectedly + OnError?.Invoke(wse); + _closeTask = CloseInternalAsync(); break; } if (receiveResult.MessageType == WebSocketMessageType.Close) { // Connection closed unexpectedly - log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message"); - await CloseInternalAsync().ConfigureAwait(false); + _log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message"); + _closeTask = CloseInternalAsync(); break; } @@ -487,7 +472,7 @@ namespace CryptoExchange.Net.Sockets // We received data, but it is not complete, write it to a memory stream for reassembling multiPartMessage = true; memoryStream ??= new MemoryStream(); - log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); + _log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); } else @@ -495,13 +480,13 @@ namespace CryptoExchange.Net.Sockets if (!multiPartMessage) { // Received a complete message and it's not multi part - log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); + _log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); HandleMessage(buffer.Array!, buffer.Offset, receiveResult.Count, receiveResult.MessageType); } else { // Received the end of a multipart message, write to memory stream for reassembling - log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); + _log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); } break; @@ -529,12 +514,12 @@ namespace CryptoExchange.Net.Sockets if (receiveResult?.EndOfMessage == true) { // Reassemble complete message from memory stream - log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); + _log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType); memoryStream.Dispose(); } else - log.Write(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes"); + _log.Write(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes"); } } } @@ -543,12 +528,12 @@ namespace CryptoExchange.Net.Sockets // Because this is running in a separate task and not awaited until the socket gets closed // any exception here will crash the receive processing, but do so silently unless the socket gets stopped. // Make sure we at least let the owner know there was an error - Handle(errorHandlers, e); + OnError?.Invoke(e); throw; } finally { - log.Write(LogLevel.Trace, $"Socket {Id} Receive loop finished"); + _log.Write(LogLevel.Debug, $"Socket {Id} Receive loop finished"); } } @@ -564,54 +549,92 @@ namespace CryptoExchange.Net.Sockets string strData; if (messageType == WebSocketMessageType.Binary) { - if (DataInterpreterBytes == null) + if (_parameters.DataInterpreterBytes == null) throw new Exception("Byte interpreter not set while receiving byte data"); try { var relevantData = new byte[count]; Array.Copy(data, offset, relevantData, 0, count); - strData = DataInterpreterBytes(relevantData); + strData = _parameters.DataInterpreterBytes(relevantData); } catch(Exception e) { - log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString()); + _log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString()); return; } } else - strData = _encoding.GetString(data, offset, count); + strData = _parameters.Encoding.GetString(data, offset, count); - if (DataInterpreterString != null) + if (_parameters.DataInterpreterString != null) { try { - strData = DataInterpreterString(strData); + strData = _parameters.DataInterpreterString(strData); } catch(Exception e) { - log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString()); + _log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString()); return; } } try { - Handle(messageHandlers, strData); + LastActionTime = DateTime.UtcNow; + OnMessage?.Invoke(strData); } catch(Exception e) { - log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString()); + _log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString()); } } + /// + /// Trigger the OnMessage event + /// + /// + protected void TriggerOnMessage(string data) + { + LastActionTime = DateTime.UtcNow; + OnMessage?.Invoke(data); + } + + /// + /// Trigger the OnError event + /// + /// + protected void TriggerOnError(Exception ex) => OnError?.Invoke(ex); + + /// + /// Trigger the OnError event + /// + protected void TriggerOnOpen() => OnOpen?.Invoke(); + + /// + /// Trigger the OnError event + /// + protected void TriggerOnClose() => OnClose?.Invoke(); + + /// + /// Trigger the OnReconnecting event + /// + protected void TriggerOnReconnecting() => OnReconnecting?.Invoke(); + + /// + /// Trigger the OnReconnected event + /// + protected void TriggerOnReconnected() => OnReconnected?.Invoke(); + /// /// Checks if there is no data received for a period longer than the specified timeout /// /// protected async Task CheckTimeoutAsync() { - log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Timeout}"); + _log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {_parameters.Timeout}"); + LastActionTime = DateTime.UtcNow; try { while (true) @@ -619,9 +642,9 @@ namespace CryptoExchange.Net.Sockets if (_ctsSource.IsCancellationRequested) return; - if (DateTime.UtcNow - LastActionTime > Timeout) + if (DateTime.UtcNow - LastActionTime > _parameters.Timeout) { - log.Write(LogLevel.Warning, $"Socket {Id} No data received for {Timeout}, reconnecting socket"); + _log.Write(LogLevel.Warning, $"Socket {Id} No data received for {_parameters.Timeout}, reconnecting socket"); _ = CloseAsync().ConfigureAwait(false); return; } @@ -641,35 +664,11 @@ namespace CryptoExchange.Net.Sockets // Because this is running in a separate task and not awaited until the socket gets closed // any exception here will stop the timeout checking, but do so silently unless the socket get's stopped. // Make sure we at least let the owner know there was an error - Handle(errorHandlers, e); + OnError?.Invoke(e); throw; } } - /// - /// Helper to invoke handlers - /// - /// - protected void Handle(List handlers) - { - LastActionTime = DateTime.UtcNow; - foreach (var handle in new List(handlers)) - handle?.Invoke(); - } - - /// - /// Helper to invoke handlers - /// - /// - /// - /// - protected void Handle(List> handlers, T data) - { - LastActionTime = DateTime.UtcNow; - foreach (var handle in new List>(handlers)) - handle?.Invoke(data); - } - /// /// Get the next identifier /// @@ -705,6 +704,27 @@ namespace CryptoExchange.Net.Sockets _lastReceivedMessagesUpdate = checkTime; } } + + /// + /// Set proxy on socket + /// + /// + /// + protected virtual void SetProxy(ApiProxy proxy) + { + if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri)) + throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy)); + + _socket.Options.Proxy = uri?.Scheme == null + ? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port) + : _socket.Options.Proxy = new WebProxy + { + Address = uri + }; + + if (proxy.Login != null) + _socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password); + } } /// diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 3e61a1d..512368c 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -3,13 +3,13 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Threading; using System.Threading.Tasks; using CryptoExchange.Net.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using Microsoft.Extensions.Logging; using CryptoExchange.Net.Objects; +using System.Net.WebSockets; namespace CryptoExchange.Net.Sockets { @@ -57,10 +57,22 @@ namespace CryptoExchange.Net.Sockets return subscriptions.Count(h => h.UserSubscription); } } + /// + /// Get a copy of the current subscriptions + /// + public SocketSubscription[] Subscriptions + { + get + { + lock (subscriptionLock) + return subscriptions.Where(h => h.UserSubscription).ToArray(); + } + } + /// /// If the connection has been authenticated /// - public bool Authenticated { get; set; } + public bool Authenticated { get; internal set; } /// /// If connection is made @@ -80,28 +92,13 @@ namespace CryptoExchange.Net.Sockets /// /// The connection uri /// - public Uri Uri => _socket.Uri; + public Uri ConnectionUri => _socket.Uri; /// /// The API client the connection is for /// public SocketApiClient ApiClient { get; set; } - /// - /// If the socket should be reconnected upon closing - /// - public bool ShouldReconnect { get; set; } - - /// - /// Current reconnect try, reset when a successful connection is made - /// - public int ReconnectTry { get; set; } - - /// - /// Current resubscribe try, reset when a successful connection is made - /// - public int ResubscribeTry { get; set; } - /// /// Time of disconnecting /// @@ -110,7 +107,7 @@ namespace CryptoExchange.Net.Sockets /// /// Tag for identificaion /// - public string? Tag { get; set; } + public string Tag { get; set; } /// /// If activity is paused @@ -124,27 +121,12 @@ namespace CryptoExchange.Net.Sockets { pausedActivity = value; log.Write(LogLevel.Information, $"Socket {SocketId} Paused activity: " + value); - if(pausedActivity) ActivityPaused?.Invoke(); - else ActivityUnpaused?.Invoke(); + if(pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke()); + else _ = Task.Run(() => ActivityUnpaused?.Invoke()); } } } - private bool pausedActivity; - private readonly List subscriptions; - private readonly object subscriptionLock = new(); - - private bool lostTriggered; - private readonly Log log; - private readonly BaseSocketClient socketClient; - - private readonly List pendingRequests; - private Task? _socketProcessTask; - private Task? _socketReconnectTask; - private readonly AsyncResetEvent _reconnectWaitEvent; - - private SocketStatus _status; - /// /// Status of the socket connection /// @@ -153,12 +135,26 @@ namespace CryptoExchange.Net.Sockets get => _status; private set { + if (_status == value) + return; + var oldStatus = _status; _status = value; - log.Write(LogLevel.Trace, $"Socket {SocketId} status changed from {oldStatus} to {_status}"); + log.Write(LogLevel.Debug, $"Socket {SocketId} status changed from {oldStatus} to {_status}"); } } + private bool pausedActivity; + private readonly List subscriptions; + private readonly object subscriptionLock = new(); + + private readonly Log log; + private readonly BaseSocketClient socketClient; + + private readonly List pendingRequests; + + private SocketStatus _status; + /// /// The underlying websocket /// @@ -170,137 +166,72 @@ namespace CryptoExchange.Net.Sockets /// The socket client /// The api client /// The socket - public SocketConnection(BaseSocketClient client, SocketApiClient apiClient, IWebsocket socket) + /// + public SocketConnection(BaseSocketClient client, SocketApiClient apiClient, IWebsocket socket, string tag) { log = client.log; socketClient = client; ApiClient = apiClient; + Tag = tag; pendingRequests = new List(); - subscriptions = new List(); + _socket = socket; - - _reconnectWaitEvent = new AsyncResetEvent(false, true); - - _socket.Timeout = client.ClientOptions.SocketNoDataTimeout; - _socket.OnMessage += ProcessMessage; - _socket.OnOpen += SocketOnOpen; - _socket.OnClose += () => _reconnectWaitEvent.Set(); - + _socket.OnMessage += HandleMessage; + _socket.OnOpen += HandleOpen; + _socket.OnClose += HandleClose; + _socket.OnReconnecting += HandleReconnecting; + _socket.OnReconnected += HandleReconnected; + _socket.OnError += HandleError; } - + /// - /// Connect the websocket and start processing + /// Handler for a socket opening /// - /// - public async Task ConnectAsync() + protected virtual void HandleOpen() { - var connected = await _socket.ConnectAsync().ConfigureAwait(false); - if (connected) + Status = SocketStatus.Connected; + PausedActivity = false; + } + + /// + /// Handler for a socket closing without reconnect + /// + protected virtual void HandleClose() + { + Status = SocketStatus.Closed; + Authenticated = false; + lock(subscriptionLock) { - Status = SocketStatus.Connected; - _socketReconnectTask = ReconnectWatcherAsync(); - _socketProcessTask = _socket.ProcessAsync(); - } - - return connected; + foreach (var sub in subscriptions) + sub.Confirmed = false; + } + Task.Run(() => ConnectionClosed?.Invoke()); } /// - /// Retrieve the underlying socket + /// Handler for a socket losing conenction and starting reconnect /// - /// - public IWebsocket GetSocket() + protected virtual void HandleReconnecting() { - return _socket; - } - - /// - /// Trigger a reconnect of the socket connection - /// - /// - public async Task TriggerReconnectAsync() - { - await _socket.CloseAsync().ConfigureAwait(false); - } - - /// - /// Close the connection - /// - /// - public async Task CloseAsync() - { - if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed) - return; - - ShouldReconnect = false; - if (socketClient.socketConnections.ContainsKey(SocketId)) - socketClient.socketConnections.TryRemove(SocketId, out _); - + Status = SocketStatus.Reconnecting; + DisconnectTime = DateTime.UtcNow; + Authenticated = false; lock (subscriptionLock) { - foreach (var subscription in subscriptions) - { - if (subscription.CancellationTokenRegistration.HasValue) - subscription.CancellationTokenRegistration.Value.Dispose(); - } + foreach (var sub in subscriptions) + sub.Confirmed = false; } - - while (Status == SocketStatus.Reconnecting) - // Wait for reconnecting to finish - await Task.Delay(100).ConfigureAwait(false); - - await _socket.CloseAsync().ConfigureAwait(false); - if(_socketProcessTask != null) - await _socketProcessTask.ConfigureAwait(false); - _socket.Dispose(); + Task.Run(() => ConnectionLost?.Invoke()); } /// - /// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well + /// Handler for a socket which has reconnected /// - /// Subscription to close - /// - public async Task CloseAsync(SocketSubscription subscription) + protected virtual async void HandleReconnected() { - if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) - return; - - log.Write(LogLevel.Trace, $"Socket {SocketId} closing subscription {subscription.Id}"); - if (subscription.CancellationTokenRegistration.HasValue) - subscription.CancellationTokenRegistration.Value.Dispose(); - - if (subscription.Confirmed && _socket.IsOpen) - await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); - - bool shouldCloseConnection; - lock (subscriptionLock) - { - if (Status == SocketStatus.Closing) - { - log.Write(LogLevel.Trace, $"Socket {SocketId} already closing"); - return; - } - - shouldCloseConnection = subscriptions.All(r => !r.UserSubscription || r == subscription); - if (shouldCloseConnection) - Status = SocketStatus.Closing; - } - - if (shouldCloseConnection) - { - log.Write(LogLevel.Trace, $"Socket {SocketId} closing as there are no more subscriptions"); - await CloseAsync().ConfigureAwait(false); - } - - lock (subscriptionLock) - subscriptions.Remove(subscription); - } - - private async Task ReconnectAsync() - { - // Fail all pending requests + Status = SocketStatus.Resubscribing; lock (pendingRequests) { foreach (var pendingRequest in pendingRequests.ToList()) @@ -310,137 +241,37 @@ namespace CryptoExchange.Net.Sockets } } - if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect) - { - // Should reconnect - DisconnectTime = DateTime.UtcNow; - log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect"); - if (!lostTriggered) - { - lostTriggered = true; - _ = Task.Run(() => ConnectionLost?.Invoke()); - } - - while (ShouldReconnect) - { - if (ReconnectTry > 0) - { - // Wait a bit before attempting reconnect - await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false); - } - - if (!ShouldReconnect) - { - // Should reconnect changed to false while waiting to reconnect - return; - } - - _socket.Reset(); - if (!await _socket.ConnectAsync().ConfigureAwait(false)) - { - // Reconnect failed - ReconnectTry++; - ResubscribeTry = 0; - if (socketClient.ClientOptions.MaxReconnectTries != null - && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries) - { - log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing"); - ShouldReconnect = false; - - if (socketClient.socketConnections.ContainsKey(SocketId)) - socketClient.socketConnections.TryRemove(SocketId, out _); - - _ = Task.Run(() => ConnectionClosed?.Invoke()); - // Reached max tries, break loop and leave connection closed - break; - } - - // Continue to try again - log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}"); - continue; - } - - // Successfully reconnected, start processing - Status = SocketStatus.Connected; - _socketProcessTask = _socket.ProcessAsync(); - - ReconnectTry = 0; - var time = DisconnectTime; - DisconnectTime = null; - - log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}"); - - var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false); - if (!reconnectResult) - { - // Failed to resubscribe everything - ResubscribeTry++; - DisconnectTime = time; - - if (socketClient.ClientOptions.MaxResubscribeTries != null && - ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries) - { - log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}"); - ShouldReconnect = false; - - if (socketClient.socketConnections.ContainsKey(SocketId)) - socketClient.socketConnections.TryRemove(SocketId, out _); - - _ = Task.Run(() => ConnectionClosed?.Invoke()); - } - else - log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting."); - - // Failed resubscribe, close socket if it is still open - if (_socket.IsOpen) - await _socket.CloseAsync().ConfigureAwait(false); - else - DisconnectTime = DateTime.UtcNow; - - // Break out of the loop, the new processing task should reconnect again - break; - } - else - { - // Succesfully reconnected - log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored."); - ResubscribeTry = 0; - if (lostTriggered) - { - lostTriggered = false; - _ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0))); - } - - break; - } - } - } + var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false); + if (!reconnectSuccessful) + await _socket.ReconnectAsync().ConfigureAwait(false); else { - if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect) - _ = Task.Run(() => ConnectionClosed?.Invoke()); - - // No reconnecting needed - log.Write(LogLevel.Information, $"Socket {SocketId} closed"); - if (socketClient.socketConnections.ContainsKey(SocketId)) - socketClient.socketConnections.TryRemove(SocketId, out _); + Status = SocketStatus.Connected; + _ = Task.Run(() => + { + ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value); + DisconnectTime = null; + }); } } /// - /// Dispose the connection + /// Handler for an error on a websocket /// - public void Dispose() + /// The exception + protected virtual void HandleError(Exception e) { - Status = SocketStatus.Disposed; - _socket.Dispose(); + if (e is WebSocketException wse) + log.Write(LogLevel.Warning, $"Socket {SocketId} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString()); + else + log.Write(LogLevel.Warning, $"Socket {SocketId} error: " + e.ToLogString()); } /// /// Process a message received by the socket /// /// The received data - private void ProcessMessage(string data) + protected virtual void HandleMessage(string data) { var timestamp = DateTime.UtcNow; log.Write(LogLevel.Trace, $"Socket {SocketId} received data: " + data); @@ -456,15 +287,13 @@ namespace CryptoExchange.Net.Sockets } var handledResponse = false; - PendingRequest[] requests; - lock(pendingRequests) - requests = pendingRequests.ToArray(); // Remove any timed out requests - foreach (var request in requests.Where(r => r.Completed)) + PendingRequest[] requests; + lock (pendingRequests) { - lock (pendingRequests) - pendingRequests.Remove(request); + pendingRequests.RemoveAll(r => r.Completed); + requests = pendingRequests.ToArray(); } // Check if this message is an answer on any pending requests @@ -472,7 +301,7 @@ namespace CryptoExchange.Net.Sockets { if (pendingRequest.CheckData(tokenData)) { - lock (pendingRequests) + lock (pendingRequests) pendingRequests.Remove(pendingRequest); if (!socketClient.ContinueOnQueryResponse) @@ -484,8 +313,8 @@ namespace CryptoExchange.Net.Sockets } // Message was not a request response, check data handlers - var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data: null, timestamp); - var (handled, userProcessTime) = HandleData(messageEvent); + var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data : null, timestamp); + var (handled, userProcessTime, subscription) = HandleData(messageEvent); if (!handled && !handledResponse) { if (!socketClient.UnhandledMessageExpected) @@ -495,10 +324,108 @@ namespace CryptoExchange.Net.Sockets var total = DateTime.UtcNow - timestamp; if (userProcessTime.TotalMilliseconds > 500) - log.Write(LogLevel.Debug, $"Socket {SocketId} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " + + log.Write(LogLevel.Debug, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " + "Data from this socket may arrive late or not at all if message processing is continuously slow."); - - log.Write(LogLevel.Trace, $"Socket {SocketId} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)"); + + log.Write(LogLevel.Trace, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)"); + } + + /// + /// Connect the websocket + /// + /// + public async Task ConnectAsync() => await _socket.ConnectAsync().ConfigureAwait(false); + + /// + /// Retrieve the underlying socket + /// + /// + public IWebsocket GetSocket() => _socket; + + /// + /// Trigger a reconnect of the socket connection + /// + /// + public async Task TriggerReconnectAsync() => await _socket.ReconnectAsync().ConfigureAwait(false); + + /// + /// Close the connection + /// + /// + public async Task CloseAsync() + { + if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed) + return; + + if (socketClient.socketConnections.ContainsKey(SocketId)) + socketClient.socketConnections.TryRemove(SocketId, out _); + + lock (subscriptionLock) + { + foreach (var subscription in subscriptions) + { + if (subscription.CancellationTokenRegistration.HasValue) + subscription.CancellationTokenRegistration.Value.Dispose(); + } + } + + await _socket.CloseAsync().ConfigureAwait(false); + _socket.Dispose(); + } + + /// + /// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well + /// + /// Subscription to close + /// + public async Task CloseAsync(SocketSubscription subscription) + { + lock (subscriptionLock) + { + if (!subscriptions.Contains(subscription)) + return; + + subscriptions.Remove(subscription); + } + + if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) + return; + + log.Write(LogLevel.Debug, $"Socket {SocketId} closing subscription {subscription.Id}"); + if (subscription.CancellationTokenRegistration.HasValue) + subscription.CancellationTokenRegistration.Value.Dispose(); + + if (subscription.Confirmed && _socket.IsOpen) + await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); + + bool shouldCloseConnection; + lock (subscriptionLock) + { + if (Status == SocketStatus.Closing) + { + log.Write(LogLevel.Debug, $"Socket {SocketId} already closing"); + return; + } + + shouldCloseConnection = subscriptions.All(r => !r.UserSubscription); + if (shouldCloseConnection) + Status = SocketStatus.Closing; + } + + if (shouldCloseConnection) + { + log.Write(LogLevel.Debug, $"Socket {SocketId} closing as there are no more subscriptions"); + await CloseAsync().ConfigureAwait(false); + } + } + + /// + /// Dispose the connection + /// + public void Dispose() + { + Status = SocketStatus.Disposed; + _socket.Dispose(); } /// @@ -513,7 +440,8 @@ namespace CryptoExchange.Net.Sockets return false; subscriptions.Add(subscription); - log.Write(LogLevel.Trace, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {subscriptions.Count}"); + if(subscription.UserSubscription) + log.Write(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {subscriptions.Count(s => s.UserSubscription)}"); return true; } } @@ -544,7 +472,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// True if the data was successfully handled - private (bool, TimeSpan) HandleData(MessageEvent messageEvent) + private (bool, TimeSpan, SocketSubscription?) HandleData(MessageEvent messageEvent) { SocketSubscription? currentSubscription = null; try @@ -585,13 +513,13 @@ namespace CryptoExchange.Net.Sockets } } - return (handled, userCodeDuration); + return (handled, userCodeDuration, currentSubscription); } catch (Exception ex) { log.Write(LogLevel.Error, $"Socket {SocketId} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}"); currentSubscription?.InvokeExceptionHandler(ex); - return (false, TimeSpan.Zero); + return (false, TimeSpan.Zero, null); } } @@ -649,37 +577,24 @@ namespace CryptoExchange.Net.Sockets } } - /// - /// Handler for a socket opening - /// - protected virtual void SocketOnOpen() - { - ReconnectTry = 0; - PausedActivity = false; - } - - private async Task ReconnectWatcherAsync() - { - while (true) - { - await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false); - if (!ShouldReconnect) - return; - - Status = SocketStatus.Reconnecting; - await ReconnectAsync().ConfigureAwait(false); - - if (!ShouldReconnect) - return; - } - } - private async Task> ProcessReconnectAsync() { if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); - if (Authenticated) + bool anySubscriptions = false; + lock (subscriptionLock) + anySubscriptions = subscriptions.Any(s => s.UserSubscription); + + if (!anySubscriptions) + { + // No need to resubscribe anything + log.Write(LogLevel.Debug, $"Socket {SocketId} Nothing to resubscribe, closing connection"); + _ = _socket.CloseAsync(); + return new CallResult(true); + } + + if (subscriptions.Any(s => s.Authenticated)) { // If we reconnected a authenticated connection we need to re-authenticate var authResult = await socketClient.AuthenticateSocketAsync(this).ConfigureAwait(false); @@ -689,13 +604,22 @@ namespace CryptoExchange.Net.Sockets return authResult; } + Authenticated = true; log.Write(LogLevel.Debug, $"Socket {SocketId} authentication succeeded on reconnected socket."); } // Get a list of all subscriptions on the socket - List subscriptionList; + List subscriptionList = new List(); lock (subscriptionLock) - subscriptionList = subscriptions.Where(h => h.Request != null).ToList(); + { + foreach (var subscription in subscriptions) + { + if (subscription.Request != null) + subscriptionList.Add(subscription); + else + subscription.Confirmed = true; + } + } // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe for (var i = 0; i < subscriptionList.Count; i += socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket) @@ -705,13 +629,16 @@ namespace CryptoExchange.Net.Sockets var taskList = new List>>(); foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) - taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription)); + taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription)); await Task.WhenAll(taskList).ConfigureAwait(false); if (taskList.Any(t => !t.Result.Success)) return taskList.First(t => !t.Result.Success).Result; } + foreach (var subscription in subscriptionList) + subscription.Confirmed = true; + if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); @@ -731,7 +658,7 @@ namespace CryptoExchange.Net.Sockets return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); } - + /// /// Status of the socket connection /// @@ -750,6 +677,10 @@ namespace CryptoExchange.Net.Sockets /// Reconnecting, /// + /// Resubscribing on reconnected socket + /// + Resubscribing, + /// /// Closing /// Closing, diff --git a/CryptoExchange.Net/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Sockets/SocketSubscription.cs index 23e6428..544c517 100644 --- a/CryptoExchange.Net/Sockets/SocketSubscription.cs +++ b/CryptoExchange.Net/Sockets/SocketSubscription.cs @@ -43,19 +43,25 @@ namespace CryptoExchange.Net.Sockets /// public bool Confirmed { get; set; } + /// + /// Whether authentication is needed for this subscription + /// + public bool Authenticated { get; set; } + /// /// Cancellation token registration, should be disposed when subscription is closed. Used for closing the subscription with /// a provided cancelation token /// public CancellationTokenRegistration? CancellationTokenRegistration { get; set; } - private SocketSubscription(int id, object? request, string? identifier, bool userSubscription, Action dataHandler) + private SocketSubscription(int id, object? request, string? identifier, bool userSubscription, bool authenticated, Action dataHandler) { Id = id; UserSubscription = userSubscription; MessageHandler = dataHandler; Request = request; Identifier = identifier; + Authenticated = authenticated; } /// @@ -64,12 +70,13 @@ namespace CryptoExchange.Net.Sockets /// /// /// + /// /// /// public static SocketSubscription CreateForRequest(int id, object request, bool userSubscription, - Action dataHandler) + bool authenticated, Action dataHandler) { - return new SocketSubscription(id, request, null, userSubscription, dataHandler); + return new SocketSubscription(id, request, null, userSubscription, authenticated, dataHandler); } /// @@ -78,12 +85,13 @@ namespace CryptoExchange.Net.Sockets /// /// /// + /// /// /// public static SocketSubscription CreateForIdentifier(int id, string identifier, bool userSubscription, - Action dataHandler) + bool authenticated, Action dataHandler) { - return new SocketSubscription(id, null, identifier, userSubscription, dataHandler); + return new SocketSubscription(id, null, identifier, userSubscription, authenticated, dataHandler); } /// diff --git a/CryptoExchange.Net/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Sockets/UpdateSubscription.cs index 4462515..3fb71af 100644 --- a/CryptoExchange.Net/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Sockets/UpdateSubscription.cs @@ -22,8 +22,7 @@ namespace CryptoExchange.Net.Sockets } /// - /// Event when the connection is closed. This event happens when reconnecting/resubscribing has failed too often based on the and options, - /// or is false. The socket will not be reconnected + /// Event when the connection is closed and will not be reconnected /// public event Action ConnectionClosed { diff --git a/CryptoExchange.Net/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Sockets/WebSocketParameters.cs new file mode 100644 index 0000000..7fe0330 --- /dev/null +++ b/CryptoExchange.Net/Sockets/WebSocketParameters.cs @@ -0,0 +1,79 @@ +using CryptoExchange.Net.Objects; +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Sockets +{ + /// + /// Parameters for a websocket + /// + public class WebSocketParameters + { + /// + /// The uri to connect to + /// + public Uri Uri { get; set; } + /// + /// Headers to send in the connection handshake + /// + public IDictionary Headers { get; set; } = new Dictionary(); + /// + /// Cookies to send in the connection handshake + /// + public IDictionary Cookies { get; set; } = new Dictionary(); + /// + /// The time to wait between reconnect attempts + /// + public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5); + /// + /// Proxy for the connection + /// + public ApiProxy? Proxy { get; set; } + /// + /// Whether the socket should automatically reconnect when connection is lost + /// + public bool AutoReconnect { get; set; } + /// + /// The maximum time of no data received before considering the connection lost and closting/reconnecting the socket + /// + public TimeSpan? Timeout { get; set; } + /// + /// Interval at which to send ping frames + /// + public TimeSpan? KeepAliveInterval { get; set; } + /// + /// The max amount of messages to send per second + /// + public int? RatelimitPerSecond { get; set; } + /// + /// Origin header value to send in the connection handshake + /// + public string? Origin { get; set; } + /// + /// Delegate used for processing byte data received from socket connections before it is processed by handlers + /// + public Func? DataInterpreterBytes { get; set; } + + /// + /// Delegate used for processing string data received from socket connections before it is processed by handlers + /// + public Func? DataInterpreterString { get; set; } + + /// + /// Encoding for sending/receiving data + /// + public Encoding Encoding { get; set; } = Encoding.UTF8; + + /// + /// ctor + /// + /// Uri + /// Auto reconnect + public WebSocketParameters(Uri uri, bool autoReconnect) + { + Uri = uri; + AutoReconnect = autoReconnect; + } + } +} diff --git a/CryptoExchange.Net/Sockets/WebsocketFactory.cs b/CryptoExchange.Net/Sockets/WebsocketFactory.cs index 06384cd..cd54111 100644 --- a/CryptoExchange.Net/Sockets/WebsocketFactory.cs +++ b/CryptoExchange.Net/Sockets/WebsocketFactory.cs @@ -1,6 +1,4 @@ -using System; -using System.Collections.Generic; -using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Logging; namespace CryptoExchange.Net.Sockets @@ -11,15 +9,9 @@ namespace CryptoExchange.Net.Sockets public class WebsocketFactory : IWebsocketFactory { /// - public IWebsocket CreateWebsocket(Log log, string url) + public IWebsocket CreateWebsocket(Log log, WebSocketParameters parameters) { - return new CryptoExchangeWebSocketClient(log, new Uri(url)); - } - - /// - public IWebsocket CreateWebsocket(Log log, string url, IDictionary cookies, IDictionary headers) - { - return new CryptoExchangeWebSocketClient(log, new Uri(url), cookies, headers); + return new CryptoExchangeWebSocketClient(log, parameters); } } } diff --git a/README.md b/README.md index b6b02b7..145f98a 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,10 @@ I develop and maintain this package on my own for free in my spare time. Donatio Alternatively, sponsor me on Github using [Github Sponsors](https://github.com/sponsors/JKorf) ## Release notes +* Version 5.2.0 - 10 Jul 2022 + * Refactored websocket code, removed some clutter and simplified + * Added ReconnectAsync and GetSubscriptionsState methods on socket clients + * Version 5.1.12 - 12 Jun 2022 * Changed time sync so requests no longer wait for it to complete unless it's the first time * Made log client options changable after client creation