1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-04-07 02:01:12 +00:00
This commit is contained in:
JKorf 2026-04-06 22:01:15 +02:00
parent aed98140aa
commit 8a5ffb1c62
5 changed files with 69 additions and 35 deletions

View File

@ -9,6 +9,9 @@ using CryptoExchange.Net.Sockets.Interfaces;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections; using System.Collections;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.ObjectModel; using System.Collections.ObjectModel;
using System.Data; using System.Data;
@ -263,6 +266,10 @@ namespace CryptoExchange.Net.Sockets.Default
#else #else
private readonly object _listenersLock = new object(); private readonly object _listenersLock = new object();
#endif #endif
private RouteTable _routeTable = new RouteTable([]);
private ReadOnlyCollection<IMessageProcessor> _listeners; private ReadOnlyCollection<IMessageProcessor> _listeners;
private readonly ILogger _logger; private readonly ILogger _logger;
private SocketStatus _status; private SocketStatus _status;
@ -530,31 +537,22 @@ namespace CryptoExchange.Net.Sockets.Default
return; return;
} }
Type? deserializationType = null; var routingEntry = _routeTable.GetRouteTableEntry(typeIdentifier);
foreach (var listener in _listeners) if (routingEntry == null)
{
var routes = listener.MessageRouter[typeIdentifier];
deserializationType = routes?.RouteType;
if (deserializationType != null)
break;
}
if (deserializationType == null)
{ {
if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data)) if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data))
{ {
// No handler found for identifier either, can't process // No handler found for identifier either, can't process
_logger.LogWarning("Failed to determine message type for identifier {Identifier}. Data: {Message}", typeIdentifier, Encoding.UTF8.GetString(data.ToArray())); _logger.LogWarning("Failed to determine message type for identifier {Identifier}. Data: {Message}", typeIdentifier, Encoding.UTF8.GetString(data.ToArray()));
} }
return; return;
} }
object result; object result;
try try
{ {
if (deserializationType == typeof(string)) if (routingEntry.RouteType == typeof(string))
{ {
#if NETSTANDARD2_0 #if NETSTANDARD2_0
result = Encoding.UTF8.GetString(data.ToArray()); result = Encoding.UTF8.GetString(data.ToArray());
@ -564,7 +562,7 @@ namespace CryptoExchange.Net.Sockets.Default
} }
else else
{ {
result = messageConverter.Deserialize(data, deserializationType); result = messageConverter.Deserialize(data, routingEntry.RouteType);
} }
} }
catch(Exception ex) catch(Exception ex)
@ -581,22 +579,12 @@ namespace CryptoExchange.Net.Sockets.Default
} }
var topicFilter = messageConverter.GetTopicFilter(result); var topicFilter = messageConverter.GetTopicFilter(result);
var processed = false;
bool processed = false; foreach (var handler in routingEntry.Handlers)
foreach (var listener in _listeners)
{ {
var routeMap = listener.MessageRouter[typeIdentifier]; var thisHandled = handler.Handle(typeIdentifier, topicFilter, this, receiveTime, originalData, result);
if (routeMap != null) if (thisHandled)
{ processed = true;
// Could be null if listeners was updated while handling message
var handled = listener.Handle(topicFilter, this, receiveTime, originalData, result, routeMap);
if (!processed)
processed = handled;
if (!routeMap.MultipleReaders)
// If this was a response to a query and there are no other routes expecting this message we can break here
break;
}
} }
if (!processed) if (!processed)
@ -1163,7 +1151,9 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
var updatedList = new List<IMessageProcessor>(_listeners); var updatedList = new List<IMessageProcessor>(_listeners);
updatedList.Add(processor); updatedList.Add(processor);
_routeTable = new RouteTable(updatedList);
_listeners = updatedList.AsReadOnly(); _listeners = updatedList.AsReadOnly();
} }
} }
@ -1173,6 +1163,7 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
var updatedList = new List<IMessageProcessor>(_listeners); var updatedList = new List<IMessageProcessor>(_listeners);
updatedList.Remove(processor); updatedList.Remove(processor);
_routeTable = new RouteTable(updatedList);
_listeners = updatedList.AsReadOnly(); _listeners = updatedList.AsReadOnly();
} }
} }
@ -1184,6 +1175,7 @@ namespace CryptoExchange.Net.Sockets.Default
var updatedList = new List<IMessageProcessor>(_listeners); var updatedList = new List<IMessageProcessor>(_listeners);
foreach (var processor in processors) foreach (var processor in processors)
updatedList.Remove(processor); updatedList.Remove(processor);
_routeTable = new RouteTable(updatedList);
_listeners = updatedList.AsReadOnly(); _listeners = updatedList.AsReadOnly();
} }
} }

View File

@ -180,7 +180,7 @@ namespace CryptoExchange.Net.Sockets.Default
/// <summary> /// <summary>
/// Handle an update message /// Handle an update message
/// </summary> /// </summary>
public bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, RouteMapEntry routeMap) public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data)
{ {
ConnectionInvocations++; ConnectionInvocations++;
TotalInvocations++; TotalInvocations++;
@ -193,7 +193,9 @@ namespace CryptoExchange.Net.Sockets.Default
SubscriptionQuery.Timeout(); SubscriptionQuery.Timeout();
} }
return routeMap.Handle(topicFilter, connection, receiveTime, originalData, data, out _); return MessageRouter[typeIdentifier].Handle(topicFilter, connection, receiveTime, originalData, data, out _);
//return routeMap.Handle(topicFilter, connection, receiveTime, originalData, data, out _);
} }
/// <summary> /// <summary>

View File

@ -21,6 +21,7 @@ namespace CryptoExchange.Net.Sockets.Interfaces
/// <summary> /// <summary>
/// Handle a message /// Handle a message
/// </summary> /// </summary>
bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object result, RouteMapEntry route); //bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route);
bool Handle(string typeIdentifier, string? topicFilter, SocketConnection socketConnection, DateTime receiveTime, string? originalData, object result);
} }
} }

View File

@ -1,5 +1,6 @@
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default; using CryptoExchange.Net.Sockets.Default;
using CryptoExchange.Net.Sockets.Interfaces;
using System; using System;
#if NET8_0_OR_GREATER #if NET8_0_OR_GREATER
using System.Collections.Frozen; using System.Collections.Frozen;
@ -9,6 +10,44 @@ using System.Linq;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
{ {
public class RouteTable
{
private Dictionary<string, RouteTableEntry> _routeTableEntries;
public RouteTable(IEnumerable<IMessageProcessor> processors)
{
_routeTableEntries = new Dictionary<string, RouteTableEntry>();
foreach (var entry in processors)
{
foreach(var route in entry.MessageRouter.Routes)
{
if (!_routeTableEntries.ContainsKey(route.TypeIdentifier)) {
_routeTableEntries.Add(route.TypeIdentifier, new RouteTableEntry()
{
RouteType = route.DeserializationType,
Handlers = new List<IMessageProcessor>()
});
}
_routeTableEntries[route.TypeIdentifier].Handlers.Add(entry);
}
}
}
public RouteTableEntry? GetRouteTableEntry(string typeIdentifier)
{
return _routeTableEntries.TryGetValue(typeIdentifier, out var entry) ? entry : null;
}
}
public record RouteTableEntry
{
public Type RouteType { get; set; }
public List<IMessageProcessor> Handlers { get; set; }
}
public record RouteMapEntry public record RouteMapEntry
{ {
public Type RouteType { get; } public Type RouteType { get; }

View File

@ -164,7 +164,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Handle a response message /// Handle a response message
/// </summary> /// </summary>
public abstract bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object message, RouteMapEntry routeMap); public abstract bool Handle(string typeIdentifier, string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object message);
} }
@ -194,7 +194,7 @@ namespace CryptoExchange.Net.Sockets
} }
/// <inheritdoc /> /// <inheritdoc />
public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object message, RouteMapEntry routeMap) public override bool Handle(string typeIdentifier, string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object message)
{ {
CurrentResponses++; CurrentResponses++;
if (CurrentResponses == RequiredResponses) if (CurrentResponses == RequiredResponses)
@ -203,7 +203,7 @@ namespace CryptoExchange.Net.Sockets
if (Result?.Success != false) if (Result?.Success != false)
{ {
// If an error result is already set don't override that // If an error result is already set don't override that
routeMap.Handle(topicFilter, connection, receiveTime, originalData, message, out var result); MessageRouter[typeIdentifier].Handle(topicFilter, connection, receiveTime, originalData, message, out var result);
Result = result; Result = result;
if (Result == null) if (Result == null)
// Null from Handle means it wasn't actually for this query // Null from Handle means it wasn't actually for this query