diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 2754b7b..c9e9515 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -44,7 +44,7 @@ namespace CryptoExchange.Net.Sockets /// /// Received messages time -> size /// - protected readonly Dictionary _receivedMessages; + internal readonly List _receivedMessages; /// /// Received messages lock /// @@ -152,7 +152,7 @@ namespace CryptoExchange.Net.Sockets if (!_receivedMessages.Any()) return 0; - return Math.Round(_receivedMessages.Values.Sum(v => v) / 1000 / 3d); + return Math.Round(_receivedMessages.Sum(v => v.Bytes) / 1000 / 3d); } } } @@ -215,7 +215,7 @@ namespace CryptoExchange.Net.Sockets this.headers = headers; _outgoingMessages = new List(); - _receivedMessages = new Dictionary(); + _receivedMessages = new List(); _sendEvent = new AsyncResetEvent(); _sendBuffer = new ConcurrentQueue(); _ctsSource = new CancellationTokenSource(); @@ -380,60 +380,71 @@ namespace CryptoExchange.Net.Sockets private async Task SendLoopAsync() { _startedSent = true; - while (true) + try { - if (_closing) - break; - - await _sendEvent.WaitAsync().ConfigureAwait(false); - - if (_closing) - break; - - while (_sendBuffer.TryDequeue(out var data)) + while (true) { - if(RatelimitPerSecond != null) + if (_closing) + break; + + await _sendEvent.WaitAsync().ConfigureAwait(false); + + if (_closing) + break; + + while (_sendBuffer.TryDequeue(out var data)) { - // Wait for rate limit - DateTime? start = null; - while (MessagesSentLastSecond() >= RatelimitPerSecond) + if (RatelimitPerSecond != null) { - if (start == null) - start = DateTime.UtcNow; - await Task.Delay(10).ConfigureAwait(false); + // Wait for rate limit + DateTime? start = null; + while (MessagesSentLastSecond() >= RatelimitPerSecond) + { + if (start == null) + start = DateTime.UtcNow; + await Task.Delay(10).ConfigureAwait(false); + } + + if (start != null) + log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit"); } - if (start != null) - log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit"); - } - - try - { - await _socket.SendAsync(new ArraySegment(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false); - _outgoingMessages.Add(DateTime.UtcNow); - log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes"); - } - catch (OperationCanceledException) - { - // cancelled - break; - } - catch (IOException ioe) - { - // Connection closed unexpectedly, .NET framework - Handle(errorHandlers, ioe); - await CloseInternalAsync(false, true).ConfigureAwait(false); - break; - } - catch (WebSocketException wse) - { - // Connection closed unexpectedly - Handle(errorHandlers, wse); - await CloseInternalAsync(false, true).ConfigureAwait(false); - break; + try + { + await _socket.SendAsync(new ArraySegment(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false); + _outgoingMessages.Add(DateTime.UtcNow); + log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes"); + } + catch (OperationCanceledException) + { + // cancelled + break; + } + catch (IOException ioe) + { + // Connection closed unexpectedly, .NET framework + Handle(errorHandlers, ioe); + await CloseInternalAsync(false, true).ConfigureAwait(false); + break; + } + catch (WebSocketException wse) + { + // Connection closed unexpectedly + Handle(errorHandlers, wse); + await CloseInternalAsync(false, true).ConfigureAwait(false); + break; + } } } } + catch (Exception e) + { + // Because this is running in a separate task and not awaited until the socket gets closed + // any exception here will crash the send processing, but do so silently unless the socket get's stopped. + // Make sure we at least let the owner know there was an error + Handle(errorHandlers, e); + throw; + } } /// @@ -446,101 +457,112 @@ namespace CryptoExchange.Net.Sockets var buffer = new ArraySegment(new byte[65536]); var received = 0; - while (true) + try { - if (_closing) - break; - - MemoryStream? memoryStream = null; - WebSocketReceiveResult? receiveResult = null; - bool multiPartMessage = false; while (true) { - try - { - receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false); - received += receiveResult.Count; - lock(_receivedMessagesLock) - _receivedMessages.Add(DateTime.UtcNow, receiveResult.Count); - } - catch (OperationCanceledException) - { - // cancelled + if (_closing) break; - } - catch (WebSocketException wse) - { - // Connection closed unexpectedly - Handle(errorHandlers, wse); - await CloseInternalAsync(true, false).ConfigureAwait(false); - break; - } - catch (IOException ioe) - { - // Connection closed unexpectedly, .NET framework - Handle(errorHandlers, ioe); - await CloseInternalAsync(true, false).ConfigureAwait(false); - break; - } - if (receiveResult.MessageType == WebSocketMessageType.Close) + MemoryStream? memoryStream = null; + WebSocketReceiveResult? receiveResult = null; + bool multiPartMessage = false; + while (true) { - // Connection closed unexpectedly - log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message"); - await CloseInternalAsync(true, false).ConfigureAwait(false); - break; - } - - if (!receiveResult.EndOfMessage) - { - // We received data, but it is not complete, write it to a memory stream for reassembling - multiPartMessage = true; - if (memoryStream == null) - memoryStream = new MemoryStream(); - log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); - await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); - } - else - { - if (!multiPartMessage) + try { - // Received a complete message and it's not multi part - log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); - HandleMessage(buffer.Array, buffer.Offset, receiveResult.Count, receiveResult.MessageType); + receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false); + received += receiveResult.Count; + lock (_receivedMessagesLock) + _receivedMessages.Add(new ReceiveItem(DateTime.UtcNow, receiveResult.Count)); + } + catch (OperationCanceledException) + { + // cancelled + break; + } + catch (WebSocketException wse) + { + // Connection closed unexpectedly + Handle(errorHandlers, wse); + await CloseInternalAsync(true, false).ConfigureAwait(false); + break; + } + catch (IOException ioe) + { + // Connection closed unexpectedly, .NET framework + Handle(errorHandlers, ioe); + await CloseInternalAsync(true, false).ConfigureAwait(false); + break; + } + + if (receiveResult.MessageType == WebSocketMessageType.Close) + { + // Connection closed unexpectedly + log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message"); + await CloseInternalAsync(true, false).ConfigureAwait(false); + break; + } + + if (!receiveResult.EndOfMessage) + { + // We received data, but it is not complete, write it to a memory stream for reassembling + multiPartMessage = true; + if (memoryStream == null) + memoryStream = new MemoryStream(); + log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); + await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); } else { - // Received the end of a multipart message, write to memory stream for reassembling - log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); - await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); + if (!multiPartMessage) + { + // Received a complete message and it's not multi part + log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); + HandleMessage(buffer.Array, buffer.Offset, receiveResult.Count, receiveResult.MessageType); + } + else + { + // Received the end of a multipart message, write to memory stream for reassembling + log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); + await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); + } + break; } + } + + lock (_receivedMessagesLock) + UpdateReceivedMessages(); + + if (receiveResult?.MessageType == WebSocketMessageType.Close) + { + // Received close message break; } - } - lock (_receivedMessagesLock) - UpdateReceivedMessages(); + if (receiveResult == null || _closing) + { + // Error during receiving or cancellation requested, stop. + break; + } - if (receiveResult?.MessageType == WebSocketMessageType.Close) - { - // Received close message - break; - } - - if (receiveResult == null || _closing) - { - // Error during receiving or cancellation requested, stop. - break; - } - - if (multiPartMessage) - { - // Reassemble complete message from memory stream - log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); - HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType); - memoryStream.Dispose(); + if (multiPartMessage) + { + // Reassemble complete message from memory stream + log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); + HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType); + memoryStream.Dispose(); + } } } + catch(Exception e) + { + // Because this is running in a separate task and not awaited until the socket gets closed + // any exception here will crash the receive processing, but do so silently unless the socket get's stopped. + // Make sure we at least let the owner know there was an error + Handle(errorHandlers, e); + throw; + } } /// @@ -604,27 +626,38 @@ namespace CryptoExchange.Net.Sockets protected async Task CheckTimeoutAsync() { log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Timeout}"); - while (true) - { - if (_closing) - return; + try + { + while (true) + { + if (_closing) + return; - if (DateTime.UtcNow - LastActionTime > Timeout) - { - log.Write(LogLevel.Warning, $"Socket {Id} No data received for {Timeout}, reconnecting socket"); - _ = CloseAsync().ConfigureAwait(false); - return; - } - try - { - await Task.Delay(500, _ctsSource.Token).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - // cancelled - break; + if (DateTime.UtcNow - LastActionTime > Timeout) + { + log.Write(LogLevel.Warning, $"Socket {Id} No data received for {Timeout}, reconnecting socket"); + _ = CloseAsync().ConfigureAwait(false); + return; + } + try + { + await Task.Delay(500, _ctsSource.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // cancelled + break; + } } } + catch (Exception e) + { + // Because this is running in a separate task and not awaited until the socket gets closed + // any exception here will stop the timeout checking, but do so silently unless the socket get's stopped. + // Make sure we at least let the owner know there was an error + Handle(errorHandlers, e); + throw; + } } /// @@ -679,12 +712,24 @@ namespace CryptoExchange.Net.Sockets var checkTime = DateTime.UtcNow; if (checkTime - _lastReceivedMessagesUpdate > TimeSpan.FromSeconds(1)) { - foreach (var msgTime in _receivedMessages.Keys.ToList()) - if (checkTime - msgTime > TimeSpan.FromSeconds(3)) - _receivedMessages.Remove(msgTime); + foreach (var msg in _receivedMessages.ToList()) // To list here because we're removing from the list + if (checkTime - msg.Timestamp > TimeSpan.FromSeconds(3)) + _receivedMessages.Remove(msg); _lastReceivedMessagesUpdate = checkTime; } } } + + internal struct ReceiveItem + { + public DateTime Timestamp { get; set; } + public int Bytes { get; set; } + + public ReceiveItem(DateTime timestamp, int bytes) + { + Timestamp = timestamp; + Bytes = bytes; + } + } }