diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 064c559..21bf2d4 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -251,7 +251,7 @@ namespace CryptoExchange.Net.Clients return new CallResult(new ServerError("Socket is paused")); } - var waitEvent = new ManualResetEvent(false); + var waitEvent = new AsyncResetEvent(false); var subQuery = subscription.GetSubQuery(socketConnection); if (subQuery != null) { diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs index 679959c..749e738 100644 --- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs @@ -3,6 +3,7 @@ using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; using System; using System.Collections.Generic; +using System.Threading.Tasks; namespace CryptoExchange.Net.Interfaces { @@ -25,7 +26,7 @@ namespace CryptoExchange.Net.Interfaces /// /// /// - CallResult Handle(SocketConnection connection, DataEvent message); + Task Handle(SocketConnection connection, DataEvent message); /// /// Get the type the message should be deserialized to /// diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 92c0a50..09bffae 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -17,7 +17,7 @@ namespace CryptoExchange.Net.Interfaces /// /// Websocket message received event /// - event Action> OnStreamMessage; + event Func, Task> OnStreamMessage; /// /// Websocket sent event, RequestId as parameter /// diff --git a/CryptoExchange.Net/Logging/Extensions/CryptoExchangeWebSocketClientLoggingExtension.cs b/CryptoExchange.Net/Logging/Extensions/CryptoExchangeWebSocketClientLoggingExtension.cs index d1135a1..705cff8 100644 --- a/CryptoExchange.Net/Logging/Extensions/CryptoExchangeWebSocketClientLoggingExtension.cs +++ b/CryptoExchange.Net/Logging/Extensions/CryptoExchangeWebSocketClientLoggingExtension.cs @@ -151,7 +151,7 @@ namespace CryptoExchange.Net.Logging.Extensions "[Sckt {SocketId}] discarding incomplete message of {NumBytes} bytes"); _receiveLoopStoppedWithException = LoggerMessage.Define( - LogLevel.Warning, + LogLevel.Error, new EventId(1024, "ReceiveLoopStoppedWithException"), "[Sckt {SocketId}] receive loop stopped with exception"); diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 5c73381..6de3abe 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -108,7 +108,7 @@ namespace CryptoExchange.Net.Sockets public event Func? OnClose; /// - public event Action>? OnStreamMessage; + public event Func, Task>? OnStreamMessage; /// public event Func? OnRequestSent; @@ -582,7 +582,8 @@ namespace CryptoExchange.Net.Sockets { // Received a complete message and it's not multi part _logger.SocketReceivedSingleMessage(Id, receiveResult.Count); - ProcessData(receiveResult.MessageType, new ReadOnlyMemory(buffer.Array, buffer.Offset, receiveResult.Count)); + await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); + _logger.LogTrace($"[Sckt {Id}] process completed"); } else { @@ -617,7 +618,7 @@ namespace CryptoExchange.Net.Sockets { _logger.SocketReassembledMessage(Id, multipartStream!.Length); // Get the underlying buffer of the memorystream holding the written data and delimit it (GetBuffer return the full array, not only the written part) - ProcessData(receiveResult.MessageType, new ReadOnlyMemory(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)); + await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false); } else { @@ -633,7 +634,8 @@ namespace CryptoExchange.Net.Sockets // Make sure we at least let the owner know there was an error _logger.SocketReceiveLoopStoppedWithException(Id, e); await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); - throw; + if (_closeTask?.IsCompleted != false) + _closeTask = CloseInternalAsync(); } finally { @@ -647,10 +649,10 @@ namespace CryptoExchange.Net.Sockets /// /// /// - protected void ProcessData(WebSocketMessageType type, ReadOnlyMemory data) + protected async Task ProcessData(WebSocketMessageType type, ReadOnlyMemory data) { LastActionTime = DateTime.UtcNow; - OnStreamMessage?.Invoke(type, data); + await (OnStreamMessage?.Invoke(type, data) ?? Task.CompletedTask).ConfigureAwait(false); } /// diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index a45cd3c..5427b45 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -42,7 +42,7 @@ namespace CryptoExchange.Net.Sockets /// /// Wait event for the calling message processing thread /// - public ManualResetEvent? ContinueAwaiter { get; set; } + public AsyncResetEvent? ContinueAwaiter { get; set; } /// /// Strings to match this query to a received message @@ -135,7 +135,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract CallResult Handle(SocketConnection connection, DataEvent message); + public abstract Task Handle(SocketConnection connection, DataEvent message); } @@ -165,13 +165,14 @@ namespace CryptoExchange.Net.Sockets } /// - public override CallResult Handle(SocketConnection connection, DataEvent message) + public override async Task Handle(SocketConnection connection, DataEvent message) { Completed = true; Response = message.Data; Result = HandleMessage(connection, message.As((TServerResponse)message.Data)); _event.Set(); - ContinueAwaiter?.WaitOne(); + if (ContinueAwaiter != null) + await ContinueAwaiter.WaitAsync().ConfigureAwait(false); return Result; } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index dad3ad1..88d7352 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -413,7 +413,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - protected virtual void HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory data) + protected virtual async Task HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory data) { var sw = Stopwatch.StartNew(); var receiveTime = DateTime.UtcNow; @@ -507,7 +507,7 @@ namespace CryptoExchange.Net.Sockets try { var innerSw = Stopwatch.StartNew(); - processor.Handle(this, new DataEvent(deserialized, null, null, originalData, receiveTime, null)); + await processor.Handle(this, new DataEvent(deserialized, null, null, originalData, receiveTime, null)).ConfigureAwait(false); totalUserTime += (int)innerSw.ElapsedMilliseconds; } catch (Exception ex) @@ -697,7 +697,7 @@ namespace CryptoExchange.Net.Sockets /// Wait event for when the socket message handler can continue /// Cancellation token /// - public virtual async Task SendAndWaitQueryAsync(Query query, ManualResetEvent? continueEvent = null, CancellationToken ct = default) + public virtual async Task SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default) { await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false); return query.Result ?? new CallResult(new ServerError("Timeout")); @@ -712,13 +712,13 @@ namespace CryptoExchange.Net.Sockets /// Wait event for when the socket message handler can continue /// Cancellation token /// - public virtual async Task> SendAndWaitQueryAsync(Query query, ManualResetEvent? continueEvent = null, CancellationToken ct = default) + public virtual async Task> SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default) { await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false); return query.TypedResult ?? new CallResult(new ServerError("Timeout")); } - private async Task SendAndWaitIntAsync(Query query, ManualResetEvent? continueEvent, CancellationToken ct = default) + private async Task SendAndWaitIntAsync(Query query, AsyncResetEvent? continueEvent, CancellationToken ct = default) { lock(_listenersLock) _listeners.Add(query); @@ -876,7 +876,7 @@ namespace CryptoExchange.Net.Sockets if (subQuery == null) continue; - var waitEvent = new ManualResetEvent(false); + var waitEvent = new AsyncResetEvent(false); taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) => { subscription.HandleSubQueryResponse(subQuery.Response!); diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index cae4e0a..2f68fbe 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets { @@ -122,11 +123,11 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public CallResult Handle(SocketConnection connection, DataEvent message) + public Task Handle(SocketConnection connection, DataEvent message) { ConnectionInvocations++; TotalInvocations++; - return DoHandleMessage(connection, message); + return Task.FromResult(DoHandleMessage(connection, message)); } /// diff --git a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs index 61b1c5a..5e28e9d 100644 --- a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs +++ b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs @@ -23,7 +23,7 @@ namespace CryptoExchange.Net.Testing.Implementations public event Func? OnError; #pragma warning restore 0067 public event Func? OnRequestSent; - public event Action>? OnStreamMessage; + public event Func, Task>? OnStreamMessage; public event Func? OnOpen; public int Id { get; }