From b640690a0f326675f7594207db76efca698f83e6 Mon Sep 17 00:00:00 2001 From: JKorf Date: Thu, 30 Nov 2023 13:21:37 +0100 Subject: [PATCH] wip --- CryptoExchange.Net/Clients/SocketApiClient.cs | 10 +++----- .../Converters/JTokenAccessor.cs | 5 ++++ .../Converters/SocketConverter.cs | 25 ------------------- .../Interfaces/IMessageAccessor.cs | 1 + .../Sockets/CryptoExchangeWebSocketClient.cs | 1 + .../Sockets/SocketConnection.cs | 9 +++++-- 6 files changed, 17 insertions(+), 34 deletions(-) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 1cc5723..729124d 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -197,7 +197,7 @@ namespace CryptoExchange.Net socketConnection = socketResult.Data; // Add a subscription on the socket connection - var success = socketConnection.AddSubscription(subscription); + var success = socketConnection.CanAddSubscription(); if (!success) { _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); @@ -249,12 +249,8 @@ namespace CryptoExchange.Net subscription.HandleSubQueryResponse(subQuery.Response); } - else - { - // No request to be sent, so just mark the subscription as comfirmed - subscription.Confirmed = true; - } + subscription.Confirmed = true; if (ct != default) { subscription.CancellationTokenRegistration = ct.Register(async () => @@ -264,7 +260,7 @@ namespace CryptoExchange.Net }, false); } - subscription.Confirmed = true; + socketConnection.AddSubscription(subscription); _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully"); return new CallResult(new UpdateSubscription(socketConnection, subscription)); } diff --git a/CryptoExchange.Net/Converters/JTokenAccessor.cs b/CryptoExchange.Net/Converters/JTokenAccessor.cs index 75d5d47..16dec1e 100644 --- a/CryptoExchange.Net/Converters/JTokenAccessor.cs +++ b/CryptoExchange.Net/Converters/JTokenAccessor.cs @@ -34,6 +34,9 @@ namespace CryptoExchange.Net.Converters if (arr.Count <= index) return null; + if (arr[index].Type != JTokenType.String) + return null; + return arr[index].Value(); } @@ -58,6 +61,8 @@ namespace CryptoExchange.Net.Converters return accessToken?.ToString(); } + public bool IsObject(string? key) => _token.Type == JTokenType.Object; + private JToken? GetToken(string key) { if (_cache.TryGetValue(key, out var token)) diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index 09e2923..f36fdee 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -178,30 +178,5 @@ namespace CryptoExchange.Net.Converters var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : token.ToObject(type, _serializer)); return instance; } - - - private string? GetValueForKey(JToken token, string key) - { - var splitTokens = key.Split(new char[] { ':' }); - var accessToken = token; - foreach (var splitToken in splitTokens) - { - accessToken = accessToken[splitToken]; - - if (accessToken == null) - break; - - if (accessToken.Type == JTokenType.Array) - { - // Received array, take first item as reference - accessToken = accessToken.First!; - } - } - - if (accessToken?.Type == JTokenType.Object) - return ((JObject)accessToken).Properties().First().Name; - - return accessToken?.ToString(); - } } } diff --git a/CryptoExchange.Net/Interfaces/IMessageAccessor.cs b/CryptoExchange.Net/Interfaces/IMessageAccessor.cs index a4e92a7..8b85bf3 100644 --- a/CryptoExchange.Net/Interfaces/IMessageAccessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageAccessor.cs @@ -6,6 +6,7 @@ namespace CryptoExchange.Net.Interfaces { public interface IMessageAccessor { + bool IsObject(string? key); string? GetStringValue(string key); int? GetIntValue(string key); public int? GetCount(string key); diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index ad92e46..7a4d470 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -582,6 +582,7 @@ namespace CryptoExchange.Net.Sockets protected async Task ProcessData(WebSocketMessageType type, Stream stream) { + LastActionTime = DateTime.UtcNow; stream.Position = 0; if (Parameters.Interceptor != null) stream = Parameters.Interceptor.Invoke(stream); diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 002f1c6..a32b14d 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -10,6 +10,7 @@ using System.Net.WebSockets; using System.IO; using CryptoExchange.Net.Objects.Sockets; using System.Text; +using System.Diagnostics.CodeAnalysis; namespace CryptoExchange.Net.Sockets { @@ -313,10 +314,12 @@ namespace CryptoExchange.Net.Sockets var result = ApiClient.StreamConverter.ReadJson(type, stream, _listenerManager.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); if(result == null) { - // Not able to parse at all - stream.Position = 0; + // Not able to parse at all var buffer = new byte[stream.Length]; + stream.Position = 0; stream.Read(buffer, 0, buffer.Length); + _logger.LogDebug($"Socket {SocketId} Failed to parse data: {Encoding.UTF8.GetString(buffer)}"); + UnparsedMessage?.Invoke(buffer); return; } @@ -436,6 +439,8 @@ namespace CryptoExchange.Net.Sockets _socket.Dispose(); } + public bool CanAddSubscription() => Status == SocketStatus.None || Status == SocketStatus.Connected; + /// /// Add a subscription to this connection ///