From c41e1289007bc1a8e3448214444b6097336bcad9 Mon Sep 17 00:00:00 2001 From: JKorf Date: Sat, 18 Nov 2023 14:53:50 +0100 Subject: [PATCH] wip --- CryptoExchange.Net/Clients/SocketApiClient.cs | 8 ++ .../Converters/JTokenAccessor.cs | 83 +++++++++++++++++++ .../Converters/SocketConverter.cs | 49 +++++++++-- .../Interfaces/IMessageAccessor.cs | 15 ++++ .../Interfaces/IMessageProcessor.cs | 3 +- CryptoExchange.Net/Interfaces/IWebsocket.cs | 3 +- .../Sockets/StreamMessageParseCallback.cs | 20 ++++- .../Objects/Testing/TestWebsocket.cs | 2 +- .../Sockets/CryptoExchangeWebSocketClient.cs | 10 +-- CryptoExchange.Net/Sockets/Query.cs | 10 ++- .../Sockets/SocketConnection.cs | 18 ++-- .../Sockets/SocketListenerManager.cs | 12 ++- CryptoExchange.Net/Sockets/Subscription.cs | 26 ++++-- .../Sockets/SystemSubscription.cs | 6 +- 14 files changed, 224 insertions(+), 41 deletions(-) create mode 100644 CryptoExchange.Net/Converters/JTokenAccessor.cs create mode 100644 CryptoExchange.Net/Interfaces/IMessageAccessor.cs diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 5bd0305..1cc5723 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -169,6 +169,9 @@ namespace CryptoExchange.Net if (_disposing) return new CallResult(new InvalidOperationError("Client disposed, can't subscribe")); + if (subscription.Authenticated && AuthenticationProvider == null) + return new CallResult(new NoApiCredentialsError()); + SocketConnection socketConnection; var released = false; // Wait for a semaphore here, so we only connect 1 socket at a time. @@ -243,6 +246,8 @@ namespace CryptoExchange.Net return new CallResult(subResult.Error!); } + + subscription.HandleSubQueryResponse(subQuery.Response); } else { @@ -355,6 +360,9 @@ namespace CryptoExchange.Net /// public virtual async Task> AuthenticateSocketAsync(SocketConnection socket) { + if (AuthenticationProvider == null) + return new CallResult(new NoApiCredentialsError()); + _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate"); var authRequest = GetAuthenticationRequest(); var result = await socket.SendAndWaitQueryAsync(authRequest).ConfigureAwait(false); diff --git a/CryptoExchange.Net/Converters/JTokenAccessor.cs b/CryptoExchange.Net/Converters/JTokenAccessor.cs new file mode 100644 index 0000000..75d5d47 --- /dev/null +++ b/CryptoExchange.Net/Converters/JTokenAccessor.cs @@ -0,0 +1,83 @@ +using CryptoExchange.Net.Interfaces; +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace CryptoExchange.Net.Converters +{ + internal class JTokenAccessor : IMessageAccessor + { + private readonly JToken _token; + private Dictionary _cache = new Dictionary(); + + public JTokenAccessor(JToken token) + { + _token = token; + } + + public int? GetArrayIntValue(string? key, int index) + { + var accessToken = key == null ? _token : GetToken(key); + if (accessToken == null || accessToken is not JArray arr) + return null; + return arr[index].Value(); + } + + public string? GetArrayStringValue(string? key, int index) + { + var accessToken = key == null ? _token : GetToken(key); + if (accessToken == null || accessToken is not JArray arr) + return null; + + if (arr.Count <= index) + return null; + + return arr[index].Value(); + } + + public int? GetCount(string key) + { + var accessToken = GetToken(key); + return accessToken.Count(); + } + + public int? GetIntValue(string key) + { + var accessToken = GetToken(key); + return accessToken?.Value(); + } + + public string? GetStringValue(string key) + { + var accessToken = GetToken(key); + if (accessToken?.Type == JTokenType.Object) + return ((JObject)accessToken).Properties().First().Name; + + return accessToken?.ToString(); + } + + private JToken? GetToken(string key) + { + if (_cache.TryGetValue(key, out var token)) + return token; + + var splitTokens = key.Split(new char[] { ':' }); + var accessToken = _token; + foreach (var splitToken in splitTokens) + { + if (accessToken.Type == JTokenType.Array) + return null; + + accessToken = accessToken[splitToken]; + + if (accessToken == null) + break; + } + + _cache.Add(key, accessToken); + return accessToken; + } + } +} diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index cf7972b..09e2923 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; using System.IO; using System.Linq; +using System.Net.WebSockets; using System.Text; namespace CryptoExchange.Net.Converters @@ -22,13 +23,16 @@ namespace CryptoExchange.Net.Converters public abstract MessageInterpreterPipeline InterpreterPipeline { get; } /// - public BaseParsedMessage? ReadJson(Stream stream, Dictionary processors, bool outputOriginalData) + public BaseParsedMessage? ReadJson(WebSocketMessageType websocketMessageType, Stream stream, Dictionary processors, bool outputOriginalData) { // Start reading the data // Once we reach the properties that identify the message we save those in a dict // Once all id properties have been read callback to see what the deserialization type should be // Deserialize to the correct type + if (InterpreterPipeline.PreProcessCallback != null) + stream = InterpreterPipeline.PreProcessCallback(websocketMessageType, stream); + using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); foreach (var callback in InterpreterPipeline.PreInspectCallbacks) { @@ -56,14 +60,36 @@ namespace CryptoExchange.Net.Converters { token = JToken.Load(jsonTextReader); } - catch(Exception) + catch(Exception ex) { // Not a json message return null; } + var accessor = new JTokenAccessor(token); + + if (InterpreterPipeline.GetIdentity != null) + { + var identity = InterpreterPipeline.GetIdentity(accessor); + if (identity != null) + { + if (processors.TryGetValue(identity, out var type)) + { + var idInstance = InterpreterPipeline.ObjectInitializer(token, type); + if (outputOriginalData) + { + stream.Position = 0; + idInstance.OriginalData = sr.ReadToEnd(); + } + + idInstance.Identifier = identity; + idInstance.Parsed = true; + return idInstance; + } + } + } + PostInspectResult? inspectResult = null; - Dictionary typeIdDict = new Dictionary(); object? usedParser = null; if (token.Type == JTokenType.Object) { @@ -72,22 +98,20 @@ namespace CryptoExchange.Net.Converters bool allFieldsPresent = true; foreach (var field in callback.TypeFields) { - var value = typeIdDict.TryGetValue(field, out var cachedValue) ? cachedValue : GetValueForKey(token, field); + var value = accessor.GetStringValue(field.Key); if (value == null) { - if (callback.AllFieldPresentNeeded) + if (field.Required) { allFieldsPresent = false; break; } } - - typeIdDict[field] = value; } if (allFieldsPresent) { - inspectResult = callback.Callback(typeIdDict, processors); + inspectResult = callback.Callback(accessor, processors); usedParser = callback; if (inspectResult.Type != null) break; @@ -126,7 +150,10 @@ namespace CryptoExchange.Net.Converters } if (usedParser == null) - throw new Exception("No parser found for message"); + { + //throw new Exception("No parser found for message"); + return null; + } BaseParsedMessage instance; if (inspectResult.Type != null) @@ -152,6 +179,7 @@ namespace CryptoExchange.Net.Converters return instance; } + private string? GetValueForKey(JToken token, string key) { var splitTokens = key.Split(new char[] { ':' }); @@ -170,6 +198,9 @@ namespace CryptoExchange.Net.Converters } } + if (accessToken?.Type == JTokenType.Object) + return ((JObject)accessToken).Properties().First().Name; + return accessToken?.ToString(); } } diff --git a/CryptoExchange.Net/Interfaces/IMessageAccessor.cs b/CryptoExchange.Net/Interfaces/IMessageAccessor.cs new file mode 100644 index 0000000..a4e92a7 --- /dev/null +++ b/CryptoExchange.Net/Interfaces/IMessageAccessor.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Interfaces +{ + public interface IMessageAccessor + { + string? GetStringValue(string key); + int? GetIntValue(string key); + public int? GetCount(string key); + public int? GetArrayIntValue(string? key, int index); + public string? GetArrayStringValue(string? key, int index); + } +} diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs index 9fa1824..b842f3d 100644 --- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs @@ -1,5 +1,6 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Sockets; using System; using System.Collections.Generic; using System.Text; @@ -11,7 +12,7 @@ namespace CryptoExchange.Net.Interfaces { public int Id { get; } public List Identifiers { get; } - Task HandleMessageAsync(DataEvent message); + Task HandleMessageAsync(SocketConnection connection, DataEvent message); public Type ExpectedMessageType { get; } } } diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 2ab78a8..a64b54a 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -2,6 +2,7 @@ using CryptoExchange.Net.Sockets; using System; using System.IO; +using System.Net.WebSockets; using System.Security.Authentication; using System.Text; using System.Threading.Tasks; @@ -20,7 +21,7 @@ namespace CryptoExchange.Net.Interfaces /// /// Websocket message received event /// - event Func OnStreamMessage; + event Func OnStreamMessage; /// /// Websocket sent event, RequestId as parameter /// diff --git a/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs b/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs index 0037715..cd87415 100644 --- a/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs +++ b/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs @@ -5,13 +5,16 @@ using Newtonsoft.Json.Linq; using System; using System.Collections.Generic; using System.IO; +using System.Net.WebSockets; using System.Text; namespace CryptoExchange.Net.Objects.Sockets { public class MessageInterpreterPipeline { + public Func? PreProcessCallback { get; set; } public List PreInspectCallbacks { get; set; } = new List(); + public Func GetIdentity { get; set; } public List PostInspectCallbacks { get; set; } = new List(); public Func ObjectInitializer { get; set; } = SocketConverter.InstantiateMessageObject; } @@ -23,9 +26,20 @@ namespace CryptoExchange.Net.Objects.Sockets public class PostInspectCallback { - public bool AllFieldPresentNeeded { get; set; } = true; - public List TypeFields { get; set; } = new List(); - public Func, Dictionary, PostInspectResult> Callback { get; set; } + public List TypeFields { get; set; } = new List(); + public Func, PostInspectResult> Callback { get; set; } + } + + public class TypeField + { + public string Key { get; set; } + public bool Required { get; set; } + + public TypeField(string key, bool required = true) + { + Key = key; + Required = required; + } } public class PostInspectArrayCallback diff --git a/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs b/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs index 48664f6..1067434 100644 --- a/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs +++ b/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs @@ -33,7 +33,7 @@ namespace CryptoExchange.Net.Objects.Testing var bytes = Encoding.UTF8.GetBytes(data); var stream = new MemoryStream(bytes); stream.Position = 0; - _ = ProcessData(stream); + _ = ProcessData(System.Net.WebSockets.WebSocketMessageType.Text, stream); } } } diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 1409056..ad92e46 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -101,7 +101,7 @@ namespace CryptoExchange.Net.Sockets public event Action? OnClose; /// - public event Func? OnStreamMessage; + public event Func? OnStreamMessage; /// public event Action? OnRequestSent; @@ -521,7 +521,7 @@ namespace CryptoExchange.Net.Sockets { // Received a complete message and it's not multi part _logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); - await ProcessData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); + await ProcessData(receiveResult.MessageType, new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); } else { @@ -555,7 +555,7 @@ namespace CryptoExchange.Net.Sockets { // Reassemble complete message from memory stream _logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); - await ProcessData(memoryStream).ConfigureAwait(false); + await ProcessData(receiveResult.MessageType, memoryStream).ConfigureAwait(false); memoryStream.Dispose(); } else @@ -580,13 +580,13 @@ namespace CryptoExchange.Net.Sockets } } - protected async Task ProcessData(Stream stream) + protected async Task ProcessData(WebSocketMessageType type, Stream stream) { stream.Position = 0; if (Parameters.Interceptor != null) stream = Parameters.Interceptor.Invoke(stream); if (OnStreamMessage != null) - await OnStreamMessage.Invoke(stream).ConfigureAwait(false); + await OnStreamMessage.Invoke(type, stream).ConfigureAwait(false); } /// diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 9983e0f..2269ddd 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -21,6 +21,7 @@ namespace CryptoExchange.Net.Sockets public bool Completed { get; set; } public DateTime RequestTimestamp { get; set; } public CallResult? Result { get; set; } + public BaseParsedMessage Response { get; set; } protected AsyncResetEvent _event; protected CancellationTokenSource? _cts; @@ -96,7 +97,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract Task HandleMessageAsync(DataEvent message); + public abstract Task HandleMessageAsync(SocketConnection connection, DataEvent message); } @@ -124,10 +125,11 @@ namespace CryptoExchange.Net.Sockets } /// - public override async Task HandleMessageAsync(DataEvent message) + public override async Task HandleMessageAsync(SocketConnection connection, DataEvent message) { Completed = true; - Result = await HandleMessageAsync(message.As((ParsedMessage)message.Data)).ConfigureAwait(false); + Response = message.Data; + Result = await HandleMessageAsync(connection, message.As((ParsedMessage)message.Data)).ConfigureAwait(false); _event.Set(); return Result; } @@ -137,7 +139,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public virtual Task> HandleMessageAsync(DataEvent> message) => Task.FromResult(new CallResult(message.Data.TypedData!)); + public virtual Task> HandleMessageAsync(SocketConnection connection, DataEvent> message) => Task.FromResult(new CallResult(message.Data.TypedData!)); /// public override void Timeout() diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 7e69979..002f1c6 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -308,9 +308,9 @@ namespace CryptoExchange.Net.Sockets /// /// /// - protected virtual async Task HandleStreamMessage(Stream stream) + protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream) { - var result = ApiClient.StreamConverter.ReadJson(stream, _listenerManager.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); + var result = ApiClient.StreamConverter.ReadJson(type, stream, _listenerManager.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); if(result == null) { // Not able to parse at all @@ -331,7 +331,7 @@ namespace CryptoExchange.Net.Sockets return; } - if (!await _listenerManager.InvokeListenersAsync(result.Identifier, result).ConfigureAwait(false)) + if (!await _listenerManager.InvokeListenersAsync(this, result.Identifier, result).ConfigureAwait(false)) { // Not able to find a listener for this message stream.Position = 0; @@ -607,8 +607,12 @@ namespace CryptoExchange.Net.Sockets var subQuery = subscription.GetSubQuery(this); if (subQuery == null) continue; - - taskList.Add(SendAndWaitQueryAsync(subQuery)); + + taskList.Add(SendAndWaitQueryAsync(subQuery).ContinueWith((x) => + { + subscription.HandleSubQueryResponse(subQuery.Response); + return x.Result; + })); } await Task.WhenAll(taskList).ConfigureAwait(false); @@ -645,7 +649,9 @@ namespace CryptoExchange.Net.Sockets if (subQuery == null) return new CallResult(null); - return await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); + var result = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); + subscription.HandleSubQueryResponse(subQuery.Response); + return result; } /// diff --git a/CryptoExchange.Net/Sockets/SocketListenerManager.cs b/CryptoExchange.Net/Sockets/SocketListenerManager.cs index 1b5d7e5..9f4c77f 100644 --- a/CryptoExchange.Net/Sockets/SocketListenerManager.cs +++ b/CryptoExchange.Net/Sockets/SocketListenerManager.cs @@ -64,7 +64,7 @@ namespace CryptoExchange.Net.Sockets } } - public async Task InvokeListenersAsync(string id, BaseParsedMessage data) + public async Task InvokeListenersAsync(SocketConnection connection, string id, BaseParsedMessage data) { List listeners; lock (_lock) @@ -91,7 +91,15 @@ namespace CryptoExchange.Net.Sockets // Matched based on identifier var userSw = Stopwatch.StartNew(); var dataEvent = new DataEvent(data, null, data.OriginalData, DateTime.UtcNow, null); - await listener.HandleMessageAsync(dataEvent).ConfigureAwait(false); + try + { + await listener.HandleMessageAsync(connection, dataEvent).ConfigureAwait(false); + } + catch (Exception ex) + { + // TODO + } + userSw.Stop(); if (userSw.ElapsedMilliseconds > 500) { diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index adcb653..7fe07c3 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -91,17 +91,20 @@ namespace CryptoExchange.Net.Sockets /// public abstract BaseQuery? GetSubQuery(SocketConnection connection); + public virtual void HandleSubQueryResponse(BaseParsedMessage message) { } + public virtual void HandleUnsubQueryResponse(BaseParsedMessage message) { } + /// /// Get the unsubscribe object to send when unsubscribing /// /// public abstract BaseQuery? GetUnsubQuery(); - public async Task HandleMessageAsync(DataEvent message) + public async Task HandleMessageAsync(SocketConnection connection, DataEvent message) { ConnectionInvocations++; TotalInvocations++; - return await DoHandleMessageAsync(message).ConfigureAwait(false); + return await DoHandleMessageAsync(connection, message).ConfigureAwait(false); } /// @@ -109,7 +112,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract Task DoHandleMessageAsync(DataEvent message); + public abstract Task DoHandleMessageAsync(SocketConnection connection, DataEvent message); /// /// Invoke the exception event @@ -149,14 +152,25 @@ namespace CryptoExchange.Net.Sockets } /// - public override Task DoHandleMessageAsync(DataEvent message) - => HandleEventAsync(message.As((ParsedMessage)message.Data)); + public override Task DoHandleMessageAsync(SocketConnection connection, DataEvent message) + => HandleEventAsync(connection, message.As((ParsedMessage)message.Data)); /// /// Handle the update message /// /// /// - public abstract Task HandleEventAsync(DataEvent> message); + public abstract Task HandleEventAsync(SocketConnection connection, DataEvent> message); + + public override void HandleSubQueryResponse(BaseParsedMessage message) + => HandleSubQueryResponse((ParsedMessage)message); + + public virtual void HandleSubQueryResponse(ParsedMessage message) { } + + public override void HandleUnsubQueryResponse(BaseParsedMessage message) + => HandleUnsubQueryResponse((ParsedMessage)message); + + public virtual void HandleUnsubQueryResponse(ParsedMessage message) { } + } } diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index ea11614..e6d9e3d 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -30,13 +30,13 @@ namespace CryptoExchange.Net.Sockets public abstract class SystemSubscription : SystemSubscription { public override Type ExpectedMessageType => typeof(T); - public override Task DoHandleMessageAsync(DataEvent message) - => HandleMessageAsync(message.As((ParsedMessage)message.Data)); + public override Task DoHandleMessageAsync(SocketConnection connection, DataEvent message) + => HandleMessageAsync(connection, message.As((ParsedMessage)message.Data)); protected SystemSubscription(ILogger logger, bool authenticated) : base(logger, authenticated) { } - public abstract Task HandleMessageAsync(DataEvent> message); + public abstract Task HandleMessageAsync(SocketConnection connection, DataEvent> message); } }