1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Allow specifying the ReceiveMessageBuffer size on Websockets (#228)

In order to support more User websocket connections, allow reducing the memory requirements for the receive buffer, keeping the default buffer.
This commit is contained in:
James Carter 2025-02-23 18:58:05 +00:00 committed by GitHub
parent 3b15c35a02
commit 4c050744ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 26 additions and 6 deletions

View File

@ -600,7 +600,8 @@ namespace CryptoExchange.Net.Clients
RateLimiter = ClientOptions.RateLimiterEnabled ? RateLimiter : null, RateLimiter = ClientOptions.RateLimiterEnabled ? RateLimiter : null,
RateLimitingBehavior = ClientOptions.RateLimitingBehaviour, RateLimitingBehavior = ClientOptions.RateLimitingBehaviour,
Proxy = ClientOptions.Proxy, Proxy = ClientOptions.Proxy,
Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout,
ReceiveBufferSize = ClientOptions.ReceiveBufferSize,
}; };
/// <summary> /// <summary>

View File

@ -52,6 +52,15 @@ namespace CryptoExchange.Net.Objects.Options
/// </summary> /// </summary>
public TimeSpan? ConnectDelayAfterRateLimited { get; set; } public TimeSpan? ConnectDelayAfterRateLimited { get; set; }
/// <summary>
/// The buffer size to use for receiving data. Leave unset to use the default buffer size.
/// </summary>
/// <remarks>
/// Only specify this if you are creating a significant amount of connections and understand the typical message length we receive from the exchange.
/// Setting this too low can increase memory consumption and allocations.
/// </remarks>
public int? ReceiveBufferSize { get; set; }
/// <summary> /// <summary>
/// Create a copy of this options /// Create a copy of this options
/// </summary> /// </summary>
@ -72,6 +81,7 @@ namespace CryptoExchange.Net.Objects.Options
item.RequestTimeout = RequestTimeout; item.RequestTimeout = RequestTimeout;
item.RateLimitingBehaviour = RateLimitingBehaviour; item.RateLimitingBehaviour = RateLimitingBehaviour;
item.RateLimiterEnabled = RateLimiterEnabled; item.RateLimiterEnabled = RateLimiterEnabled;
item.ReceiveBufferSize = ReceiveBufferSize;
return item; return item;
} }
} }

View File

@ -64,6 +64,11 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public Encoding Encoding { get; set; } = Encoding.UTF8; public Encoding Encoding { get; set; } = Encoding.UTF8;
/// <summary>
/// The buffer size to use for receiving data
/// </summary>
public int? ReceiveBufferSize { get; set; } = null;
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>

View File

@ -5,6 +5,7 @@ using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.RateLimiting; using CryptoExchange.Net.RateLimiting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Buffers;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
@ -32,6 +33,7 @@ namespace CryptoExchange.Net.Sockets
internal static int _lastStreamId; internal static int _lastStreamId;
private static readonly object _streamIdLock = new(); private static readonly object _streamIdLock = new();
private static readonly ArrayPool<byte> _receiveBufferPool = ArrayPool<byte>.Shared;
private readonly AsyncResetEvent _sendEvent; private readonly AsyncResetEvent _sendEvent;
private readonly ConcurrentQueue<SendItem> _sendBuffer; private readonly ConcurrentQueue<SendItem> _sendBuffer;
@ -48,8 +50,9 @@ namespace CryptoExchange.Net.Sockets
private DateTime _lastReconnectTime; private DateTime _lastReconnectTime;
private readonly string _baseAddress; private readonly string _baseAddress;
private int _reconnectAttempt; private int _reconnectAttempt;
private readonly int _receiveBufferSize;
private const int _receiveBufferSize = 1048576; private const int _defaultReceiveBufferSize = 1048576;
private const int _sendBufferSize = 4096; private const int _sendBufferSize = 4096;
/// <summary> /// <summary>
@ -149,6 +152,7 @@ namespace CryptoExchange.Net.Sockets
_sendBuffer = new ConcurrentQueue<SendItem>(); _sendBuffer = new ConcurrentQueue<SendItem>();
_ctsSource = new CancellationTokenSource(); _ctsSource = new CancellationTokenSource();
_receivedMessagesLock = new object(); _receivedMessagesLock = new object();
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize;
_closeSem = new SemaphoreSlim(1, 1); _closeSem = new SemaphoreSlim(1, 1);
_socket = CreateSocket(); _socket = CreateSocket();
@ -566,8 +570,8 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
private async Task ReceiveLoopAsync() private async Task ReceiveLoopAsync()
{ {
var buffer = new ArraySegment<byte>(new byte[_receiveBufferSize]); byte[] rentedBuffer = _receiveBufferPool.Rent(_receiveBufferSize);
var received = 0; var buffer = new ArraySegment<byte>(rentedBuffer);
try try
{ {
while (true) while (true)
@ -583,7 +587,6 @@ namespace CryptoExchange.Net.Sockets
try try
{ {
receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false); receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
received += receiveResult.Count;
lock (_receivedMessagesLock) lock (_receivedMessagesLock)
_receivedMessages.Add(new ReceiveItem(DateTime.UtcNow, receiveResult.Count)); _receivedMessages.Add(new ReceiveItem(DateTime.UtcNow, receiveResult.Count));
} }
@ -649,7 +652,7 @@ namespace CryptoExchange.Net.Sockets
{ {
// Received a complete message and it's not multi part // Received a complete message and it's not multi part
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count); _logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
} }
else else
{ {
@ -705,6 +708,7 @@ namespace CryptoExchange.Net.Sockets
} }
finally finally
{ {
_receiveBufferPool.Return(rentedBuffer, true);
_logger.SocketReceiveLoopFinished(Id); _logger.SocketReceiveLoopFinished(Id);
} }
} }