1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 16:36:15 +00:00
This commit is contained in:
JKorf 2023-11-07 21:43:05 +01:00
parent ac434fa2c6
commit b59fe9e3ef
7 changed files with 77 additions and 25 deletions

View File

@ -233,7 +233,7 @@ namespace CryptoExchange.Net
if (subQuery != null)
{
// Send the request and wait for answer
var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); // TODO return null on timeout
if (!subResult)
{
_logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
@ -532,7 +532,8 @@ namespace CryptoExchange.Net
/// <param name="identifier">Identifier for the periodic send</param>
/// <param name="interval">How often</param>
/// <param name="queryDelegate">Method returning the query to send</param>
protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, BaseQuery> queryDelegate)
/// <param name="callback">The callback for processing the response</param>
protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, BaseQuery> queryDelegate, Action<CallResult>? callback)
{
if (queryDelegate == null)
throw new ArgumentNullException(nameof(queryDelegate));
@ -565,7 +566,8 @@ namespace CryptoExchange.Net
try
{
await socketConnection.SendAndWaitQueryAsync(query).ConfigureAwait(false);
var result = await socketConnection.SendAndWaitQueryAsync(query).ConfigureAwait(false);
callback?.Invoke(result);
}
catch (Exception ex)
{

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using Newtonsoft.Json;
@ -18,12 +19,10 @@ namespace CryptoExchange.Net.Converters
{
private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
public abstract List<StreamMessageParseCallback> InterpreterPipeline { get; }
public virtual string CreateIdentifierString(Dictionary<string, string?> idValues) => string.Join("-", idValues.Values.Where(v => v != null).Select(v => v!.ToLower()));
public abstract MessageInterpreterPipeline InterpreterPipeline { get; }
/// <inheritdoc />
public BaseParsedMessage? ReadJson(Stream stream, ConcurrentList<BasePendingRequest> pendingRequests, ConcurrentList<Subscription> listeners, bool outputOriginalData)
public BaseParsedMessage? ReadJson(Stream stream, IDictionary<string, IMessageProcessor> processors, bool outputOriginalData)
{
// Start reading the data
// Once we reach the properties that identify the message we save those in a dict
@ -31,6 +30,26 @@ namespace CryptoExchange.Net.Converters
// Deserialize to the correct type
using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
foreach (var callback in InterpreterPipeline.PreInspectCallbacks)
{
var result = callback.Callback(stream);
if (result.Matched)
{
var data = sr.ReadToEnd();
var messageType = typeof(ParsedMessage<>).MakeGenericType(typeof(string));
var preInstance = (BaseParsedMessage)Activator.CreateInstance(messageType, data);
if (outputOriginalData)
{
stream.Position = 0;
preInstance.OriginalData = data;
}
preInstance.Identifier = result.Identifier;
preInstance.Parsed = true;
return preInstance;
}
}
using var jsonTextReader = new JsonTextReader(sr);
JToken token;
try
@ -49,10 +68,10 @@ namespace CryptoExchange.Net.Converters
token = token.First!;
}
Type? resultType = null;
PostInspectResult? inspectResult = null;
Dictionary<string, string> typeIdDict = new Dictionary<string, string>();
StreamMessageParseCallback? usedParser = null;
foreach (var callback in InterpreterPipeline)
PostInspectCallback? usedParser = null;
foreach (var callback in InterpreterPipeline.PostInspectCallbacks)
{
bool allFieldsPresent = true;
foreach(var field in callback.TypeFields)
@ -69,7 +88,7 @@ namespace CryptoExchange.Net.Converters
if (allFieldsPresent)
{
resultType = callback.Callback(typeIdDict, pendingRequests, listeners);
inspectResult = callback.Callback(typeIdDict, processors);
usedParser = callback;
break;
}
@ -78,20 +97,16 @@ namespace CryptoExchange.Net.Converters
if (usedParser == null)
throw new Exception("No parser found for message");
var subIdDict = new Dictionary<string, string?>();
foreach (var field in usedParser.IdFields)
subIdDict[field] = typeIdDict.TryGetValue(field, out var cachedValue) ? cachedValue : GetValueForKey(token, field);
var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(resultType);
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType, _serializer));
var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(inspectResult.Type);
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, inspectResult.Type == null ? null : token.ToObject(inspectResult.Type, _serializer));
if (outputOriginalData)
{
stream.Position = 0;
instance.OriginalData = sr.ReadToEnd();
}
instance.Identifier = CreateIdentifierString(subIdDict);
instance.Parsed = resultType != null;
instance.Identifier = inspectResult.Identifier;
instance.Parsed = inspectResult.Type != null;
return instance;
}

View File

@ -11,5 +11,6 @@ namespace CryptoExchange.Net.Interfaces
{
public int Id { get; }
Task<CallResult> HandleMessageAsync(DataEvent<BaseParsedMessage> message);
public Type ExpectedMessageType { get; }
}
}

View File

@ -1,14 +1,39 @@
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Sockets;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
namespace CryptoExchange.Net.Objects.Sockets
{
public class StreamMessageParseCallback
public class MessageInterpreterPipeline
{
public List<PreInspectCallback> PreInspectCallbacks { get; set; } = new List<PreInspectCallback>();
public List<PostInspectCallback> PostInspectCallbacks { get; set; } = new List<PostInspectCallback>();
}
public class PreInspectCallback
{
public Func<Stream, PreInspectResult> Callback { get; set; }
}
public class PostInspectCallback
{
public List<string> TypeFields { get; set; } = new List<string>();
public List<string> IdFields { get; set; } = new List<string>();
public Func<Dictionary<string, string>, IEnumerable<BasePendingRequest>, IEnumerable<Subscription>, Type?> Callback { get; set; }
public Func<Dictionary<string, string>, IDictionary<string, IMessageProcessor>, PostInspectResult> Callback { get; set; }
}
public class PreInspectResult
{
public bool Matched { get; set; }
public string Identifier { get; set; }
}
public class PostInspectResult
{
public Type? Type { get; set; }
public string Identifier { get; set; }
}
}

View File

@ -1,6 +1,7 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
@ -40,6 +41,8 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public BasePendingRequest? PendingRequest { get; private set; }
public abstract Type ExpectedMessageType { get; }
/// <summary>
/// ctor
/// </summary>
@ -84,6 +87,8 @@ namespace CryptoExchange.Net.Sockets
/// <typeparam name="TResponse">Response object type</typeparam>
public abstract class Query<TResponse> : BaseQuery
{
public override Type ExpectedMessageType => typeof(TResponse);
/// <summary>
/// ctor
/// </summary>

View File

@ -307,7 +307,7 @@ namespace CryptoExchange.Net.Sockets
var timestamp = DateTime.UtcNow;
TimeSpan userCodeDuration = TimeSpan.Zero;
var result = ApiClient.StreamConverter.ReadJson(stream, _pendingRequests, _subscriptions, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
var result = ApiClient.StreamConverter.ReadJson(stream, _messageIdMap, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
if(result == null)
{
stream.Position = 0;

View File

@ -59,6 +59,8 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public event Action<Exception>? Exception;
public abstract Type ExpectedMessageType { get; }
/// <summary>
/// ctor
/// </summary>
@ -118,6 +120,8 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public abstract class Subscription<TSubResponse, TEvent, TUnsubResponse> : Subscription
{
public override Type ExpectedMessageType => typeof(TEvent);
/// <summary>
/// ctor
/// </summary>