diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 25c379c..b21fbd1 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -22,6 +22,7 @@ namespace CryptoExchange.Net.Sockets public DateTime RequestTimestamp { get; set; } public CallResult? Result { get; set; } public BaseParsedMessage Response { get; set; } + public Action OnFinished { get; set; } protected AsyncResetEvent _event; protected CancellationTokenSource? _cts; @@ -133,6 +134,9 @@ namespace CryptoExchange.Net.Sockets Completed = true; Response = message.Data; Result = await HandleMessageAsync(connection, message.As((ParsedMessage)message.Data)).ConfigureAwait(false); + // Set() gives calling/waiting request the signal to continue and allows the message processing thread to continue with next message. + // However, the processing of the message isn't fully finished yet? + OnFinished?.Invoke(); _event.Set(); return Result; } @@ -152,6 +156,7 @@ namespace CryptoExchange.Net.Sockets Completed = true; Result = new CallResult(new CancellationRequestedError()); + OnFinished?.Invoke(); _event.Set(); } @@ -160,6 +165,7 @@ namespace CryptoExchange.Net.Sockets { Result = new CallResult(new ServerError(error)); Completed = true; + OnFinished?.Invoke(); _event.Set(); } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 2868be3..9e18ee5 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -260,23 +260,25 @@ namespace CryptoExchange.Net.Sockets query.Fail("Connection interupted"); _listenerManager.Remove(query); } - // Mark subscription as 'not confirmed', only map updates to them if confirmed. Don't await sub answer here - var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false); - if (!reconnectSuccessful) + _ = Task.Run(async () => { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again"); - await _socket.ReconnectAsync().ConfigureAwait(false); - } - else - { - Status = SocketStatus.Connected; - _ = Task.Run(() => + var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false); + if (!reconnectSuccessful) { - ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value); - DisconnectTime = null; - }); - } + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again"); + _ = _socket.ReconnectAsync().ConfigureAwait(false); + } + else + { + Status = SocketStatus.Connected; + _ = Task.Run(() => + { + ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value); + DisconnectTime = null; + }); + } + }); } /// @@ -315,11 +317,6 @@ namespace CryptoExchange.Net.Sockets /// protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream) { - var buffer2 = new byte[stream.Length]; - stream.Position = 0; - stream.Read(buffer2, 0, buffer2.Length); - Debug.WriteLine("0 " + Encoding.UTF8.GetString(buffer2)); - stream.Position = 0; var result = ReadJson(type, stream); if (result == null) { @@ -388,7 +385,12 @@ namespace CryptoExchange.Net.Sockets var idInstance = accessor.Instantiate(typeResult); if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData) - idInstance.OriginalData = idInstance.OriginalData; + { + var buffer2 = new byte[stream.Length]; + stream.Position = 0; + stream.Read(buffer2, 0, buffer2.Length); + idInstance.OriginalData = Encoding.UTF8.GetString(buffer2); + } idInstance.StreamIdentifier = streamIdentity; idInstance.TypeIdentifier = typeIdentity; @@ -527,9 +529,9 @@ namespace CryptoExchange.Net.Sockets /// /// Query to send /// - public virtual async Task SendAndWaitQueryAsync(BaseQuery query) + public virtual async Task SendAndWaitQueryAsync(BaseQuery query, Action? onFinished = null) { - await SendAndWaitIntAsync(query).ConfigureAwait(false); + await SendAndWaitIntAsync(query, onFinished).ConfigureAwait(false); return query.Result ?? new CallResult(new ServerError("Timeout")); } @@ -539,13 +541,13 @@ namespace CryptoExchange.Net.Sockets /// Query response type /// Query to send /// - public virtual async Task> SendAndWaitQueryAsync(Query query) + public virtual async Task> SendAndWaitQueryAsync(Query query, Action? onFinished = null) { - await SendAndWaitIntAsync(query).ConfigureAwait(false); + await SendAndWaitIntAsync(query, onFinished).ConfigureAwait(false); return query.TypedResult ?? new CallResult(new ServerError("Timeout")); } - private async Task SendAndWaitIntAsync(BaseQuery query) + private async Task SendAndWaitIntAsync(BaseQuery query, Action onFinished) { _listenerManager.Add(query); var sendOk = Send(query.Id, query.Request, query.Weight); @@ -555,6 +557,7 @@ namespace CryptoExchange.Net.Sockets return; } + query.OnFinished = onFinished; while (true) { if (!_socket.IsOpen) @@ -665,14 +668,10 @@ namespace CryptoExchange.Net.Sockets if (subQuery == null) continue; - taskList.Add(SendAndWaitQueryAsync(subQuery).ContinueWith((x) => + taskList.Add(SendAndWaitQueryAsync(subQuery, () => { - Debug.WriteLine("1"); subscription.HandleSubQueryResponse(subQuery.Response); - Debug.WriteLine("2"); _listenerManager.Reset(subscription); - Debug.WriteLine("3"); - return x.Result; })); } diff --git a/CryptoExchange.Net/Sockets/SocketListenerManager.cs b/CryptoExchange.Net/Sockets/SocketListenerManager.cs index 55eb33b..71c1d30 100644 --- a/CryptoExchange.Net/Sockets/SocketListenerManager.cs +++ b/CryptoExchange.Net/Sockets/SocketListenerManager.cs @@ -71,7 +71,7 @@ namespace CryptoExchange.Net.Sockets { lock (_lock) { - Debug.WriteLine("4 Resetting"); + //Debug.WriteLine("4 Resetting"); Remove(processor); Add(processor); }