diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 6d17d6f..f3b8e75 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -600,7 +600,8 @@ namespace CryptoExchange.Net.Clients RateLimiter = ClientOptions.RateLimiterEnabled ? RateLimiter : null, RateLimitingBehavior = ClientOptions.RateLimitingBehaviour, Proxy = ClientOptions.Proxy, - Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout + Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout, + ReceiveBufferSize = ClientOptions.ReceiveBufferSize, }; /// diff --git a/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs b/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs index babe3f5..e643f39 100644 --- a/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs +++ b/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs @@ -52,6 +52,15 @@ namespace CryptoExchange.Net.Objects.Options /// public TimeSpan? ConnectDelayAfterRateLimited { get; set; } + /// + /// The buffer size to use for receiving data. Leave unset to use the default buffer size. + /// + /// + /// Only specify this if you are creating a significant amount of connections and understand the typical message length we receive from the exchange. + /// Setting this too low can increase memory consumption and allocations. + /// + public int? ReceiveBufferSize { get; set; } + /// /// Create a copy of this options /// @@ -72,6 +81,7 @@ namespace CryptoExchange.Net.Objects.Options item.RequestTimeout = RequestTimeout; item.RateLimitingBehaviour = RateLimitingBehaviour; item.RateLimiterEnabled = RateLimiterEnabled; + item.ReceiveBufferSize = ReceiveBufferSize; return item; } } diff --git a/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs index c401c0e..31c3224 100644 --- a/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs +++ b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs @@ -64,6 +64,11 @@ namespace CryptoExchange.Net.Objects.Sockets /// public Encoding Encoding { get; set; } = Encoding.UTF8; + /// + /// The buffer size to use for receiving data + /// + public int? ReceiveBufferSize { get; set; } = null; + /// /// ctor /// diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 32877fd..d1b8107 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -5,6 +5,7 @@ using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.RateLimiting; using Microsoft.Extensions.Logging; using System; +using System.Buffers; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; @@ -32,6 +33,7 @@ namespace CryptoExchange.Net.Sockets internal static int _lastStreamId; private static readonly object _streamIdLock = new(); + private static readonly ArrayPool _receiveBufferPool = ArrayPool.Shared; private readonly AsyncResetEvent _sendEvent; private readonly ConcurrentQueue _sendBuffer; @@ -48,8 +50,9 @@ namespace CryptoExchange.Net.Sockets private DateTime _lastReconnectTime; private readonly string _baseAddress; private int _reconnectAttempt; + private readonly int _receiveBufferSize; - private const int _receiveBufferSize = 1048576; + private const int _defaultReceiveBufferSize = 1048576; private const int _sendBufferSize = 4096; /// @@ -149,6 +152,7 @@ namespace CryptoExchange.Net.Sockets _sendBuffer = new ConcurrentQueue(); _ctsSource = new CancellationTokenSource(); _receivedMessagesLock = new object(); + _receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize; _closeSem = new SemaphoreSlim(1, 1); _socket = CreateSocket(); @@ -566,8 +570,8 @@ namespace CryptoExchange.Net.Sockets /// private async Task ReceiveLoopAsync() { - var buffer = new ArraySegment(new byte[_receiveBufferSize]); - var received = 0; + byte[] rentedBuffer = _receiveBufferPool.Rent(_receiveBufferSize); + var buffer = new ArraySegment(rentedBuffer); try { while (true) @@ -583,7 +587,6 @@ namespace CryptoExchange.Net.Sockets try { receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false); - received += receiveResult.Count; lock (_receivedMessagesLock) _receivedMessages.Add(new ReceiveItem(DateTime.UtcNow, receiveResult.Count)); } @@ -649,7 +652,7 @@ namespace CryptoExchange.Net.Sockets { // Received a complete message and it's not multi part _logger.SocketReceivedSingleMessage(Id, receiveResult.Count); - await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); + await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); } else { @@ -705,6 +708,7 @@ namespace CryptoExchange.Net.Sockets } finally { + _receiveBufferPool.Return(rentedBuffer, true); _logger.SocketReceiveLoopFinished(Id); } }