1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Refactor to prevent usage of thread block ManualResetEvent in socket message handling

This commit is contained in:
JKorf 2024-07-09 16:50:08 +02:00
parent 6951f31be7
commit 8dac3d7aa6
9 changed files with 28 additions and 23 deletions

View File

@ -251,7 +251,7 @@ namespace CryptoExchange.Net.Clients
return new CallResult<UpdateSubscription>(new ServerError("Socket is paused")); return new CallResult<UpdateSubscription>(new ServerError("Socket is paused"));
} }
var waitEvent = new ManualResetEvent(false); var waitEvent = new AsyncResetEvent(false);
var subQuery = subscription.GetSubQuery(socketConnection); var subQuery = subscription.GetSubQuery(socketConnection);
if (subQuery != null) if (subQuery != null)
{ {

View File

@ -3,6 +3,7 @@ using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets; using CryptoExchange.Net.Sockets;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces namespace CryptoExchange.Net.Interfaces
{ {
@ -25,7 +26,7 @@ namespace CryptoExchange.Net.Interfaces
/// <param name="connection"></param> /// <param name="connection"></param>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
CallResult Handle(SocketConnection connection, DataEvent<object> message); Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message);
/// <summary> /// <summary>
/// Get the type the message should be deserialized to /// Get the type the message should be deserialized to
/// </summary> /// </summary>

View File

@ -17,7 +17,7 @@ namespace CryptoExchange.Net.Interfaces
/// <summary> /// <summary>
/// Websocket message received event /// Websocket message received event
/// </summary> /// </summary>
event Action<WebSocketMessageType, ReadOnlyMemory<byte>> OnStreamMessage; event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task> OnStreamMessage;
/// <summary> /// <summary>
/// Websocket sent event, RequestId as parameter /// Websocket sent event, RequestId as parameter
/// </summary> /// </summary>

View File

@ -151,7 +151,7 @@ namespace CryptoExchange.Net.Logging.Extensions
"[Sckt {SocketId}] discarding incomplete message of {NumBytes} bytes"); "[Sckt {SocketId}] discarding incomplete message of {NumBytes} bytes");
_receiveLoopStoppedWithException = LoggerMessage.Define<int>( _receiveLoopStoppedWithException = LoggerMessage.Define<int>(
LogLevel.Warning, LogLevel.Error,
new EventId(1024, "ReceiveLoopStoppedWithException"), new EventId(1024, "ReceiveLoopStoppedWithException"),
"[Sckt {SocketId}] receive loop stopped with exception"); "[Sckt {SocketId}] receive loop stopped with exception");

View File

@ -108,7 +108,7 @@ namespace CryptoExchange.Net.Sockets
public event Func<Task>? OnClose; public event Func<Task>? OnClose;
/// <inheritdoc /> /// <inheritdoc />
public event Action<WebSocketMessageType, ReadOnlyMemory<byte>>? OnStreamMessage; public event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task>? OnStreamMessage;
/// <inheritdoc /> /// <inheritdoc />
public event Func<int, Task>? OnRequestSent; public event Func<int, Task>? OnRequestSent;
@ -582,7 +582,8 @@ namespace CryptoExchange.Net.Sockets
{ {
// Received a complete message and it's not multi part // Received a complete message and it's not multi part
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count); _logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array, buffer.Offset, receiveResult.Count)); await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
_logger.LogTrace($"[Sckt {Id}] process completed");
} }
else else
{ {
@ -617,7 +618,7 @@ namespace CryptoExchange.Net.Sockets
{ {
_logger.SocketReassembledMessage(Id, multipartStream!.Length); _logger.SocketReassembledMessage(Id, multipartStream!.Length);
// Get the underlying buffer of the memorystream holding the written data and delimit it (GetBuffer return the full array, not only the written part) // Get the underlying buffer of the memorystream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)); await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false);
} }
else else
{ {
@ -633,7 +634,8 @@ namespace CryptoExchange.Net.Sockets
// Make sure we at least let the owner know there was an error // Make sure we at least let the owner know there was an error
_logger.SocketReceiveLoopStoppedWithException(Id, e); _logger.SocketReceiveLoopStoppedWithException(Id, e);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
throw; if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
} }
finally finally
{ {
@ -647,10 +649,10 @@ namespace CryptoExchange.Net.Sockets
/// <param name="type"></param> /// <param name="type"></param>
/// <param name="data"></param> /// <param name="data"></param>
/// <returns></returns> /// <returns></returns>
protected void ProcessData(WebSocketMessageType type, ReadOnlyMemory<byte> data) protected async Task ProcessData(WebSocketMessageType type, ReadOnlyMemory<byte> data)
{ {
LastActionTime = DateTime.UtcNow; LastActionTime = DateTime.UtcNow;
OnStreamMessage?.Invoke(type, data); await (OnStreamMessage?.Invoke(type, data) ?? Task.CompletedTask).ConfigureAwait(false);
} }
/// <summary> /// <summary>

View File

@ -42,7 +42,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Wait event for the calling message processing thread /// Wait event for the calling message processing thread
/// </summary> /// </summary>
public ManualResetEvent? ContinueAwaiter { get; set; } public AsyncResetEvent? ContinueAwaiter { get; set; }
/// <summary> /// <summary>
/// Strings to match this query to a received message /// Strings to match this query to a received message
@ -135,7 +135,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="message"></param> /// <param name="message"></param>
/// <param name="connection"></param> /// <param name="connection"></param>
/// <returns></returns> /// <returns></returns>
public abstract CallResult Handle(SocketConnection connection, DataEvent<object> message); public abstract Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message);
} }
@ -165,13 +165,14 @@ namespace CryptoExchange.Net.Sockets
} }
/// <inheritdoc /> /// <inheritdoc />
public override CallResult Handle(SocketConnection connection, DataEvent<object> message) public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message)
{ {
Completed = true; Completed = true;
Response = message.Data; Response = message.Data;
Result = HandleMessage(connection, message.As((TServerResponse)message.Data)); Result = HandleMessage(connection, message.As((TServerResponse)message.Data));
_event.Set(); _event.Set();
ContinueAwaiter?.WaitOne(); if (ContinueAwaiter != null)
await ContinueAwaiter.WaitAsync().ConfigureAwait(false);
return Result; return Result;
} }

View File

@ -413,7 +413,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="data"></param> /// <param name="data"></param>
/// <param name="type"></param> /// <param name="type"></param>
/// <returns></returns> /// <returns></returns>
protected virtual void HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory<byte> data) protected virtual async Task HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory<byte> data)
{ {
var sw = Stopwatch.StartNew(); var sw = Stopwatch.StartNew();
var receiveTime = DateTime.UtcNow; var receiveTime = DateTime.UtcNow;
@ -507,7 +507,7 @@ namespace CryptoExchange.Net.Sockets
try try
{ {
var innerSw = Stopwatch.StartNew(); var innerSw = Stopwatch.StartNew();
processor.Handle(this, new DataEvent<object>(deserialized, null, null, originalData, receiveTime, null)); await processor.Handle(this, new DataEvent<object>(deserialized, null, null, originalData, receiveTime, null)).ConfigureAwait(false);
totalUserTime += (int)innerSw.ElapsedMilliseconds; totalUserTime += (int)innerSw.ElapsedMilliseconds;
} }
catch (Exception ex) catch (Exception ex)
@ -697,7 +697,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param> /// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
/// <param name="ct">Cancellation token</param> /// <param name="ct">Cancellation token</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query, ManualResetEvent? continueEvent = null, CancellationToken ct = default) public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default)
{ {
await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false); await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false);
return query.Result ?? new CallResult(new ServerError("Timeout")); return query.Result ?? new CallResult(new ServerError("Timeout"));
@ -712,13 +712,13 @@ namespace CryptoExchange.Net.Sockets
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param> /// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
/// <param name="ct">Cancellation token</param> /// <param name="ct">Cancellation token</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, ManualResetEvent? continueEvent = null, CancellationToken ct = default) public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default)
{ {
await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false); await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false);
return query.TypedResult ?? new CallResult<THandlerResponse>(new ServerError("Timeout")); return query.TypedResult ?? new CallResult<THandlerResponse>(new ServerError("Timeout"));
} }
private async Task SendAndWaitIntAsync(Query query, ManualResetEvent? continueEvent, CancellationToken ct = default) private async Task SendAndWaitIntAsync(Query query, AsyncResetEvent? continueEvent, CancellationToken ct = default)
{ {
lock(_listenersLock) lock(_listenersLock)
_listeners.Add(query); _listeners.Add(query);
@ -876,7 +876,7 @@ namespace CryptoExchange.Net.Sockets
if (subQuery == null) if (subQuery == null)
continue; continue;
var waitEvent = new ManualResetEvent(false); var waitEvent = new AsyncResetEvent(false);
taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) => taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) =>
{ {
subscription.HandleSubQueryResponse(subQuery.Response!); subscription.HandleSubQueryResponse(subQuery.Response!);

View File

@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
{ {
@ -122,11 +123,11 @@ namespace CryptoExchange.Net.Sockets
/// <param name="connection"></param> /// <param name="connection"></param>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public CallResult Handle(SocketConnection connection, DataEvent<object> message) public Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message)
{ {
ConnectionInvocations++; ConnectionInvocations++;
TotalInvocations++; TotalInvocations++;
return DoHandleMessage(connection, message); return Task.FromResult(DoHandleMessage(connection, message));
} }
/// <summary> /// <summary>

View File

@ -23,7 +23,7 @@ namespace CryptoExchange.Net.Testing.Implementations
public event Func<Exception, Task>? OnError; public event Func<Exception, Task>? OnError;
#pragma warning restore 0067 #pragma warning restore 0067
public event Func<int, Task>? OnRequestSent; public event Func<int, Task>? OnRequestSent;
public event Action<WebSocketMessageType, ReadOnlyMemory<byte>>? OnStreamMessage; public event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task>? OnStreamMessage;
public event Func<Task>? OnOpen; public event Func<Task>? OnOpen;
public int Id { get; } public int Id { get; }