1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 01:33:26 +00:00
This commit is contained in:
JKorf 2025-11-19 21:46:46 +01:00 committed by Jkorf
parent b8fadf4fc0
commit f000d3a7d1
2 changed files with 47 additions and 85 deletions

View File

@ -60,19 +60,10 @@ namespace CryptoExchange.Net.Sockets
private const int _defaultReceiveBufferSize = 1048576;
private const int _sendBufferSize = 4096;
/// <summary>
/// Received messages, the size and the timestamp
/// </summary>
protected readonly List<ReceiveItem> _receivedMessages;
/// <summary>
/// Received messages lock
/// </summary>
#if NET9_0_OR_GREATER
private readonly Lock _receivedMessagesLock = new Lock();
#else
private readonly object _receivedMessagesLock = new object();
#endif
private int _bytesReceived = 0;
private int _prevSlotBytesReceived = 0;
private DateTime _lastBytesReceivedUpdate = DateTime.UtcNow;
private DateTime _prevSlotBytesReceivedUpdate = DateTime.UtcNow;
/// <summary>
/// Log
@ -104,15 +95,8 @@ namespace CryptoExchange.Net.Sockets
{
get
{
lock (_receivedMessagesLock)
{
UpdateReceivedMessages();
if (_receivedMessages.Count == 0)
return 0;
return Math.Round(_receivedMessages.Sum(v => v.Bytes) / 1000d / 3d);
}
UpdateReceivedMessages();
return Math.Round(_prevSlotBytesReceived * (_lastBytesReceivedUpdate - _prevSlotBytesReceivedUpdate).TotalSeconds / 1000);
}
}
@ -160,7 +144,6 @@ namespace CryptoExchange.Net.Sockets
_connection = connection;
Parameters = websocketParameters;
_receivedMessages = new List<ReceiveItem>();
_sendEvent = new AsyncResetEvent();
_sendBuffer = new ConcurrentQueue<SendItem>();
_ctsSource = new CancellationTokenSource();
@ -630,8 +613,7 @@ namespace CryptoExchange.Net.Sockets
try
{
receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
lock (_receivedMessagesLock)
_receivedMessages.Add(new ReceiveItem(DateTime.UtcNow, receiveResult.Count));
_bytesReceived += receiveResult.Count;
}
catch (OperationCanceledException ex)
{
@ -682,7 +664,8 @@ namespace CryptoExchange.Net.Sockets
{
// We received data, but it is not complete, write it to a memory stream for reassembling
multiPartMessage = true;
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
if (_logger.IsEnabled(LogLevel.Trace))
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
// Write the data to a memory stream to be reassembled later
if (multipartStream == null)
@ -694,7 +677,9 @@ namespace CryptoExchange.Net.Sockets
if (!multiPartMessage)
{
// Received a complete message and it's not multi part
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
if (_logger.IsEnabled(LogLevel.Trace))
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
if (!Parameters.UseUpdatedDeserialization)
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
else
@ -703,7 +688,9 @@ namespace CryptoExchange.Net.Sockets
else
{
// Received the end of a multipart message, write to memory stream for reassembling
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
if (_logger.IsEnabled(LogLevel.Trace))
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
multipartStream!.Write(buffer.Array!, buffer.Offset, receiveResult.Count);
}
@ -711,8 +698,7 @@ namespace CryptoExchange.Net.Sockets
}
}
lock (_receivedMessagesLock)
UpdateReceivedMessages();
UpdateReceivedMessages();
if (receiveResult?.MessageType == WebSocketMessageType.Close)
{
@ -731,13 +717,15 @@ namespace CryptoExchange.Net.Sockets
// When the connection gets interrupted we might not have received a full message
if (receiveResult?.EndOfMessage == true)
{
_logger.SocketReassembledMessage(Id, multipartStream!.Length);
if (_logger.IsEnabled(LogLevel.Trace))
_logger.SocketReassembledMessage(Id, multipartStream!.Length);
// Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
if (!Parameters.UseUpdatedDeserialization)
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false);
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false);
else
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length));
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length));
}
else
{
@ -787,8 +775,7 @@ namespace CryptoExchange.Net.Sockets
try
{
receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
lock (_receivedMessagesLock)
_receivedMessages.Add(new ReceiveItem(DateTime.UtcNow, receiveResult.Count));
_bytesReceived += receiveResult.Count;
}
catch (OperationCanceledException ex)
{
@ -839,7 +826,9 @@ namespace CryptoExchange.Net.Sockets
{
// We received data, but it is not complete, write it to a memory stream for reassembling
multiPartMessage = true;
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
if (_logger.IsEnabled(LogLevel.Trace))
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
// Write the data to a memory stream to be reassembled later
multipartStream ??= new MemoryStream();
@ -850,13 +839,15 @@ namespace CryptoExchange.Net.Sockets
if (!multiPartMessage)
{
// Received a complete message and it's not multi part
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
if (_logger.IsEnabled(LogLevel.Trace))
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
ProcessDataNew(receiveResult.MessageType, buffer.Span.Slice(0, receiveResult.Count));
}
else
{
// Received the end of a multipart message, write to memory stream for reassembling
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
if (_logger.IsEnabled(LogLevel.Trace))
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
multipartStream!.Write(buffer.Span.Slice(0, receiveResult.Count));
}
@ -864,8 +855,7 @@ namespace CryptoExchange.Net.Sockets
}
}
lock (_receivedMessagesLock)
UpdateReceivedMessages();
UpdateReceivedMessages();
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
@ -884,10 +874,11 @@ namespace CryptoExchange.Net.Sockets
// When the connection gets interrupted we might not have received a full message
if (receiveResult.EndOfMessage == true)
{
_logger.SocketReassembledMessage(Id, multipartStream!.Length);
// Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length));
if (_logger.IsEnabled(LogLevel.Trace))
_logger.SocketReassembledMessage(Id, multipartStream!.Length);
// Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length));
}
else
{
@ -998,21 +989,16 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
protected void UpdateReceivedMessages()
{
var checkTime = DateTime.UtcNow;
if (checkTime - _lastReceivedMessagesUpdate > TimeSpan.FromSeconds(1))
{
for (var i = 0; i < _receivedMessages.Count; i++)
{
var msg = _receivedMessages[i];
if (checkTime - msg.Timestamp > TimeSpan.FromSeconds(3))
{
_receivedMessages.Remove(msg);
i--;
}
}
var now = DateTime.UtcNow;
var sinceLast = now - _lastBytesReceivedUpdate;
if (sinceLast < TimeSpan.FromSeconds(3))
return;
_lastReceivedMessagesUpdate = checkTime;
}
_prevSlotBytesReceivedUpdate = _lastBytesReceivedUpdate;
_prevSlotBytesReceived = _bytesReceived;
_bytesReceived = 0;
_lastBytesReceivedUpdate = now;
}
/// <summary>
@ -1077,30 +1063,4 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public byte[] Bytes { get; set; }
}
/// <summary>
/// Received message info
/// </summary>
public struct ReceiveItem
{
/// <summary>
/// Timestamp of the received data
/// </summary>
public DateTime Timestamp { get; set; }
/// <summary>
/// Number of bytes received
/// </summary>
public int Bytes { get; set; }
/// <summary>
/// ctor
/// </summary>
/// <param name="timestamp"></param>
/// <param name="bytes"></param>
public ReceiveItem(DateTime timestamp, int bytes)
{
Timestamp = timestamp;
Bytes = bytes;
}
}
}

View File

@ -530,7 +530,9 @@ namespace CryptoExchange.Net.Sockets
#else
originalData = Encoding.UTF8.GetString(data);
#endif
_logger.ReceivedData(SocketId, originalData);
if (_logger.IsEnabled(LogLevel.Trace))
_logger.ReceivedData(SocketId, originalData);
}
var messageIdentifier = messageConverter.GetMessageIdentifier(data, type);