From 58098edaa63b5d4bf7e5ac3ff02e681a327ce442 Mon Sep 17 00:00:00 2001 From: JKorf Date: Sun, 17 Dec 2023 21:17:31 +0100 Subject: [PATCH] wip --- .../Converters/JTokenAccessor.cs | 17 ++++++++ .../Converters/SocketConverter.cs | 39 +++++++++++-------- .../Interfaces/IMessageAccessor.cs | 1 + .../Interfaces/IMessageProcessor.cs | 2 +- .../Objects/Sockets/ParsedMessage.cs | 8 +++- .../Sockets/StreamMessageParseCallback.cs | 12 +++--- CryptoExchange.Net/Sockets/Query.cs | 4 +- .../Sockets/SocketConnection.cs | 4 +- .../Sockets/SocketListenerManager.cs | 14 +++---- CryptoExchange.Net/Sockets/Subscription.cs | 19 +++------ .../Sockets/SystemSubscription.cs | 2 +- 11 files changed, 72 insertions(+), 50 deletions(-) diff --git a/CryptoExchange.Net/Converters/JTokenAccessor.cs b/CryptoExchange.Net/Converters/JTokenAccessor.cs index 16dec1e..9ca6886 100644 --- a/CryptoExchange.Net/Converters/JTokenAccessor.cs +++ b/CryptoExchange.Net/Converters/JTokenAccessor.cs @@ -62,6 +62,23 @@ namespace CryptoExchange.Net.Converters } public bool IsObject(string? key) => _token.Type == JTokenType.Object; + public bool IsArray(IEnumerable indexes) + { + var item = _token; + foreach(var index in indexes) + { + if (item.Type != JTokenType.Array) + return false; + + var arr = ((JArray)item); + if (arr.Count <= index) + return false; + + item = arr[index]; + } + + return item.Type == JTokenType.Array; + } private JToken? GetToken(string key) { diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index 93d4202..aa56aeb 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -48,7 +48,8 @@ namespace CryptoExchange.Net.Converters preInstance.OriginalData = data; } - preInstance.Identifier = result.Identifier; + preInstance.StreamIdentifier = result.StreamIdentifier; + preInstance.TypeIdentifier = result.TypeIdentifier; preInstance.Parsed = true; return preInstance; } @@ -70,25 +71,28 @@ namespace CryptoExchange.Net.Converters if (InterpreterPipeline.GetIdentity != null) { - var identity = InterpreterPipeline.GetIdentity(accessor); - if (identity != null) + var (streamIdentity, typeIdentity) = InterpreterPipeline.GetIdentity(accessor); + if (streamIdentity != null) { - var result = listenerManager.IdToType(identity); - if (result == null) + var result = listenerManager.IdToType(streamIdentity, typeIdentity); + if (result != null) + { + var idInstance = InterpreterPipeline.ObjectInitializer(token, result!); + if (outputOriginalData) + { + stream.Position = 0; + idInstance.OriginalData = sr.ReadToEnd(); + } + + idInstance.StreamIdentifier = streamIdentity; + idInstance.TypeIdentifier = typeIdentity; + idInstance.Parsed = true; + return idInstance; + } + else { } - - var idInstance = InterpreterPipeline.ObjectInitializer(token, result!); - if (outputOriginalData) - { - stream.Position = 0; - idInstance.OriginalData = sr.ReadToEnd(); - } - - idInstance.Identifier = identity; - idInstance.Parsed = true; - return idInstance; } else { @@ -175,7 +179,8 @@ namespace CryptoExchange.Net.Converters instance.OriginalData = sr.ReadToEnd(); } - instance.Identifier = inspectResult.Identifier; + instance.StreamIdentifier = inspectResult.StreamIdentifier; + instance.TypeIdentifier = inspectResult.TypeIdentifier; instance.Parsed = inspectResult.Type != null; return instance; } diff --git a/CryptoExchange.Net/Interfaces/IMessageAccessor.cs b/CryptoExchange.Net/Interfaces/IMessageAccessor.cs index 8b85bf3..2411de7 100644 --- a/CryptoExchange.Net/Interfaces/IMessageAccessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageAccessor.cs @@ -7,6 +7,7 @@ namespace CryptoExchange.Net.Interfaces public interface IMessageAccessor { bool IsObject(string? key); + bool IsArray(IEnumerable indexes); string? GetStringValue(string key); int? GetIntValue(string key); public int? GetCount(string key); diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs index b842f3d..e7a9506 100644 --- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs @@ -13,6 +13,6 @@ namespace CryptoExchange.Net.Interfaces public int Id { get; } public List Identifiers { get; } Task HandleMessageAsync(SocketConnection connection, DataEvent message); - public Type ExpectedMessageType { get; } + public Func ExpectedTypeDelegate { get; } } } diff --git a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs index ec74ae6..ffd072d 100644 --- a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs +++ b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs @@ -6,9 +6,13 @@ public abstract class BaseParsedMessage { /// - /// Identifier string + /// Stream identifier string /// - public string Identifier { get; set; } = null!; + public string StreamIdentifier { get; set; } = null!; + /// + /// Type identifier string + /// + public string TypeIdentifier { get; set; } = null!; /// /// Original data if the option is enabled /// diff --git a/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs b/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs index 0356d7e..813e706 100644 --- a/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs +++ b/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs @@ -14,8 +14,8 @@ namespace CryptoExchange.Net.Objects.Sockets { public Func? PreProcessCallback { get; set; } public List PreInspectCallbacks { get; set; } = new List(); - public Func GetIdentity { get; set; } - //public List PostInspectCallbacks { get; set; } = new List(); + public Func GetIdentity { get; set; } + public List PostInspectCallbacks { get; set; } = new List(); public Func ObjectInitializer { get; set; } = SocketConverter.InstantiateMessageObject; } @@ -45,18 +45,20 @@ namespace CryptoExchange.Net.Objects.Sockets public class PostInspectArrayCallback { public List TypeFields { get; set; } = new List(); - public Func, Dictionary, PostInspectResult> Callback { get; set; } + public Func Callback { get; set; } } public class PreInspectResult { public bool Matched { get; set; } - public string Identifier { get; set; } + public string StreamIdentifier { get; set; } + public string TypeIdentifier { get; set; } } public class PostInspectResult { public Type? Type { get; set; } - public string Identifier { get; set; } + public string StreamIdentifier { get; set; } + public string TypeIdentifier { get; set; } } } diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 2269ddd..288677f 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -46,7 +46,7 @@ namespace CryptoExchange.Net.Sockets /// public int Weight { get; } - public abstract Type ExpectedMessageType { get; } + public abstract Func ExpectedTypeDelegate { get; } /// /// ctor @@ -107,7 +107,7 @@ namespace CryptoExchange.Net.Sockets /// Response object type public abstract class Query : BaseQuery { - public override Type ExpectedMessageType => typeof(TResponse); + public override Func ExpectedTypeDelegate => x => typeof(TResponse); /// /// The typed call result diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index fde3f31..1be8767 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -335,13 +335,13 @@ namespace CryptoExchange.Net.Sockets return; } - if (!await _listenerManager.InvokeListenersAsync(this, result.Identifier, result).ConfigureAwait(false)) + if (!await _listenerManager.InvokeListenersAsync(this, result.StreamIdentifier, result).ConfigureAwait(false)) { // Not able to find a listener for this message stream.Position = 0; var unhandledBuffer = new byte[stream.Length]; stream.Read(unhandledBuffer, 0, unhandledBuffer.Length); - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message unidentified. Id: {result.Identifier.ToLowerInvariant()}, listening ids: [{string.Join(", ", _listenerManager.GetListenIds())}], Message: {Encoding.UTF8.GetString(unhandledBuffer)} "); + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message unidentified. Id: {result.StreamIdentifier.ToLowerInvariant()}, listening ids: [{string.Join(", ", _listenerManager.GetListenIds())}], Message: {Encoding.UTF8.GetString(unhandledBuffer)} "); UnhandledMessage?.Invoke(result); return; } diff --git a/CryptoExchange.Net/Sockets/SocketListenerManager.cs b/CryptoExchange.Net/Sockets/SocketListenerManager.cs index 888cda6..ae753c0 100644 --- a/CryptoExchange.Net/Sockets/SocketListenerManager.cs +++ b/CryptoExchange.Net/Sockets/SocketListenerManager.cs @@ -15,24 +15,24 @@ namespace CryptoExchange.Net.Sockets private int _socketId; private object _lock = new object(); private Dictionary _idMap; - private Dictionary _typeMap; + private Dictionary> _typeMap; private Dictionary> _listeners; public SocketListenerManager(ILogger logger, int socketId) { _idMap = new Dictionary(); _listeners = new Dictionary>(); - _typeMap = new Dictionary(); + _typeMap = new Dictionary>(); _logger = logger; _socketId = socketId; } - public Type? IdToType(string id) + public Type? IdToType(string streamIdentifier, string typeIdentifier) { lock (_lock) { - _typeMap.TryGetValue(id, out var type); - return type; + _typeMap.TryGetValue(streamIdentifier, out var typeDelegate); + return typeDelegate?.Invoke(typeIdentifier); } } @@ -78,7 +78,7 @@ namespace CryptoExchange.Net.Sockets foreach (var listener in listeners) { - _logger.Log(LogLevel.Trace, $"Socket {_socketId} Message mapped to processor {listener.Id} with identifier {data.Identifier}"); + _logger.Log(LogLevel.Trace, $"Socket {_socketId} Message mapped to processor {listener.Id} with identifier {data.StreamIdentifier}"); if (listener is BaseQuery query) { Remove(listener); @@ -160,7 +160,7 @@ namespace CryptoExchange.Net.Sockets private void UpdateMap() { - _typeMap = _listeners.ToDictionary(x => x.Key, x => x.Value.First().ExpectedMessageType); + _typeMap = _listeners.ToDictionary(x => x.Key, x => x.Value.First().ExpectedTypeDelegate); } } } diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 7fe07c3..4cfd96c 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -69,7 +69,7 @@ namespace CryptoExchange.Net.Sockets /// public event Action? Exception; - public abstract Type ExpectedMessageType { get; } + public abstract Func ExpectedTypeDelegate { get; } /// /// ctor @@ -125,7 +125,7 @@ namespace CryptoExchange.Net.Sockets } /// - public abstract class Subscription : Subscription + public abstract class Subscription : Subscription { /// /// ctor @@ -138,9 +138,9 @@ namespace CryptoExchange.Net.Sockets } /// - public abstract class Subscription : Subscription + public abstract class Subscription : Subscription { - public override Type ExpectedMessageType => typeof(TEvent); + //public override Func ExpectedTypeDelegate => (x) => typeof(TEvent); /// /// ctor @@ -152,15 +152,8 @@ namespace CryptoExchange.Net.Sockets } /// - public override Task DoHandleMessageAsync(SocketConnection connection, DataEvent message) - => HandleEventAsync(connection, message.As((ParsedMessage)message.Data)); - - /// - /// Handle the update message - /// - /// - /// - public abstract Task HandleEventAsync(SocketConnection connection, DataEvent> message); + //public override Task DoHandleMessageAsync(SocketConnection connection, DataEvent message) + // => HandleEventAsync(connection, message.As((ParsedMessage)message.Data)); public override void HandleSubQueryResponse(BaseParsedMessage message) => HandleSubQueryResponse((ParsedMessage)message); diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index e6d9e3d..bcab2aa 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -29,7 +29,7 @@ namespace CryptoExchange.Net.Sockets public abstract class SystemSubscription : SystemSubscription { - public override Type ExpectedMessageType => typeof(T); + public override Func ExpectedTypeDelegate => (x) => typeof(T); public override Task DoHandleMessageAsync(SocketConnection connection, DataEvent message) => HandleMessageAsync(connection, message.As((ParsedMessage)message.Data));