diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 1b1b2d6..7e3b3e8 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -233,7 +233,7 @@ namespace CryptoExchange.Net if (subQuery != null) { // Send the request and wait for answer - var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); + var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); // TODO return null on timeout if (!subResult) { _logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}"); @@ -532,7 +532,8 @@ namespace CryptoExchange.Net /// Identifier for the periodic send /// How often /// Method returning the query to send - protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func queryDelegate) + /// The callback for processing the response + protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func queryDelegate, Action? callback) { if (queryDelegate == null) throw new ArgumentNullException(nameof(queryDelegate)); @@ -565,7 +566,8 @@ namespace CryptoExchange.Net try { - await socketConnection.SendAndWaitQueryAsync(query).ConfigureAwait(false); + var result = await socketConnection.SendAndWaitQueryAsync(query).ConfigureAwait(false); + callback?.Invoke(result); } catch (Exception ex) { diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index 1766474..33c02b3 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -1,4 +1,5 @@ -using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; using Newtonsoft.Json; @@ -18,12 +19,10 @@ namespace CryptoExchange.Net.Converters { private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters); - public abstract List InterpreterPipeline { get; } - - public virtual string CreateIdentifierString(Dictionary idValues) => string.Join("-", idValues.Values.Where(v => v != null).Select(v => v!.ToLower())); + public abstract MessageInterpreterPipeline InterpreterPipeline { get; } /// - public BaseParsedMessage? ReadJson(Stream stream, ConcurrentList pendingRequests, ConcurrentList listeners, bool outputOriginalData) + public BaseParsedMessage? ReadJson(Stream stream, IDictionary processors, bool outputOriginalData) { // Start reading the data // Once we reach the properties that identify the message we save those in a dict @@ -31,6 +30,26 @@ namespace CryptoExchange.Net.Converters // Deserialize to the correct type using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); + foreach (var callback in InterpreterPipeline.PreInspectCallbacks) + { + var result = callback.Callback(stream); + if (result.Matched) + { + var data = sr.ReadToEnd(); + var messageType = typeof(ParsedMessage<>).MakeGenericType(typeof(string)); + var preInstance = (BaseParsedMessage)Activator.CreateInstance(messageType, data); + if (outputOriginalData) + { + stream.Position = 0; + preInstance.OriginalData = data; + } + + preInstance.Identifier = result.Identifier; + preInstance.Parsed = true; + return preInstance; + } + } + using var jsonTextReader = new JsonTextReader(sr); JToken token; try @@ -49,10 +68,10 @@ namespace CryptoExchange.Net.Converters token = token.First!; } - Type? resultType = null; + PostInspectResult? inspectResult = null; Dictionary typeIdDict = new Dictionary(); - StreamMessageParseCallback? usedParser = null; - foreach (var callback in InterpreterPipeline) + PostInspectCallback? usedParser = null; + foreach (var callback in InterpreterPipeline.PostInspectCallbacks) { bool allFieldsPresent = true; foreach(var field in callback.TypeFields) @@ -69,7 +88,7 @@ namespace CryptoExchange.Net.Converters if (allFieldsPresent) { - resultType = callback.Callback(typeIdDict, pendingRequests, listeners); + inspectResult = callback.Callback(typeIdDict, processors); usedParser = callback; break; } @@ -78,20 +97,16 @@ namespace CryptoExchange.Net.Converters if (usedParser == null) throw new Exception("No parser found for message"); - var subIdDict = new Dictionary(); - foreach (var field in usedParser.IdFields) - subIdDict[field] = typeIdDict.TryGetValue(field, out var cachedValue) ? cachedValue : GetValueForKey(token, field); - - var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(resultType); - var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType, _serializer)); + var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(inspectResult.Type); + var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, inspectResult.Type == null ? null : token.ToObject(inspectResult.Type, _serializer)); if (outputOriginalData) { stream.Position = 0; instance.OriginalData = sr.ReadToEnd(); } - instance.Identifier = CreateIdentifierString(subIdDict); - instance.Parsed = resultType != null; + instance.Identifier = inspectResult.Identifier; + instance.Parsed = inspectResult.Type != null; return instance; } diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs index 0a3f457..9d049b7 100644 --- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs @@ -11,5 +11,6 @@ namespace CryptoExchange.Net.Interfaces { public int Id { get; } Task HandleMessageAsync(DataEvent message); + public Type ExpectedMessageType { get; } } } diff --git a/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs b/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs index 288cf64..c60f178 100644 --- a/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs +++ b/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs @@ -1,14 +1,39 @@ -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Converters; +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Sockets; using System; using System.Collections.Generic; +using System.IO; using System.Text; namespace CryptoExchange.Net.Objects.Sockets { - public class StreamMessageParseCallback + public class MessageInterpreterPipeline + { + public List PreInspectCallbacks { get; set; } = new List(); + public List PostInspectCallbacks { get; set; } = new List(); + } + + public class PreInspectCallback + { + public Func Callback { get; set; } + } + + public class PostInspectCallback { public List TypeFields { get; set; } = new List(); - public List IdFields { get; set; } = new List(); - public Func, IEnumerable, IEnumerable, Type?> Callback { get; set; } + public Func, IDictionary, PostInspectResult> Callback { get; set; } + } + + public class PreInspectResult + { + public bool Matched { get; set; } + public string Identifier { get; set; } + } + + public class PostInspectResult + { + public Type? Type { get; set; } + public string Identifier { get; set; } } } diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index e947d47..5ee2909 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; +using System; using System.Collections.Generic; using System.Threading.Tasks; @@ -40,6 +41,8 @@ namespace CryptoExchange.Net.Sockets /// public BasePendingRequest? PendingRequest { get; private set; } + public abstract Type ExpectedMessageType { get; } + /// /// ctor /// @@ -84,6 +87,8 @@ namespace CryptoExchange.Net.Sockets /// Response object type public abstract class Query : BaseQuery { + public override Type ExpectedMessageType => typeof(TResponse); + /// /// ctor /// diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 0a195d7..012e18e 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -307,7 +307,7 @@ namespace CryptoExchange.Net.Sockets var timestamp = DateTime.UtcNow; TimeSpan userCodeDuration = TimeSpan.Zero; - var result = ApiClient.StreamConverter.ReadJson(stream, _pendingRequests, _subscriptions, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); + var result = ApiClient.StreamConverter.ReadJson(stream, _messageIdMap, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); if(result == null) { stream.Position = 0; diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 8fbad5b..eaee2c1 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -59,6 +59,8 @@ namespace CryptoExchange.Net.Sockets /// public event Action? Exception; + public abstract Type ExpectedMessageType { get; } + /// /// ctor /// @@ -118,6 +120,8 @@ namespace CryptoExchange.Net.Sockets /// public abstract class Subscription : Subscription { + public override Type ExpectedMessageType => typeof(TEvent); + /// /// ctor ///