using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Errors;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.RateLimiting;
using CryptoExchange.Net.Sockets.Default.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace CryptoExchange.Net.Sockets.Default
{
    /// <summary>
    /// A wrapper around the ClientWebSocket. Owns the socket, runs the send/receive/timeout loops
    /// and recreates the socket when the connection is lost and reconnecting is enabled.
    /// </summary>
    public class CryptoExchangeWebSocketClient : IWebsocket
    {
        /// <summary>
        /// State of the processing loop, see <see cref="ProcessAsync"/>
        /// </summary>
        enum ProcessState
        {
            Idle,
            Processing,
            WaitingForClose,
            Reconnecting
        }

        internal static int _lastStreamId;
        private static readonly ArrayPool<byte> _receiveBufferPool = ArrayPool<byte>.Shared;

        private readonly AsyncResetEvent _sendEvent;
        private readonly ConcurrentQueue<SendItem> _sendBuffer;
        private readonly SemaphoreSlim _closeSem;

        private ClientWebSocket _socket;
        private CancellationTokenSource _ctsSource;
        private Task? _processTask;
        private Task? _closeTask;
        private bool _stopRequested;
        private bool _disposed;
        private ProcessState _processState;
        private DateTime _lastReconnectTime;
        private readonly string _baseAddress;
        private int _reconnectAttempt;
        private readonly int _receiveBufferSize;
        private readonly RequestDefinition _requestDefinition;

        private const int _sendBufferSize = 4096;

        // Byte counters used by IncomingKbps; rotated by UpdateReceivedMessages
        private int _bytesReceived = 0;
        private int _prevSlotBytesReceived = 0;
        private DateTime _lastBytesReceivedUpdate = DateTime.UtcNow;
        private DateTime _prevSlotBytesReceivedUpdate = DateTime.UtcNow;

        /// <summary>
        /// Log
        /// </summary>
        protected ILogger _logger;

        /// <inheritdoc />
        public int Id { get; }

        /// <inheritdoc />
        public WebSocketParameters Parameters { get; }

        /// <summary>
        /// The timestamp this socket has been active for the last time
        /// </summary>
        public DateTime? LastReceiveTime { get; private set; }

        /// <inheritdoc />
        public Uri Uri => Parameters.Uri;

        /// <inheritdoc />
        public virtual bool IsClosed => _socket.State == WebSocketState.Closed;

        /// <inheritdoc />
        public virtual bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested;

        /// <inheritdoc />
        public double IncomingKbps
        {
            get
            {
                UpdateReceivedMessages();

                // Rate is calculated over the previous completed measuring slot
                var seconds = (_lastBytesReceivedUpdate - _prevSlotBytesReceivedUpdate).TotalSeconds;
                return seconds > 0 ? Math.Round(_prevSlotBytesReceived / seconds / 1000) : 0;
            }
        }

        /// <inheritdoc />
        public event Func<Task>? OnClose;

        /// <inheritdoc />
        public event Func<int, Task>? OnRequestSent;

        /// <inheritdoc />
        public event Func<int, Task>? OnRequestRateLimited;

        /// <inheritdoc />
        public event Func<Task>? OnConnectRateLimited;

        /// <inheritdoc />
        public event Func<Exception, Task>? OnError;

        /// <inheritdoc />
        public event Func<Task>? OnOpen;

        /// <inheritdoc />
        public event Func<Task>? OnReconnecting;

        /// <inheritdoc />
        public event Func<Task>? OnReconnected;

        /// <inheritdoc />
        public Func<Task<Uri?>>? GetReconnectionUrl { get; set; }

        private SocketConnection _connection;

        /// <summary>
        /// ctor
        /// </summary>
        /// <param name="logger">The log object to use</param>
        /// <param name="connection">The socket connection</param>
        /// <param name="websocketParameters">The parameters for this socket</param>
        public CryptoExchangeWebSocketClient(ILogger logger, SocketConnection connection, WebSocketParameters websocketParameters)
        {
            Id = NextStreamId();
            _logger = logger;
            _connection = connection;

            Parameters = websocketParameters;
            _sendEvent = new AsyncResetEvent();
            _sendBuffer = new ConcurrentQueue<SendItem>();
            _ctsSource = new CancellationTokenSource();
            _receiveBufferSize = websocketParameters.ReceiveBufferSize ?? 65536;
            _requestDefinition = new RequestDefinition(Uri.AbsolutePath, HttpMethod.Get) { ConnectionId = Id };

            _closeSem = new SemaphoreSlim(1, 1);
            _socket = CreateSocket();
            _baseAddress = $"{Uri.Scheme}://{Uri.Host}";
        }

        /// <inheritdoc />
        public void UpdateProxy(ApiProxy? proxy)
        {
            // Only stored here; it is applied the next time CreateSocket runs
            Parameters.Proxy = proxy;
        }

        /// <inheritdoc />
        public virtual async Task<CallResult> ConnectAsync(CancellationToken ct)
        {
            var connectResult = await ConnectInternalAsync(ct).ConfigureAwait(false);
            if (!connectResult)
                return connectResult;

            await (OnOpen?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);

            // Not awaited; the processing loop runs until the socket is stopped
            _processTask = ProcessAsync();
            return connectResult;
        }

        /// <summary>
        /// Create the socket object and apply cookies, headers, keep alive, buffer and proxy settings from <see cref="Parameters"/>
        /// </summary>
        /// <returns>The new, not yet connected, socket</returns>
        private ClientWebSocket CreateSocket()
        {
            var cookieContainer = new CookieContainer();
            foreach (var cookie in Parameters.Cookies)
                cookieContainer.Add(new Cookie(cookie.Key, cookie.Value));

            var socket = new ClientWebSocket();
            try
            {
                socket.Options.Cookies = cookieContainer;
                foreach (var header in Parameters.Headers)
                    socket.Options.SetRequestHeader(header.Key, header.Value);

                socket.Options.KeepAliveInterval = Parameters.KeepAliveInterval ?? TimeSpan.Zero;

                // Ordinal comparison: this is a fixed runtime identifier, not linguistic text
                if (System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription.StartsWith(".NET Framework", StringComparison.Ordinal))
                    socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
                else
                    socket.Options.SetBuffer(_receiveBufferSize, _sendBufferSize);

                if (Parameters.Proxy != null)
                    SetProxy(socket, Parameters.Proxy);

#if NET6_0_OR_GREATER
                socket.Options.CollectHttpResponseDetails = true;
#endif
#if NET9_0_OR_GREATER
                socket.Options.KeepAliveTimeout = Parameters.KeepAliveTimeout ?? TimeSpan.FromSeconds(10);
#endif
            }
            catch (PlatformNotSupportedException)
            {
                // Options are not supported on certain platforms (WebAssembly for instance)
                // best we can do is try to connect without setting options.
            }

            return socket;
        }

        /// <summary>
        /// Connect the current socket to <see cref="Uri"/>, applying the connection rate limiter if one is configured
        /// </summary>
        /// <param name="ct">Cancellation token for the connect attempt</param>
        /// <returns>Success result, or an error result describing why the connection failed</returns>
        private async Task<CallResult> ConnectInternalAsync(CancellationToken ct)
        {
            _logger.SocketConnecting(Id);
            try
            {
                if (Parameters.RateLimiter != null)
                {
                    var limitResult = await Parameters.RateLimiter.ProcessAsync(
                        _logger,
                        Id,
                        RateLimitItemType.Connection,
                        _requestDefinition,
                        _baseAddress,
                        null,
                        1,
                        Parameters.RateLimitingBehavior,
                        null,
                        _ctsSource.Token).ConfigureAwait(false);
                    if (!limitResult)
                        return new CallResult(new ClientRateLimitError("Connection limit reached"));
                }

                // Connect attempt is aborted after 10 seconds, or when the socket or the caller cancels
                using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
                using var linked = CancellationTokenSource.CreateLinkedTokenSource(tcs.Token, _ctsSource.Token, ct);
                await _socket.ConnectAsync(Uri, linked.Token).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                if (ct.IsCancellationRequested)
                {
                    _logger.SocketConnectingCanceled(Id);
                }
                else if (!_ctsSource.IsCancellationRequested)
                {
                    // if _ctsSource was canceled this was already logged
                    _logger.SocketConnectionFailed(Id, e.Message, e);
                }

                if (e is WebSocketException we)
                {
#if (NET6_0_OR_GREATER)
                    if (_socket.HttpStatusCode == HttpStatusCode.TooManyRequests)
                    {
                        await (OnConnectRateLimited?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
                        return new CallResult(new ServerRateLimitError(we.Message, we));
                    }

                    if (_socket.HttpStatusCode == HttpStatusCode.Unauthorized)
                        return new CallResult(new ServerError(new ErrorInfo(ErrorType.Unauthorized, "Server returned status code `401` when `101` was expected")));
#else
                    // ClientWebSocket.HttpStatusCode is only available in .NET6+ https://learn.microsoft.com/en-us/dotnet/api/system.net.websockets.clientwebsocket.httpstatuscode?view=net-8.0
                    // Try to read 429 from the message instead
                    if (we.Message.Contains("429"))
                    {
                        await (OnConnectRateLimited?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
                        return new CallResult(new ServerRateLimitError(we.Message, we));
                    }
#endif
                }

                return new CallResult(new CantConnectError(e));
            }

            _logger.SocketConnected(Id, Uri);
            return CallResult.SuccessResult;
        }

        /// <summary>
        /// Main processing loop. Runs the send, receive and timeout loops until the connection ends, waits for
        /// the close task, and then, unless stopped or reconnecting is disabled, keeps trying to reconnect.
        /// </summary>
        private async Task ProcessAsync()
        {
            while (!_stopRequested)
            {
                _logger.SocketStartingProcessing(Id);
                SetProcessState(ProcessState.Processing);
                var sendTask = SendLoopAsync();
                Task receiveTask;
#if !NETSTANDARD2_0
                receiveTask = ReceiveLoopNewAsync();
#else
                receiveTask = ReceiveLoopAsync();
#endif
                var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
                await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
                _logger.SocketFinishedProcessing(Id);

                SetProcessState(ProcessState.WaitingForClose);

                // The loops ended, so a close is either already in progress or about to be started; wait for it
                while (_closeTask == null)
                    await Task.Delay(50).ConfigureAwait(false);

                await _closeTask.ConfigureAwait(false);
                if (!_stopRequested)
                    _closeTask = null;

                if (Parameters.ReconnectPolicy == ReconnectPolicy.Disabled)
                {
                    SetProcessState(ProcessState.Idle);
                    await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
                    return;
                }

                if (!_stopRequested)
                {
                    SetProcessState(ProcessState.Reconnecting);
                    await (OnReconnecting?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
                }

                if (Parameters.RateLimiter != null)
                    await Parameters.RateLimiter.ResetAsync(RateLimitItemType.Connection, _requestDefinition, _baseAddress, null, null, _ctsSource.Token).ConfigureAwait(false);

                // Delay here to prevent very rapid looping when a connection to the server is accepted and immediately disconnected
                var initialDelay = GetReconnectDelay();
                await Task.Delay(initialDelay).ConfigureAwait(false);

                while (!_stopRequested)
                {
                    _logger.SocketAttemptReconnect(Id);
                    var task = GetReconnectionUrl?.Invoke();
                    if (task != null)
                    {
                        var reconnectUri = await task.ConfigureAwait(false);
                        if (reconnectUri != null && Parameters.Uri.ToString() != reconnectUri.ToString())
                        {
                            _logger.SocketSetReconnectUri(Id, reconnectUri);
                            Parameters.Uri = reconnectUri;
                        }
                    }

                    // A ClientWebSocket can't be reused after closing, so create a fresh socket and token source
                    _socket?.Dispose();
                    _socket = CreateSocket();
                    _ctsSource.Dispose();
                    _ctsSource = new CancellationTokenSource();
                    while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer

                    _reconnectAttempt++;
                    var connected = await ConnectInternalAsync(default).ConfigureAwait(false);
                    if (!connected)
                    {
                        // Delay between reconnect attempts
                        var delay = GetReconnectDelay();
                        await Task.Delay(delay).ConfigureAwait(false);
                        continue;
                    }

                    _reconnectAttempt = 0;
                    _lastReconnectTime = DateTime.UtcNow;

                    // Set to processing before reconnect handling
                    SetProcessState(ProcessState.Processing);
                    await (OnReconnected?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
                    break;
                }
            }

            SetProcessState(ProcessState.Idle);
        }

        /// <summary>
        /// Get the delay to wait before the next (re)connect attempt
        /// </summary>
        /// <returns>
        /// Directly after a disconnect (attempt 0): the remainder of 5 seconds since the last successful reconnect, or 1ms if that was longer ago.
        /// For subsequent attempts: the configured interval for <see cref="ReconnectPolicy.FixedDelay"/>, otherwise 2^attempt seconds capped at 2^5.
        /// Never less than 1ms.
        /// </returns>
        private TimeSpan GetReconnectDelay()
        {
            if (_reconnectAttempt == 0)
            {
                // Means this is directly after disconnecting. Only delay if the last reconnect time is very recent
                var sinceLastReconnect = DateTime.UtcNow - _lastReconnectTime;
                if (sinceLastReconnect < TimeSpan.FromSeconds(5))
                    return TimeSpan.FromSeconds(5) - sinceLastReconnect;

                return TimeSpan.FromMilliseconds(1);
            }

            var delay = Parameters.ReconnectPolicy == ReconnectPolicy.FixedDelay
                ? Parameters.ReconnectInterval
                : TimeSpan.FromSeconds(Math.Pow(2, Math.Min(5, _reconnectAttempt)));

            if (delay > TimeSpan.Zero)
                return delay;

            return TimeSpan.FromMilliseconds(1);
        }

        /// <inheritdoc />
        public virtual bool Send(int id, string data, int weight)
        {
            if (_ctsSource.IsCancellationRequested || _processState != ProcessState.Processing)
                return false;

            var bytes = Parameters.Encoding.GetBytes(data);
            _logger.SocketAddingBytesToSendBuffer(Id, id, bytes);
            _sendBuffer.Enqueue(new SendItem { Id = id, Type = WebSocketMessageType.Text, Weight = weight, Bytes = bytes });
            _sendEvent.Set();
            return true;
        }

        /// <inheritdoc />
        public virtual bool Send(int id, byte[] data, int weight)
        {
            if (_ctsSource.IsCancellationRequested || _processState != ProcessState.Processing)
                return false;

            _logger.SocketAddingBytesToSendBuffer(Id, id, data);
            _sendBuffer.Enqueue(new SendItem { Id = id, Type = WebSocketMessageType.Binary, Weight = weight, Bytes = data });
            _sendEvent.Set();
            return true;
        }

        /// <inheritdoc />
        public virtual async Task ReconnectAsync()
        {
            if (_processState != ProcessState.Processing && IsOpen)
                return;

            _logger.SocketReconnectRequested(Id);

            // Closing without setting _stopRequested makes ProcessAsync enter its reconnect path
            _closeTask = CloseInternalAsync();
            await _closeTask.ConfigureAwait(false);
        }

        /// <inheritdoc />
        public virtual async Task CloseAsync()
        {
            await _closeSem.WaitAsync().ConfigureAwait(false);
            _stopRequested = true;

            try
            {
                if (_closeTask?.IsCompleted == false)
                {
                    _logger.SocketCloseAsyncWaitingForExistingCloseTask(Id);
                    await _closeTask.ConfigureAwait(false);
                    return;
                }

                if (!IsOpen)
                {
                    _logger.SocketCloseAsyncSocketNotOpen(Id);
                    return;
                }

                _logger.SocketClosing(Id);
                _closeTask = CloseInternalAsync();
            }
            finally
            {
                _closeSem.Release();
            }

            await _closeTask.ConfigureAwait(false);
            if (_processTask != null)
                await _processTask.ConfigureAwait(false);

            await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
            _logger.SocketClosed(Id);
        }

        /// <summary>
        /// Internal close method. Performs the websocket close handshake if the socket state allows it and
        /// then cancels the token source so the processing loops stop.
        /// </summary>
        private async Task CloseInternalAsync()
        {
            if (_disposed)
                return;

            try
            {
                if (_socket.State == WebSocketState.CloseReceived)
                {
                    await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
                }
                else if (_socket.State == WebSocketState.Open)
                {
                    await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
                    var startWait = DateTime.UtcNow;
                    while (_socket.State != WebSocketState.Closed && _socket.State != WebSocketState.Aborted)
                    {
                        // Wait until we receive close confirmation
                        await Task.Delay(10).ConfigureAwait(false);
                        if (DateTime.UtcNow - startWait > TimeSpan.FromSeconds(1))
                            break; // Wait for max 1 second, then just abort the connection
                    }
                }
            }
            catch (Exception)
            {
                // Can sometimes throw an exception when socket is in aborted state due to timing
                // Websocket is set to Aborted state when the cancellation token is set during SendAsync/ReceiveAsync
                // So socket might go to aborted state, might still be open
            }

            if (!_disposed)
                _ctsSource.Cancel();
        }

        /// <summary>
        /// Dispose the socket
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
                return;

            if (_ctsSource?.IsCancellationRequested == false)
                _ctsSource.Cancel();

            _logger.SocketDisposing(Id);
            _disposed = true;
            _socket.Dispose();
            _ctsSource?.Dispose();
            _logger.SocketDisposed(Id);
        }

        /// <summary>
        /// Loop for sending data. Waits for items in the send buffer and writes them to the socket,
        /// applying the request rate limiter if one is configured.
        /// </summary>
        private async Task SendLoopAsync()
        {
            try
            {
                while (true)
                {
                    try
                    {
                        if (_sendBuffer.IsEmpty)
                            await _sendEvent.WaitAsync(ct: _ctsSource.Token).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }

                    if (_ctsSource.IsCancellationRequested)
                        break;

                    while (_sendBuffer.TryDequeue(out var data))
                    {
                        if (Parameters.RateLimiter != null)
                        {
                            try
                            {
                                var limitResult = await Parameters.RateLimiter.ProcessAsync(
                                    _logger,
                                    data.Id,
                                    RateLimitItemType.Request,
                                    _requestDefinition,
                                    _baseAddress,
                                    null,
                                    data.Weight,
                                    Parameters.RateLimitingBehavior,
                                    null,
                                    _ctsSource.Token).ConfigureAwait(false);
                                if (!limitResult)
                                {
                                    // Request is dropped, owner is notified
                                    await (OnRequestRateLimited?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
                                    continue;
                                }
                            }
                            catch (OperationCanceledException)
                            {
                                // canceled
                                break;
                            }
                        }

                        try
                        {
                            await _socket.SendAsync(new ArraySegment<byte>(data.Bytes, 0, data.Bytes.Length), data.Type, true, _ctsSource.Token).ConfigureAwait(false);
                            await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
                            _logger.SocketSentBytes(Id, data.Id, data.Bytes.Length);
                        }
                        catch (OperationCanceledException)
                        {
                            // canceled
                            break;
                        }
                        catch (Exception ioe)
                        {
                            // Connection closed unexpectedly, .NET framework
                            await (OnError?.Invoke(ioe) ?? Task.CompletedTask).ConfigureAwait(false);
                            if (_closeTask?.IsCompleted != false)
                                _closeTask = CloseInternalAsync();
                            break;
                        }
                    }
                }
            }
            catch (Exception e)
            {
                // Because this is running in a separate task and not awaited until the socket gets closed
                // any exception here will crash the send processing, but do so silently unless the socket gets stopped.
                // Make sure we at least let the owner know there was an error
                _logger.SocketSendLoopStoppedWithException(Id, e.Message, e);
                await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
                if (_closeTask?.IsCompleted != false)
                    _closeTask = CloseInternalAsync();
            }
            finally
            {
                _logger.SocketSendLoopFinished(Id);
            }
        }

#if NETSTANDARD2_0
        /// <summary>
        /// Loop for receiving and reassembling data (ArraySegment based, used for netstandard2.0)
        /// </summary>
        private async Task ReceiveLoopAsync()
        {
            byte[] rentedBuffer = _receiveBufferPool.Rent(_receiveBufferSize);
            var buffer = new ArraySegment<byte>(rentedBuffer);
            try
            {
                while (true)
                {
                    if (_ctsSource.IsCancellationRequested)
                        break;

                    MemoryStream? multipartStream = null;
                    WebSocketReceiveResult? receiveResult = null;
                    bool multiPartMessage = false;
                    while (true)
                    {
                        try
                        {
                            receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
                            _bytesReceived += receiveResult.Count;
                        }
                        catch (OperationCanceledException ex)
                        {
                            if (ex.InnerException?.InnerException?.Message.Contains("KeepAliveTimeout") == true)
                            {
                                // Specific case that the websocket connection got closed because of a ping frame timeout
                                // Unfortunately doesn't seem to be a nicer way to catch
                                _logger.SocketPingTimeout(Id);
                            }

                            if (_closeTask?.IsCompleted != false)
                                _closeTask = CloseInternalAsync();

                            // canceled
                            break;
                        }
                        catch (Exception wse)
                        {
                            if (!_ctsSource.Token.IsCancellationRequested && !_stopRequested)
                            {
                                // Connection closed unexpectedly
                                await (OnError?.Invoke(wse) ?? Task.CompletedTask).ConfigureAwait(false);
                            }

                            if (_closeTask?.IsCompleted != false)
                                _closeTask = CloseInternalAsync();
                            break;
                        }

                        LastReceiveTime = DateTime.UtcNow;

                        if (receiveResult.MessageType == WebSocketMessageType.Close)
                        {
                            // Connection closed
                            if (_socket.State == WebSocketState.CloseReceived)
                            {
                                // Close received means it is server initiated, we should send a confirmation and close the socket
                                _logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
                                if (_closeTask?.IsCompleted != false)
                                    _closeTask = CloseInternalAsync();
                            }
                            else
                            {
                                // Means the socket is now closed and we were the one initiating it
                                _logger.SocketReceivedCloseConfirmation(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
                            }

                            break;
                        }

                        if (!receiveResult.EndOfMessage)
                        {
                            // We received data, but it is not complete, write it to a memory stream for reassembling
                            multiPartMessage = true;
                            if (_logger.IsEnabled(LogLevel.Trace))
                                _logger.SocketReceivedPartialMessage(Id, receiveResult.Count);

                            // Write the data to a memory stream to be reassembled later
                            if (multipartStream == null)
                                multipartStream = new MemoryStream();
                            multipartStream.Write(buffer.Array!, buffer.Offset, receiveResult.Count);
                        }
                        else
                        {
                            if (!multiPartMessage)
                            {
                                // Received a complete message and it's not multi part
                                if (_logger.IsEnabled(LogLevel.Trace))
                                    _logger.SocketReceivedSingleMessage(Id, receiveResult.Count);

                                ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(buffer.Array!, buffer.Offset, receiveResult.Count));
                            }
                            else
                            {
                                // Received the end of a multipart message, write to memory stream for reassembling
                                if (_logger.IsEnabled(LogLevel.Trace))
                                    _logger.SocketReceivedPartialMessage(Id, receiveResult.Count);

                                multipartStream!.Write(buffer.Array!, buffer.Offset, receiveResult.Count);
                            }

                            break;
                        }
                    }

                    UpdateReceivedMessages();

                    if (receiveResult?.MessageType == WebSocketMessageType.Close)
                    {
                        // Received close message
                        break;
                    }

                    if (receiveResult == null || _ctsSource.IsCancellationRequested)
                    {
                        // Error during receiving or cancellation requested, stop.
                        break;
                    }

                    if (multiPartMessage)
                    {
                        // When the connection gets interrupted we might not have received a full message
                        if (receiveResult?.EndOfMessage == true)
                        {
                            if (_logger.IsEnabled(LogLevel.Trace))
                                _logger.SocketReassembledMessage(Id, multipartStream!.Length);

                            // Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer returns the full array, not only the written part)
                            ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length));
                        }
                        else
                        {
                            _logger.SocketDiscardIncompleteMessage(Id, multipartStream!.Length);
                        }
                    }
                }
            }
            catch (Exception e)
            {
                // Because this is running in a separate task and not awaited until the socket gets closed
                // any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
                // Make sure we at least let the owner know there was an error
                _logger.SocketReceiveLoopStoppedWithException(Id, e);
                await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
                if (_closeTask?.IsCompleted != false)
                    _closeTask = CloseInternalAsync();
            }
            finally
            {
                // Buffer is cleared on return so received data doesn't linger in the shared pool
                _receiveBufferPool.Return(rentedBuffer, true);
                _logger.SocketReceiveLoopFinished(Id);
            }
        }
#endif

#if !NETSTANDARD2_0
        /// <summary>
        /// Loop for receiving and reassembling data (Memory based)
        /// </summary>
        private async Task ReceiveLoopNewAsync()
        {
            byte[] rentedBuffer = _receiveBufferPool.Rent(_receiveBufferSize);
            var buffer = new Memory<byte>(rentedBuffer);
            try
            {
                while (true)
                {
                    if (_ctsSource.IsCancellationRequested)
                        break;

                    MemoryStream? multipartStream = null;
                    ValueWebSocketReceiveResult receiveResult = new();
                    bool multiPartMessage = false;
                    while (true)
                    {
                        try
                        {
                            receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
                            _bytesReceived += receiveResult.Count;
                        }
                        catch (OperationCanceledException ex)
                        {
                            if (ex.InnerException?.InnerException?.Message.Contains("KeepAliveTimeout") == true)
                            {
                                // Specific case that the websocket connection got closed because of a ping frame timeout
                                // Unfortunately doesn't seem to be a nicer way to catch
                                _logger.SocketPingTimeout(Id);
                            }

                            if (_closeTask?.IsCompleted != false)
                                _closeTask = CloseInternalAsync();

                            // canceled
                            break;
                        }
                        catch (Exception wse)
                        {
                            if (!_ctsSource.Token.IsCancellationRequested && !_stopRequested)
                            {
                                // Connection closed unexpectedly
                                await (OnError?.Invoke(wse) ?? Task.CompletedTask).ConfigureAwait(false);
                            }

                            if (_closeTask?.IsCompleted != false)
                                _closeTask = CloseInternalAsync();
                            break;
                        }

                        LastReceiveTime = DateTime.UtcNow;

                        if (receiveResult.MessageType == WebSocketMessageType.Close)
                        {
                            // Connection closed
                            if (_socket.State == WebSocketState.CloseReceived)
                            {
                                // Close received means it is server initiated, we should send a confirmation and close the socket
                                _logger.SocketReceivedCloseMessage(Id, _socket.CloseStatus.ToString()!, _socket.CloseStatusDescription ?? string.Empty);
                                if (_closeTask?.IsCompleted != false)
                                    _closeTask = CloseInternalAsync();
                            }
                            else
                            {
                                // Means the socket is now closed and we were the one initiating it
                                _logger.SocketReceivedCloseConfirmation(Id, _socket.CloseStatus.ToString()!, _socket.CloseStatusDescription ?? string.Empty);
                            }

                            break;
                        }

                        if (!receiveResult.EndOfMessage)
                        {
                            // We received data, but it is not complete, write it to a memory stream for reassembling
                            multiPartMessage = true;
                            if (_logger.IsEnabled(LogLevel.Trace))
                                _logger.SocketReceivedPartialMessage(Id, receiveResult.Count);

                            // Write the data to a memory stream to be reassembled later
                            multipartStream ??= new MemoryStream();
                            multipartStream.Write(buffer.Span.Slice(0, receiveResult.Count));
                        }
                        else
                        {
                            if (!multiPartMessage)
                            {
                                // Received a complete message and it's not multi part
                                if (_logger.IsEnabled(LogLevel.Trace))
                                    _logger.SocketReceivedSingleMessage(Id, receiveResult.Count);

                                ProcessDataNew(receiveResult.MessageType, buffer.Span.Slice(0, receiveResult.Count));
                            }
                            else
                            {
                                // Received the end of a multipart message, write to memory stream for reassembling
                                if (_logger.IsEnabled(LogLevel.Trace))
                                    _logger.SocketReceivedPartialMessage(Id, receiveResult.Count);

                                multipartStream!.Write(buffer.Span.Slice(0, receiveResult.Count));
                            }

                            break;
                        }
                    }

                    UpdateReceivedMessages();

                    if (receiveResult.MessageType == WebSocketMessageType.Close)
                    {
                        // Received close message
                        break;
                    }

                    if (_ctsSource.IsCancellationRequested)
                    {
                        // Error during receiving or cancellation requested, stop.
                        break;
                    }

                    if (multiPartMessage)
                    {
                        // When the connection gets interrupted we might not have received a full message
                        if (receiveResult.EndOfMessage == true)
                        {
                            if (_logger.IsEnabled(LogLevel.Trace))
                                _logger.SocketReassembledMessage(Id, multipartStream!.Length);

                            // Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer returns the full array, not only the written part)
                            ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length));
                        }
                        else
                        {
                            _logger.SocketDiscardIncompleteMessage(Id, multipartStream!.Length);
                        }
                    }
                }
            }
            catch (Exception e)
            {
                // Because this is running in a separate task and not awaited until the socket gets closed
                // any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
                // Make sure we at least let the owner know there was an error
                _logger.SocketReceiveLoopStoppedWithException(Id, e);
                await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
                if (_closeTask?.IsCompleted != false)
                    _closeTask = CloseInternalAsync();
            }
            finally
            {
                // Buffer is cleared on return so received data doesn't linger in the shared pool
                _receiveBufferPool.Return(rentedBuffer, true);
                _logger.SocketReceiveLoopFinished(Id);
            }
        }
#endif

        /// <summary>
        /// Process a stream message by handing it to the socket connection
        /// </summary>
        /// <param name="type">The websocket message type</param>
        /// <param name="data">The received message bytes</param>
        protected void ProcessDataNew(WebSocketMessageType type, ReadOnlySpan<byte> data)
        {
            _connection.HandleStreamMessage2(type, data);
        }

        /// <summary>
        /// Checks if there is no data received for a period longer than the specified timeout
        /// and requests a reconnect when that happens
        /// </summary>
        protected async Task CheckTimeoutAsync()
        {
            _logger.SocketStartingTaskForNoDataReceivedCheck(Id, Parameters.Timeout);
            LastReceiveTime = DateTime.UtcNow;
            try
            {
                while (true)
                {
                    if (_ctsSource.IsCancellationRequested)
                        return;

                    if (DateTime.UtcNow - LastReceiveTime > Parameters.Timeout)
                    {
                        _logger.SocketNoDataReceiveTimoutReconnect(Id, Parameters.Timeout);

                        // Intentionally not awaited; the processing loop awaits the resulting close task
                        _ = ReconnectAsync();
                        return;
                    }

                    try
                    {
                        await Task.Delay(500, _ctsSource.Token).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        // canceled
                        break;
                    }
                }
            }
            catch (Exception e)
            {
                // Because this is running in a separate task and not awaited until the socket gets closed
                // any exception here will stop the timeout checking, but do so silently unless the socket gets stopped.
                // Make sure we at least let the owner know there was an error
                await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Get the next identifier
        /// </summary>
        /// <returns>A new unique stream id</returns>
        private static int NextStreamId() => Interlocked.Increment(ref _lastStreamId);

        /// <summary>
        /// Rotate the received bytes counters. When at least 3 seconds have passed since the last rotation
        /// the current byte count becomes the previous slot (used by <see cref="IncomingKbps"/>) and a new slot is started.
        /// </summary>
        protected void UpdateReceivedMessages()
        {
            var now = DateTime.UtcNow;
            var sinceLast = now - _lastBytesReceivedUpdate;
            if (sinceLast < TimeSpan.FromSeconds(3))
                return;

            _prevSlotBytesReceivedUpdate = _lastBytesReceivedUpdate;
            _prevSlotBytesReceived = _bytesReceived;
            _bytesReceived = 0;
            _lastBytesReceivedUpdate = now;
        }

        /// <summary>
        /// Set proxy on socket
        /// </summary>
        /// <param name="socket">The socket to set the proxy on</param>
        /// <param name="proxy">The proxy settings</param>
        /// <exception cref="ArgumentException">Thrown when host and port of the proxy don't form a valid absolute URI</exception>
        protected virtual void SetProxy(ClientWebSocket socket, ApiProxy proxy)
        {
            if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri))
                throw new ArgumentException($"Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy));

            socket.Options.Proxy = uri?.Scheme == null
                ? new WebProxy(proxy.Host, proxy.Port)
                : new WebProxy { Address = uri };

            if (proxy.Login != null)
                socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
        }

        /// <summary>
        /// Update the process state and log the transition; does nothing if the state is unchanged
        /// </summary>
        /// <param name="state">The new state</param>
        private void SetProcessState(ProcessState state)
        {
            if (_processState == state)
                return;

            _logger.SocketProcessingStateChanged(Id, _processState.ToString(), state.ToString());
            _processState = state;
        }
    }

    /// <summary>
    /// Message info
    /// </summary>
    public struct SendItem
    {
        /// <summary>
        /// The request id
        /// </summary>
        public int Id { get; set; }

        /// <summary>
        /// The request weight
        /// </summary>
        public int Weight { get; set; }

        /// <summary>
        /// Timestamp the request was sent
        /// </summary>
        public DateTime SendTime { get; set; }

        /// <summary>
        /// Message type
        /// </summary>
        public WebSocketMessageType Type { get; set; }

        /// <summary>
        /// The bytes to send
        /// </summary>
        public byte[] Bytes { get; set; }
    }
}