diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs index 2e249ac..eeda53a 100644 --- a/CryptoExchange.Net/Clients/BaseSocketClient.cs +++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs @@ -443,6 +443,9 @@ namespace CryptoExchange.Net /// /// protected internal virtual JToken ProcessTokenData(JToken message) + + + { return message; } @@ -605,27 +608,27 @@ namespace CryptoExchange.Net if (disposing) break; - foreach (var socket in socketConnections.Values) + foreach (var socketConnection in socketConnections.Values) { if (disposing) break; - if (!socket.Connected) + if (!socketConnection.Connected) continue; - var obj = objGetter(socket); + var obj = objGetter(socketConnection); if (obj == null) continue; - log.Write(LogLevel.Trace, $"Socket {socket.SocketId} sending periodic {identifier}"); + log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}"); try { - socket.Send(obj); + socketConnection.Send(obj); } catch (Exception ex) { - log.Write(LogLevel.Warning, $"Socket {socket.SocketId} Periodic send {identifier} failed: " + ex.ToLogString()); + log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} Periodic send {identifier} failed: " + ex.ToLogString()); } } } @@ -680,18 +683,14 @@ namespace CryptoExchange.Net public virtual async Task UnsubscribeAllAsync() { log.Write(LogLevel.Information, $"Closing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions"); - - await Task.Run(async () => + var tasks = new List(); { - var tasks = new List(); - { - var socketList = socketConnections.Values; - foreach (var sub in socketList) - tasks.Add(sub.CloseAsync()); - } + var socketList = socketConnections.Values; + foreach (var sub in socketList) + tasks.Add(sub.CloseAsync()); + } - await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); - }).ConfigureAwait(false); + await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); } /// @@ -703,7 +702,7 @@ namespace CryptoExchange.Net periodicEvent?.Set(); periodicEvent?.Dispose(); log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); - Task.Run(UnsubscribeAllAsync).ConfigureAwait(false).GetAwaiter().GetResult(); + _ = UnsubscribeAllAsync(); semaphoreSlim?.Dispose(); base.Dispose(); } diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 5cde778..153d476 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -85,7 +85,11 @@ namespace CryptoExchange.Net.Interfaces /// Connect the socket /// /// - Task ConnectAsync(); + Task ConnectAsync(); + /// + /// Receive and send messages over the connection. Resulting task should complete when closing the socket. + /// + /// Task ProcessAsync(); /// /// Send data diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 76461dc..a21b291 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -23,7 +23,7 @@ namespace CryptoExchange.Net.Sockets public class CryptoExchangeWebSocketClient : IWebsocket { internal static int lastStreamId; - private static readonly object streamIdLock = new object(); + private static readonly object streamIdLock = new(); private ClientWebSocket _socket; private readonly AsyncResetEvent _sendEvent; @@ -39,6 +39,7 @@ namespace CryptoExchange.Net.Sockets /// Received messages, the size and the timstamp /// protected readonly List _receivedMessages; + /// /// Received messages lock /// @@ -52,19 +53,19 @@ namespace CryptoExchange.Net.Sockets /// /// Handlers for when an error happens on the socket /// - protected readonly List> errorHandlers = new List>(); + protected readonly List> errorHandlers = new(); /// /// Handlers for when the socket connection is opened /// - protected readonly List openHandlers = new List(); + protected readonly List openHandlers = new(); /// /// Handlers for when the connection is closed /// - protected readonly List closeHandlers = new List(); + protected readonly List closeHandlers = new(); /// /// Handlers for when a message is received /// - protected readonly List> messageHandlers = new List>(); + protected readonly List> messageHandlers = new(); /// public int Id { get; } @@ -223,7 +224,7 @@ namespace CryptoExchange.Net.Sockets log.Write(LogLevel.Debug, $"Socket {Id} connecting"); try { - using CancellationTokenSource tcs = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10)); await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false); Handle(openHandlers); diff --git a/CryptoExchange.Net/Sockets/PendingRequest.cs b/CryptoExchange.Net/Sockets/PendingRequest.cs new file mode 100644 index 0000000..63ed6d9 --- /dev/null +++ b/CryptoExchange.Net/Sockets/PendingRequest.cs @@ -0,0 +1,47 @@ +using CryptoExchange.Net.Objects; +using Newtonsoft.Json.Linq; +using System; +using System.Threading; + +namespace CryptoExchange.Net.Sockets +{ + internal class PendingRequest + { + public Func Handler { get; } + public JToken? Result { get; private set; } + public bool Completed { get; private set; } + public AsyncResetEvent Event { get; } + public TimeSpan Timeout { get; } + + private CancellationTokenSource cts; + + public PendingRequest(Func handler, TimeSpan timeout) + { + Handler = handler; + Event = new AsyncResetEvent(false, false); + Timeout = timeout; + + cts = new CancellationTokenSource(timeout); + cts.Token.Register(Fail, false); + } + + public bool CheckData(JToken data) + { + if (Handler(data)) + { + Result = data; + Completed = true; + Event.Set(); + return true; + } + + return false; + } + + public void Fail() + { + Completed = true; + Event.Set(); + } + } +} diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 58a7ebd..2f24121 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -65,16 +65,22 @@ namespace CryptoExchange.Net.Sockets /// /// If connection is made /// - public bool Connected => Socket.IsOpen; + public bool Connected => _socket.IsOpen; /// - /// The underlying websocket + /// The unique ID of the socket /// - private IWebsocket Socket { get; set; } - public int SocketId => Socket.Id; + public int SocketId => _socket.Id; - public double IncomingKbps => Socket.IncomingKbps; - public Uri Uri => Socket.Uri; + /// + /// The current kilobytes per second of data being received, averaged over the last 3 seconds + /// + public double IncomingKbps => _socket.IncomingKbps; + + /// + /// The connection uri + /// + public Uri Uri => _socket.Uri; /// /// The API client the connection is for @@ -117,7 +123,7 @@ namespace CryptoExchange.Net.Sockets if (pausedActivity != value) { pausedActivity = value; - log.Write(LogLevel.Information, $"Socket {Socket.Id} Paused activity: " + value); + log.Write(LogLevel.Information, $"Socket {SocketId} Paused activity: " + value); if(pausedActivity) ActivityPaused?.Invoke(); else ActivityUnpaused?.Invoke(); } @@ -126,14 +132,19 @@ namespace CryptoExchange.Net.Sockets private bool pausedActivity; private readonly List subscriptions; - private readonly object subscriptionLock = new object(); + private readonly object subscriptionLock = new(); private bool lostTriggered; private readonly Log log; private readonly BaseSocketClient socketClient; private readonly List pendingRequests; - private Task _socketProcessReconnectTask; + private Task? _socketProcessReconnectTask; + + /// + /// The underlying websocket + /// + private readonly IWebsocket _socket; /// /// New socket connection @@ -150,25 +161,33 @@ namespace CryptoExchange.Net.Sockets pendingRequests = new List(); subscriptions = new List(); - Socket = socket; + _socket = socket; - Socket.Timeout = client.ClientOptions.SocketNoDataTimeout; - Socket.OnMessage += ProcessMessage; - Socket.OnOpen += SocketOnOpen; + _socket.Timeout = client.ClientOptions.SocketNoDataTimeout; + _socket.OnMessage += ProcessMessage; + _socket.OnOpen += SocketOnOpen; } + /// + /// Connect the websocket and start processing + /// + /// public async Task ConnectAsync() { - var connected = await Socket.ConnectAsync().ConfigureAwait(false); + var connected = await _socket.ConnectAsync().ConfigureAwait(false); if (connected) StartProcessingTask(); return connected; } + /// + /// Trigger a reconnect of the socket connection + /// + /// public async Task TriggerReconnectAsync() { - await Socket.CloseAsync().ConfigureAwait(false); + await _socket.CloseAsync().ConfigureAwait(false); } /// @@ -178,8 +197,8 @@ namespace CryptoExchange.Net.Sockets public async Task CloseAsync() { ShouldReconnect = false; - if (socketClient.socketConnections.ContainsKey(Socket.Id)) - socketClient.socketConnections.TryRemove(Socket.Id, out _); + if (socketClient.socketConnections.ContainsKey(SocketId)) + socketClient.socketConnections.TryRemove(SocketId, out _); lock (subscriptionLock) { @@ -189,9 +208,12 @@ namespace CryptoExchange.Net.Sockets subscription.CancellationTokenRegistration.Value.Dispose(); } } - await Socket.CloseAsync().ConfigureAwait(false); - Socket.Dispose(); - await _socketProcessReconnectTask.ConfigureAwait(false); + await _socket.CloseAsync().ConfigureAwait(false); + + if (_socketProcessReconnectTask != null) + await _socketProcessReconnectTask.ConfigureAwait(false); + + _socket.Dispose(); } /// @@ -201,7 +223,7 @@ namespace CryptoExchange.Net.Sockets /// public async Task CloseAsync(SocketSubscription subscription) { - if (!Socket.IsOpen) + if (!_socket.IsOpen) return; if (subscription.CancellationTokenRegistration.HasValue) @@ -225,7 +247,7 @@ namespace CryptoExchange.Net.Sockets { _socketProcessReconnectTask = Task.Run(async () => { - await Socket.ProcessAsync().ConfigureAwait(false); + await _socket.ProcessAsync().ConfigureAwait(false); await ReconnectAsync().ConfigureAwait(false); }); } @@ -246,7 +268,7 @@ namespace CryptoExchange.Net.Sockets { // Should reconnect DisconnectTime = DateTime.UtcNow; - log.Write(LogLevel.Warning, $"Socket {Socket.Id} Connection lost, will try to reconnect"); + log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect"); if (!lostTriggered) { lostTriggered = true; @@ -267,8 +289,8 @@ namespace CryptoExchange.Net.Sockets return; } - Socket.Reset(); - if (!await Socket.ConnectAsync().ConfigureAwait(false)) + _socket.Reset(); + if (!await _socket.ConnectAsync().ConfigureAwait(false)) { // Reconnect failed ReconnectTry++; @@ -276,11 +298,11 @@ namespace CryptoExchange.Net.Sockets if (socketClient.ClientOptions.MaxReconnectTries != null && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries) { - log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing"); + log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing"); ShouldReconnect = false; - if (socketClient.socketConnections.ContainsKey(Socket.Id)) - socketClient.socketConnections.TryRemove(Socket.Id, out _); + if (socketClient.socketConnections.ContainsKey(SocketId)) + socketClient.socketConnections.TryRemove(SocketId, out _); _ = Task.Run(() => ConnectionClosed?.Invoke()); // Reached max tries, break loop and leave connection closed @@ -288,7 +310,7 @@ namespace CryptoExchange.Net.Sockets } // Continue to try again - log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}"); + log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}"); continue; } @@ -299,7 +321,7 @@ namespace CryptoExchange.Net.Sockets var time = DisconnectTime; DisconnectTime = null; - log.Write(LogLevel.Information, $"Socket {Socket.Id} reconnected after {DateTime.UtcNow - time}"); + log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}"); var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false); if (!reconnectResult) @@ -311,20 +333,20 @@ namespace CryptoExchange.Net.Sockets if (socketClient.ClientOptions.MaxResubscribeTries != null && ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries) { - log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}"); + log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}"); ShouldReconnect = false; - if (socketClient.socketConnections.ContainsKey(Socket.Id)) - socketClient.socketConnections.TryRemove(Socket.Id, out _); + if (socketClient.socketConnections.ContainsKey(SocketId)) + socketClient.socketConnections.TryRemove(SocketId, out _); _ = Task.Run(() => ConnectionClosed?.Invoke()); } else - log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting."); + log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting."); // Failed resubscribe, close socket if it is still open - if (Socket.IsOpen) - await Socket.CloseAsync().ConfigureAwait(false); + if (_socket.IsOpen) + await _socket.CloseAsync().ConfigureAwait(false); else DisconnectTime = DateTime.UtcNow; @@ -334,7 +356,7 @@ namespace CryptoExchange.Net.Sockets else { // Succesfully reconnected - log.Write(LogLevel.Information, $"Socket {Socket.Id} data connection restored."); + log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored."); ResubscribeTry = 0; if (lostTriggered) { @@ -352,15 +374,18 @@ namespace CryptoExchange.Net.Sockets _ = Task.Run(() => ConnectionClosed?.Invoke()); // No reconnecting needed - log.Write(LogLevel.Information, $"Socket {Socket.Id} closed"); - if (socketClient.socketConnections.ContainsKey(Socket.Id)) - socketClient.socketConnections.TryRemove(Socket.Id, out _); + log.Write(LogLevel.Information, $"Socket {SocketId} closed"); + if (socketClient.socketConnections.ContainsKey(SocketId)) + socketClient.socketConnections.TryRemove(SocketId, out _); } } + /// + /// Dispose the connection + /// public void Dispose() { - Socket.Dispose(); + _socket.Dispose(); } /// @@ -370,7 +395,7 @@ namespace CryptoExchange.Net.Sockets private void ProcessMessage(string data) { var timestamp = DateTime.UtcNow; - log.Write(LogLevel.Trace, $"Socket {Socket.Id} received data: " + data); + log.Write(LogLevel.Trace, $"Socket {SocketId} received data: " + data); if (string.IsNullOrEmpty(data)) return; var tokenData = data.ToJToken(log); @@ -416,16 +441,16 @@ namespace CryptoExchange.Net.Sockets if (!handled && !handledResponse) { if (!socketClient.UnhandledMessageExpected) - log.Write(LogLevel.Warning, $"Socket {Socket.Id} Message not handled: " + tokenData); + log.Write(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + tokenData); UnhandledMessage?.Invoke(tokenData); } var total = DateTime.UtcNow - timestamp; if (userProcessTime.TotalMilliseconds > 500) - log.Write(LogLevel.Debug, $"Socket {Socket.Id} message processing slow ({(int)total.TotalMilliseconds}ms), consider offloading data handling to another thread. " + + log.Write(LogLevel.Debug, $"Socket {SocketId} message processing slow ({(int)total.TotalMilliseconds}ms), consider offloading data handling to another thread. " + "Data from this socket may arrive late or not at all if message processing is continuously slow."); - log.Write(LogLevel.Trace, $"Socket {Socket.Id} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)"); + log.Write(LogLevel.Trace, $"Socket {SocketId} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)"); } /// @@ -509,7 +534,7 @@ namespace CryptoExchange.Net.Sockets } catch (Exception ex) { - log.Write(LogLevel.Error, $"Socket {Socket.Id} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}"); + log.Write(LogLevel.Error, $"Socket {SocketId} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}"); currentSubscription?.InvokeExceptionHandler(ex); return (false, TimeSpan.Zero); } @@ -554,8 +579,8 @@ namespace CryptoExchange.Net.Sockets /// The data to send public virtual void Send(string data) { - log.Write(LogLevel.Trace, $"Socket {Socket.Id} sending data: {data}"); - Socket.Send(data); + log.Write(LogLevel.Trace, $"Socket {SocketId} sending data: {data}"); + _socket.Send(data); } /// @@ -569,7 +594,7 @@ namespace CryptoExchange.Net.Sockets private async Task> ProcessReconnectAsync() { - if (!Socket.IsOpen) + if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); if (Authenticated) @@ -578,11 +603,11 @@ namespace CryptoExchange.Net.Sockets var authResult = await socketClient.AuthenticateSocketAsync(this).ConfigureAwait(false); if (!authResult) { - log.Write(LogLevel.Warning, $"Socket {Socket.Id} authentication failed on reconnected socket. Disconnecting and reconnecting."); + log.Write(LogLevel.Warning, $"Socket {SocketId} authentication failed on reconnected socket. Disconnecting and reconnecting."); return authResult; } - log.Write(LogLevel.Debug, $"Socket {Socket.Id} authentication succeeded on reconnected socket."); + log.Write(LogLevel.Debug, $"Socket {SocketId} authentication succeeded on reconnected socket."); } // Get a list of all subscriptions on the socket @@ -593,7 +618,7 @@ namespace CryptoExchange.Net.Sockets // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe for (var i = 0; i < subscriptionList.Count; i += socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket) { - if (!Socket.IsOpen) + if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); var taskList = new List>>(); @@ -605,10 +630,10 @@ namespace CryptoExchange.Net.Sockets return taskList.First(t => !t.Result.Success).Result; } - if (!Socket.IsOpen) + if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); - log.Write(LogLevel.Debug, $"Socket {Socket.Id} all subscription successfully resubscribed on reconnected socket."); + log.Write(LogLevel.Debug, $"Socket {SocketId} all subscription successfully resubscribed on reconnected socket."); return new CallResult(true); } @@ -619,50 +644,10 @@ namespace CryptoExchange.Net.Sockets internal async Task> ResubscribeAsync(SocketSubscription socketSubscription) { - if (!Socket.IsOpen) + if (!_socket.IsOpen) return new CallResult(new UnknownError("Socket is not connected")); return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); } } - - internal class PendingRequest - { - public Func Handler { get; } - public JToken? Result { get; private set; } - public bool Completed { get; private set; } - public AsyncResetEvent Event { get; } - public TimeSpan Timeout { get; } - - private CancellationTokenSource cts; - - public PendingRequest(Func handler, TimeSpan timeout) - { - Handler = handler; - Event = new AsyncResetEvent(false, false); - Timeout = timeout; - - cts = new CancellationTokenSource(timeout); - cts.Token.Register(Fail, false); - } - - public bool CheckData(JToken data) - { - if (Handler(data)) - { - Result = data; - Completed = true; - Event.Set(); - return true; - } - - return false; - } - - public void Fail() - { - Completed = true; - Event.Set(); - } - } } diff --git a/CryptoExchange.Net/Sockets/WebsocketFactory.cs b/CryptoExchange.Net/Sockets/WebsocketFactory.cs index 5a2e67b..06384cd 100644 --- a/CryptoExchange.Net/Sockets/WebsocketFactory.cs +++ b/CryptoExchange.Net/Sockets/WebsocketFactory.cs @@ -6,7 +6,7 @@ using CryptoExchange.Net.Logging; namespace CryptoExchange.Net.Sockets { /// - /// Default weboscket factory implementation + /// Default websocket factory implementation /// public class WebsocketFactory : IWebsocketFactory {