1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 22:23:54 +00:00
This commit is contained in:
Jkorf 2025-11-27 16:23:59 +01:00
parent dee3730f72
commit 0f64aa5cb6
8 changed files with 182 additions and 25 deletions

View File

@ -11,10 +11,10 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
/// <summary>
/// Get an identifier for the message which can be used to link it to a listener
/// </summary>
string? GetMessageIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
//string? GetMessageIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
// string? GetTypeIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
// string? GetTopicFilter(object deserializedObject);
string? GetTypeIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
string? GetTopicFilter(object deserializedObject);
/// <summary>
/// Deserialize to the provided type

View File

@ -18,10 +18,11 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
/// </summary>
public abstract JsonSerializerOptions Options { get; }
/// <summary>
/// Message evaluators
/// </summary>
protected abstract MessageEvaluator[] MessageEvaluators { get; }
protected abstract MessageEvaluator[] TypeEvaluators { get; }
private readonly SearchResult _searchResult = new();
@ -31,6 +32,15 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
private MessageEvaluator? _topEvaluator;
private List<MessageEvalutorFieldReference>? _searchFields;
private Dictionary<Type, Func<object, string?>> _mapping;
private Dictionary<Type, Func<object, string?>>? _baseTypeMapping;
protected void AddTopicMapping<T>(Func<T, string?> mapping)
{
_mapping ??= new Dictionary<Type, Func<object, string?>>();
_mapping.Add(typeof(T), x => mapping((T)x));
}
private void InitializeConverter()
{
if (_initialized)
@ -38,7 +48,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
_maxSearchDepth = int.MinValue;
_searchFields = new List<MessageEvalutorFieldReference>();
foreach (var evaluator in MessageEvaluators.OrderBy(x => x.Priority))
foreach (var evaluator in TypeEvaluators.OrderBy(x => x.Priority))
{
_topEvaluator ??= evaluator;
foreach (var field in evaluator.Fields)
@ -116,8 +126,42 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
_initialized = true;
}
public virtual string? GetTopicFilter(object deserializedObject)
{
if (_mapping == null)
return null;
// Cache the found type for future
var currentType = deserializedObject.GetType();
if (_baseTypeMapping != null)
{
if (_baseTypeMapping.TryGetValue(currentType, out var typeMapping))
return typeMapping(deserializedObject);
}
var mappedBase = false;
while (currentType != null)
{
if (_mapping.TryGetValue(currentType, out var mapping))
{
if (mappedBase)
{
_baseTypeMapping ??= new Dictionary<Type, Func<object, string>>();
_baseTypeMapping.Add(deserializedObject.GetType(), mapping);
}
return mapping(deserializedObject);
}
mappedBase = true;
currentType = currentType.BaseType;
}
return null;
}
/// <inheritdoc />
public virtual string? GetMessageIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType)
public virtual string? GetTypeIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType)
{
InitializeConverter();
@ -225,6 +269,9 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
value = reader.GetBoolean().ToString()!;
else if (reader.TokenType == JsonTokenType.Null)
value = null;
else if (reader.TokenType == JsonTokenType.StartObject
|| reader.TokenType == JsonTokenType.StartArray)
value = null;
else
continue;
}
@ -264,7 +311,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
}
}
foreach (var evaluator in MessageEvaluators)
foreach (var evaluator in TypeEvaluators)
{
if (evaluator.Statisfied(_searchResult))
return evaluator.IdentifyMessage(_searchResult);

View File

@ -16,18 +16,20 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
public abstract JsonSerializerOptions Options { get; }
/// <inheritdoc />
public virtual string? GetMessageIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType)
public virtual string? GetTypeIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType)
{
var reader = new Utf8JsonReader(data);
var jsonDocument = JsonDocument.ParseValue(ref reader);
return GetMessageIdentifier(jsonDocument);
return GetTypeIdentifier(jsonDocument);
}
/// <summary>
/// Get the message identifier for this document
/// </summary>
protected abstract string? GetMessageIdentifier(JsonDocument docuement);
protected abstract string? GetTypeIdentifier(JsonDocument document);
public virtual string? GetTopicFilter(object deserializedObject) => null;
/// <inheritdoc />
public virtual object Deserialize(ReadOnlySpan<byte> data, Type type)

View File

@ -28,6 +28,7 @@ namespace CryptoExchange.Net.Interfaces
/// Handle a message
/// </summary>
CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageHandlerLink matchedHandler);
CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route);
/// <summary>
/// Deserialize a message into object of type
/// </summary>

View File

@ -35,11 +35,35 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Create message matcher
/// </summary>
public static MessageRouter Create<T>(string typeIdentifier, string? topicFilter = null)
public static MessageRouter Create<T>(string typeIdentifier)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, (string?)null, (con, receiveTime, originalData, msg) => new CallResult<string>(default, null, null)));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageRouter Create<T>(IEnumerable<string> values, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
{
return new MessageRouter(values.Select(x => new MessageRoute<T>(x, (string?)null, handler)).ToArray());
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageRouter Create<T>(string typeIdentifier, string topicFilter)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult<string>(default, null, null)));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageRouter Create<T>(string typeIdentifier, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, (string?)null, handler));
}
/// <summary>
/// Create message matcher
/// </summary>
@ -48,6 +72,36 @@ namespace CryptoExchange.Net.Sockets
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, handler));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageRouter Create<T>(string typeIdentifier, IEnumerable<string>? topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilters, handler));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageRouter Create<T>(IEnumerable<string> typeIdentifiers, IEnumerable<string>? topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
{
var routes = new List<MessageRoute>();
foreach(var type in typeIdentifiers)
{
if (topicFilters?.Count() > 0)
{
foreach (var filter in topicFilters ?? [])
routes.Add(new MessageRoute<T>(type, filter, handler));
}
else
{
routes.Add(new MessageRoute<T>(type, (string?)null, handler));
}
}
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message matcher
/// </summary>
@ -65,7 +119,7 @@ namespace CryptoExchange.Net.Sockets
public abstract class MessageRoute
{
public string TypeIdentifier { get; set; }
public string? TopicFilter { get; set; }
public HashSet<string>? TopicFilter { get; set; }
/// <summary>
/// Deserialization type
/// </summary>
@ -74,7 +128,13 @@ namespace CryptoExchange.Net.Sockets
public MessageRoute(string typeIdentifier, string? topicFilter)
{
TypeIdentifier = typeIdentifier;
TopicFilter = topicFilter;
TopicFilter = topicFilter == null ? null : new HashSet<string>() { topicFilter };
}
public MessageRoute(string typeIdentifier, IEnumerable<string>? topicFilters)
{
TypeIdentifier = typeIdentifier;
TopicFilter = topicFilters == null ? null : new HashSet<string>(topicFilters);
}
/// <summary>
@ -99,6 +159,15 @@ namespace CryptoExchange.Net.Sockets
_handler = handler;
}
/// <summary>
/// ctor
/// </summary>
public MessageRoute(string typeIdentifier, IEnumerable<string>? topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult> handler)
: base(typeIdentifier, topicFilter)
{
_handler = handler;
}
/// <inheritdoc />
public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data)
{

View File

@ -163,6 +163,7 @@ namespace CryptoExchange.Net.Sockets
/// Handle a response message
/// </summary>
public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageHandlerLink check);
public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageRoute route);
}
@ -191,6 +192,29 @@ namespace CryptoExchange.Net.Sockets
{
}
public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageRoute route)
{
if (!PreCheckMessage(connection, message))
return CallResult.SuccessResult;
CurrentResponses++;
if (CurrentResponses == RequiredResponses)
Response = message;
if (Result?.Success != false)
// If an error result is already set don't override that
Result = route.Handle(connection, receiveTime, originalData, message);
if (CurrentResponses == RequiredResponses)
{
Completed = true;
_event.Set();
OnComplete?.Invoke();
}
return Result;
}
/// <inheritdoc />
public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageHandlerLink check)
{

View File

@ -552,9 +552,9 @@ namespace CryptoExchange.Net.Sockets
// Type id: kline
// Topic filter: ethusdt-1m
var messageIdentifier = messageConverter.GetMessageIdentifier(data, type);
if (messageIdentifier == null)
var typeIdentifier = messageConverter.GetTypeIdentifier(data, type);
//var messageIdentifier = messageConverter.GetMessageIdentifier(data, type);
if (typeIdentifier == null)
{
// Both deserialization type and identifier null, can't process
_logger.LogWarning("Failed to evaluate message. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
@ -566,12 +566,12 @@ namespace CryptoExchange.Net.Sockets
{
foreach (var subscription in _listeners)
{
foreach (var link in subscription.MessageMatcher.HandlerLinks)
foreach (var route in subscription.MessageRouter.Routes)
{
if (!link.Check(messageIdentifier!))
if (!route.TypeIdentifier.Equals(typeIdentifier, StringComparison.Ordinal))
continue;
deserializationType = link.DeserializationType;
deserializationType = route.DeserializationType;
break;
}
@ -583,7 +583,7 @@ namespace CryptoExchange.Net.Sockets
if (deserializationType == null)
{
// No handler found for identifier either, can't process
_logger.LogWarning("Failed to determine message type for identifier {Identifier}. Data: {Message}", messageIdentifier, Encoding.UTF8.GetString(data.ToArray()));
_logger.LogWarning("Failed to determine message type for identifier {Identifier}. Data: {Message}", typeIdentifier, Encoding.UTF8.GetString(data.ToArray()));
return;
}
@ -617,6 +617,8 @@ namespace CryptoExchange.Net.Sockets
return;
}
var topicFilter = messageConverter.GetTopicFilter(result);
bool processed = false;
lock (_listenersLock)
{
@ -631,20 +633,25 @@ namespace CryptoExchange.Net.Sockets
}
var subscription = _listeners[i];
foreach (var link in subscription.MessageMatcher.HandlerLinks)
foreach (var route in subscription.MessageRouter.Routes)
{
if (!link.Check(messageIdentifier!))
if (route.DeserializationType != deserializationType)
continue;
processed = true;
subscription.Handle(this, receiveTime, originalData, result, link);
if (topicFilter == null || route.TopicFilter == null || route.TopicFilter.Contains(topicFilter))
{
processed = true;
subscription.Handle(this, receiveTime, originalData, result, route);
}
}
}
}
if (!processed)
{
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, messageIdentifier!, string.Join(",", _listeners.Select(x => x.MessageMatcher.HandlerLinks.Select(x => x.ToString()))));
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, topicFilter!,
string.Join(",", _listeners.Select(x => string.Join(",", x.MessageRouter.Routes.Select(x => x.TopicFilter != null ? string.Join(",", x.TopicFilter) : "[null]")))));
}
}

View File

@ -182,6 +182,13 @@ namespace CryptoExchange.Net.Sockets
return matcher.Handle(connection, receiveTime, originalData, data);
}
public CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data, MessageRoute route)
{
ConnectionInvocations++;
TotalInvocations++;
return route.Handle(connection, receiveTime, originalData, data);
}
/// <summary>
/// Reset the subscription
/// </summary>