1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-04-08 10:41:08 +00:00
This commit is contained in:
Jkorf 2026-04-07 15:34:32 +02:00
parent 792dfa2cf2
commit e4fd67517b
12 changed files with 593 additions and 444 deletions

View File

@ -2,6 +2,7 @@
using CryptoExchange.Net.Objects.Errors; using CryptoExchange.Net.Objects.Errors;
using CryptoExchange.Net.Sockets; using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Sockets.Default; using CryptoExchange.Net.Sockets.Default;
using CryptoExchange.Net.Sockets.Default.Routing;
using System; using System;
namespace CryptoExchange.Net.UnitTests.Implementations namespace CryptoExchange.Net.UnitTests.Implementations

View File

@ -2,6 +2,7 @@
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets; using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Sockets.Default; using CryptoExchange.Net.Sockets.Default;
using CryptoExchange.Net.Sockets.Default.Routing;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;

View File

@ -171,6 +171,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
return resultOptimistic.Value; return resultOptimistic.Value;
} }
var isNumber = reader.TokenType == JsonTokenType.Number;
var stringValue = reader.TokenType switch var stringValue = reader.TokenType switch
{ {
JsonTokenType.String => reader.GetString(), JsonTokenType.String => reader.GetString(),
@ -207,7 +208,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
return null; return null;
} }
if (RunOptimistic) if (RunOptimistic && !isNumber)
{ {
if (!_unknownValuesWarned.Contains(stringValue)) if (!_unknownValuesWarned.Contains(stringValue))
{ {

View File

@ -0,0 +1,104 @@
using CryptoExchange.Net.Objects;
using System;
namespace CryptoExchange.Net.Sockets.Default.Routing
{
/// <summary>
/// Message route
/// </summary>
public abstract class MessageRoute
{
/// <summary>
/// Type identifier
/// </summary>
public string TypeIdentifier { get; set; }
/// <summary>
/// Optional topic filter
/// </summary>
public string? TopicFilter { get; set; }
/// <summary>
/// Whether responses to this route might be read by multiple listeners
/// </summary>
public bool MultipleReaders { get; set; } = false;
/// <summary>
/// Deserialization type
/// </summary>
public abstract Type DeserializationType { get; }
/// <summary>
/// ctor
/// </summary>
public MessageRoute(string typeIdentifier, string? topicFilter)
{
TypeIdentifier = typeIdentifier;
TopicFilter = topicFilter;
}
/// <summary>
/// Message handler
/// </summary>
public abstract CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data);
}
/// <summary>
/// Message route
/// </summary>
public class MessageRoute<TMessage> : MessageRoute
{
private Func<SocketConnection, DateTime, string?, TMessage, CallResult?> _handler;
/// <inheritdoc />
public override Type DeserializationType { get; } = typeof(TMessage);
/// <summary>
/// ctor
/// </summary>
internal MessageRoute(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
: base(typeIdentifier, topicFilter)
{
_handler = handler;
MultipleReaders = multipleReaders;
}
/// <summary>
/// Create route without topic filter
/// </summary>
public static MessageRoute<TMessage> CreateWithoutTopicFilter(string typeIdentifier, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRoute<TMessage>(typeIdentifier, null, handler)
{
MultipleReaders = multipleReaders
};
}
/// <summary>
/// Create route with optional topic filter
/// </summary>
public static MessageRoute<TMessage> CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRoute<TMessage>(typeIdentifier, topicFilter, handler)
{
MultipleReaders = multipleReaders
};
}
/// <summary>
/// Create route with topic filter
/// </summary>
public static MessageRoute<TMessage> CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRoute<TMessage>(typeIdentifier, topicFilter, handler)
{
MultipleReaders = multipleReaders
};
}
/// <inheritdoc />
public override CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data)
{
return _handler(connection, receiveTime, originalData, (TMessage)data);
}
}
}

View File

@ -0,0 +1,185 @@
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Linq;
namespace CryptoExchange.Net.Sockets.Default.Routing
{
/// <summary>
/// Message router
/// </summary>
public class MessageRouter
{
private RoutingSubTable? _routingTable;
/// <summary>
/// The routes registered for this router
/// </summary>
public MessageRoute[] Routes { get; }
/// <summary>
/// ctor
/// </summary>
private MessageRouter(params MessageRoute[] routes)
{
Routes = routes;
}
/// <summary>
/// Build the route mapping
/// </summary>
public void BuildRouteMap()
{
_routingTable = new RoutingSubTable(Routes);
}
/// <summary>
/// Get routes matching the type identifier
/// </summary>
internal RoutingSubTableEntry? this[string identifier]
{
get => (_routingTable ?? throw new InvalidOperationException("Route map not initialized before use"))[identifier];
}
/// <summary>
/// Create message router without specific message handler
/// </summary>
public static MessageRouter CreateWithoutHandler<T>(string typeIdentifier, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, null, (con, receiveTime, originalData, msg) => new CallResult<T>(default, null, null), multipleReaders));
}
/// <summary>
/// Create message router without specific message handler
/// </summary>
public static MessageRouter CreateWithoutHandler<T>(string typeIdentifier, string topicFilter, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult<string>(default, null, null), multipleReaders));
}
/// <summary>
/// Create message router without topic filter
/// </summary>
public static MessageRouter CreateWithoutTopicFilter<T>(IEnumerable<string> values, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(values.Select(x => new MessageRoute<T>(x, null, handler, multipleReaders)).ToArray());
}
/// <summary>
/// Create message router without topic filter
/// </summary>
public static MessageRouter CreateWithoutTopicFilter<T>(string typeIdentifier, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, null, handler, multipleReaders));
}
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilter<T>(string typeIdentifier, string topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, handler, multipleReaders));
}
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilter<T>(IEnumerable<string> typeIdentifiers, string topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var type in typeIdentifiers)
routes.Add(new MessageRoute<T>(type, topicFilter, handler, multipleReaders));
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilters<T>(string typeIdentifier, IEnumerable<string> topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler, multipleReaders));
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilters<T>(IEnumerable<string> typeIdentifiers, IEnumerable<string> topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var type in typeIdentifiers)
{
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(type, filter, handler, multipleReaders));
}
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message router with optional topic filter
/// </summary>
public static MessageRouter CreateWithOptionalTopicFilter<T>(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, handler, multipleReaders));
}
/// <summary>
/// Create message router with optional topic filter
/// </summary>
public static MessageRouter CreateWithOptionalTopicFilters<T>(string typeIdentifier, IEnumerable<string>? topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
if (topicFilters?.Count() > 0)
{
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler, multipleReaders));
}
else
{
routes.Add(new MessageRoute<T>(typeIdentifier, null, handler, multipleReaders));
}
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message router with optional topic filter
/// </summary>
public static MessageRouter CreateWithOptionalTopicFilters<T>(IEnumerable<string> typeIdentifiers, IEnumerable<string>? topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var typeIdentifier in typeIdentifiers)
{
if (topicFilters?.Count() > 0)
{
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler, multipleReaders));
}
else
{
routes.Add(new MessageRoute<T>(typeIdentifier, null, handler, multipleReaders));
}
}
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message matcher with specific routes
/// </summary>
public static MessageRouter Create(params MessageRoute[] routes)
{
return new MessageRouter(routes);
}
/// <summary>
/// Whether this matcher contains a specific link
/// </summary>
public bool ContainsCheck(MessageRoute route) => Routes.Any(x => x.TypeIdentifier == route.TypeIdentifier && x.TopicFilter == route.TopicFilter);
}
}

View File

@ -0,0 +1,142 @@
using CryptoExchange.Net.Objects;
using System;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
using System.Collections.Generic;
namespace CryptoExchange.Net.Sockets.Default.Routing
{
internal class RoutingSubTable
{
#if NET8_0_OR_GREATER
// Used for mapping a type identifier to the routes matching it
private FrozenDictionary<string, RoutingSubTableEntry> _routeMap;
#else
private Dictionary<string, RoutingSubTableEntry> _routeMap;
#endif
public RoutingSubTable(IEnumerable<MessageRoute> routes)
{
var newMap = new Dictionary<string, RoutingSubTableEntry>();
foreach (var route in routes)
{
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
{
typeMap = new RoutingSubTableEntry(route.DeserializationType);
newMap.Add(route.TypeIdentifier, typeMap);
}
typeMap.AddRoute(route.TopicFilter, route);
}
foreach(var subEntry in newMap.Values)
subEntry.Build();
#if NET8_0_OR_GREATER
_routeMap = newMap.ToFrozenDictionary();
#else
_routeMap = newMap;
#endif
}
/// <summary>
/// Get routes matching the type identifier
/// </summary>
public RoutingSubTableEntry? this[string identifier]
{
get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null;
}
}
internal record RoutingSubTableEntry
{
public Type DeserializationType { get; }
public bool MultipleReaders { get; private set; }
private List<MessageRoute> _routesWithoutTopicFilter;
private Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
#if NET8_0_OR_GREATER
// Used for mapping a type identifier to the routes matching it
private FrozenDictionary<string, List<MessageRoute>>? _routesWithTopicFilterFrozen;
#endif
public RoutingSubTableEntry(Type routeType)
{
_routesWithoutTopicFilter = new List<MessageRoute>();
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
DeserializationType = routeType;
}
public void AddRoute(string? topicFilter, MessageRoute route)
{
if (string.IsNullOrEmpty(topicFilter))
{
_routesWithoutTopicFilter.Add(route);
}
else
{
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
{
list = new List<MessageRoute>();
_routesWithTopicFilter.Add(topicFilter!, list);
}
list.Add(route);
}
if (route.MultipleReaders)
MultipleReaders = true;
}
public void Build()
{
#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary();
#endif
}
internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{
result = null;
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
var handled = false;
foreach (var route in _routesWithoutTopicFilter)
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
if (thisResult != null)
result ??= thisResult;
handled = true;
}
// Forward to routes with matching topic filter, if any
if (topicFilter != null)
{
#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
#else
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
#endif
foreach (var route in matchingTopicRoutes ?? [])
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
handled = true;
if (thisResult != null)
{
result ??= thisResult;
if (!MultipleReaders)
break;
}
}
}
return handled;
}
}
}

View File

@ -0,0 +1,96 @@
using CryptoExchange.Net.Sockets.Interfaces;
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Sockets.Default.Routing
{
/// <summary>
/// Routing table
/// </summary>
public class RoutingTable
{
private Dictionary<string, RoutingTableEntry> _routeTableEntries;
/// <summary>
/// Create routing table for provided processors
/// </summary>
public RoutingTable(IEnumerable<IMessageProcessor> processors)
{
_routeTableEntries = new Dictionary<string, RoutingTableEntry>();
foreach (var entry in processors)
{
foreach (var route in entry.MessageRouter.Routes)
{
if (!_routeTableEntries.ContainsKey(route.TypeIdentifier))
_routeTableEntries.Add(route.TypeIdentifier, new RoutingTableEntry(route.DeserializationType));
_routeTableEntries[route.TypeIdentifier].Handlers.Add(entry);
}
}
}
/// <summary>
/// Get route table entry for a type identifier
/// </summary>
public RoutingTableEntry? GetRouteTableEntry(string typeIdentifier)
{
return _routeTableEntries.TryGetValue(typeIdentifier, out var entry) ? entry : null;
}
/// <inheritdoc />
public override string ToString()
{
var sb = new StringBuilder();
foreach (var entry in _routeTableEntries)
{
sb.AppendLine($"{entry.Key}, {entry.Value.DeserializationType.Name}");
foreach(var item in entry.Value.Handlers)
{
sb.AppendLine($" - Processor {item.GetType().Name}");
foreach(var route in item.MessageRouter.Routes)
{
if (route.TypeIdentifier == entry.Key)
{
if (route.TopicFilter == null)
sb.AppendLine($" - Route without topic filter");
else
sb.AppendLine($" - Route with topic filter {route.TopicFilter}");
}
}
}
}
return sb.ToString();
}
}
/// <summary>
/// Routing table entry
/// </summary>
public record RoutingTableEntry
{
/// <summary>
/// Whether the deserialization type is string
/// </summary>
public bool IsStringOutput { get; set; }
/// <summary>
/// The deserialization type
/// </summary>
public Type DeserializationType { get; set; }
/// <summary>
/// Message processors
/// </summary>
public List<IMessageProcessor> Handlers { get; set; }
/// <summary>
/// ctor
/// </summary>
public RoutingTableEntry(Type deserializationType)
{
IsStringOutput = deserializationType == typeof(string);
DeserializationType = deserializationType;
Handlers = new List<IMessageProcessor>();
}
}
}

View File

@ -5,13 +5,11 @@ using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets.Default.Interfaces; using CryptoExchange.Net.Sockets.Default.Interfaces;
using CryptoExchange.Net.Sockets.Default.Routing;
using CryptoExchange.Net.Sockets.Interfaces; using CryptoExchange.Net.Sockets.Interfaces;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections; using System.Collections;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.ObjectModel; using System.Collections.ObjectModel;
using System.Data; using System.Data;
@ -267,8 +265,7 @@ namespace CryptoExchange.Net.Sockets.Default
private readonly object _listenersLock = new object(); private readonly object _listenersLock = new object();
#endif #endif
private RoutingTable _routeTable = new RoutingTable([]);
private RouteTable _routeTable = new RouteTable([]);
private ReadOnlyCollection<IMessageProcessor> _listeners; private ReadOnlyCollection<IMessageProcessor> _listeners;
private readonly ILogger _logger; private readonly ILogger _logger;
@ -552,7 +549,7 @@ namespace CryptoExchange.Net.Sockets.Default
object result; object result;
try try
{ {
if (routingEntry.RouteType == typeof(string)) if (routingEntry.IsStringOutput)
{ {
#if NETSTANDARD2_0 #if NETSTANDARD2_0
result = Encoding.UTF8.GetString(data.ToArray()); result = Encoding.UTF8.GetString(data.ToArray());
@ -562,7 +559,7 @@ namespace CryptoExchange.Net.Sockets.Default
} }
else else
{ {
result = messageConverter.Deserialize(data, routingEntry.RouteType); result = messageConverter.Deserialize(data, routingEntry.DeserializationType);
} }
} }
catch(Exception ex) catch(Exception ex)
@ -1145,15 +1142,32 @@ namespace CryptoExchange.Net.Sockets.Default
}); });
} }
private void BuildRoutingTable()
{
_routeTable = new RoutingTable(_listeners);
}
private void AddMessageProcessor(IMessageProcessor processor) private void AddMessageProcessor(IMessageProcessor processor)
{ {
lock (_listenersLock) lock (_listenersLock)
{ {
var updatedList = new List<IMessageProcessor>(_listeners); var updatedList = new List<IMessageProcessor>(_listeners);
updatedList.Add(processor); updatedList.Add(processor);
_routeTable = new RouteTable(updatedList); processor.OnMessageRouterUpdated += () =>
{
BuildRoutingTable();
#if DEBUG
_logger.LogTrace("MessageRouter updated, new routing table:\r\n" + _routeTable.ToString());
#endif
};
_listeners = updatedList.AsReadOnly(); _listeners = updatedList.AsReadOnly();
if (processor.MessageRouter.Routes.Length > 0)
{
BuildRoutingTable();
#if DEBUG
_logger.LogTrace("Processor added, new routing table:\r\n" + _routeTable.ToString());
#endif
}
} }
} }
@ -1162,9 +1176,15 @@ namespace CryptoExchange.Net.Sockets.Default
lock (_listenersLock) lock (_listenersLock)
{ {
var updatedList = new List<IMessageProcessor>(_listeners); var updatedList = new List<IMessageProcessor>(_listeners);
updatedList.Remove(processor); if (!updatedList.Remove(processor))
_routeTable = new RouteTable(updatedList); return; // If nothing removed nothing has changed
processor.OnMessageRouterUpdated -= BuildRoutingTable;
_listeners = updatedList.AsReadOnly(); _listeners = updatedList.AsReadOnly();
BuildRoutingTable();
#if DEBUG
_logger.LogTrace("Processor removed, new routing table:\r\n" + _routeTable.ToString());
#endif
} }
} }
@ -1173,13 +1193,24 @@ namespace CryptoExchange.Net.Sockets.Default
lock (_listenersLock) lock (_listenersLock)
{ {
var updatedList = new List<IMessageProcessor>(_listeners); var updatedList = new List<IMessageProcessor>(_listeners);
var anyRemoved = false;
foreach (var processor in processors) foreach (var processor in processors)
updatedList.Remove(processor); {
_routeTable = new RouteTable(updatedList); processor.OnMessageRouterUpdated -= BuildRoutingTable;
if (updatedList.Remove(processor))
anyRemoved = true;
}
if (!anyRemoved)
return; // If nothing removed nothing has changed
_listeners = updatedList.AsReadOnly(); _listeners = updatedList.AsReadOnly();
BuildRoutingTable();
#if DEBUG
_logger.LogTrace("Processors removed, new routing table:\r\n" + _routeTable.ToString());
#endif
} }
} }
} }
} }

View File

@ -1,5 +1,6 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default.Routing;
using CryptoExchange.Net.Sockets.Interfaces; using CryptoExchange.Net.Sockets.Interfaces;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
@ -82,6 +83,7 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
_router = value; _router = value;
_router.BuildRouteMap(); _router.BuildRouteMap();
OnMessageRouterUpdated?.Invoke();
} }
} }
@ -119,6 +121,9 @@ namespace CryptoExchange.Net.Sockets.Default
/// </summary> /// </summary>
public int IndividualSubscriptionCount { get; set; } = 1; public int IndividualSubscriptionCount { get; set; } = 1;
/// <inheritdoc />
public event Action? OnMessageRouterUpdated;
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
@ -193,9 +198,7 @@ namespace CryptoExchange.Net.Sockets.Default
SubscriptionQuery.Timeout(); SubscriptionQuery.Timeout();
} }
return MessageRouter[typeIdentifier].Handle(topicFilter, connection, receiveTime, originalData, data, out _); return MessageRouter[typeIdentifier]?.Handle(topicFilter, connection, receiveTime, originalData, data, out _) ?? false;
//return routeMap.Handle(topicFilter, connection, receiveTime, originalData, data, out _);
} }
/// <summary> /// <summary>

View File

@ -1,6 +1,7 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default; using CryptoExchange.Net.Sockets.Default;
using CryptoExchange.Net.Sockets.Default.Routing;
using System; using System;
namespace CryptoExchange.Net.Sockets.Interfaces namespace CryptoExchange.Net.Sockets.Interfaces
@ -19,6 +20,10 @@ namespace CryptoExchange.Net.Sockets.Interfaces
/// </summary> /// </summary>
public MessageRouter MessageRouter { get; } public MessageRouter MessageRouter { get; }
/// <summary> /// <summary>
/// Event when the message router for this processor has been changed
/// </summary>
public event Action OnMessageRouterUpdated;
/// <summary>
/// Handle a message /// Handle a message
/// </summary> /// </summary>
//bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route); //bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route);

View File

@ -1,425 +0,0 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default;
using CryptoExchange.Net.Sockets.Interfaces;
using System;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
using System.Collections.Generic;
using System.Linq;
namespace CryptoExchange.Net.Sockets
{
public class RouteTable
{
private Dictionary<string, RouteTableEntry> _routeTableEntries;
public RouteTable(IEnumerable<IMessageProcessor> processors)
{
_routeTableEntries = new Dictionary<string, RouteTableEntry>();
foreach (var entry in processors)
{
foreach(var route in entry.MessageRouter.Routes)
{
if (!_routeTableEntries.ContainsKey(route.TypeIdentifier)) {
_routeTableEntries.Add(route.TypeIdentifier, new RouteTableEntry()
{
RouteType = route.DeserializationType,
Handlers = new List<IMessageProcessor>()
});
}
_routeTableEntries[route.TypeIdentifier].Handlers.Add(entry);
}
}
}
public RouteTableEntry? GetRouteTableEntry(string typeIdentifier)
{
return _routeTableEntries.TryGetValue(typeIdentifier, out var entry) ? entry : null;
}
}
public record RouteTableEntry
{
public Type RouteType { get; set; }
public List<IMessageProcessor> Handlers { get; set; }
}
public record RouteMapEntry
{
public Type RouteType { get; }
public bool MultipleReaders { get; private set; }
private List<MessageRoute> _routesWithoutTopicFilter;
private Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
public RouteMapEntry(Type routeType)
{
_routesWithoutTopicFilter = new List<MessageRoute>();
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
RouteType = routeType;
}
public void AddRoute(string? topicFilter, MessageRoute route)
{
if (string.IsNullOrEmpty(topicFilter))
{
_routesWithoutTopicFilter.Add(route);
}
else
{
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
{
list = new List<MessageRoute>();
_routesWithTopicFilter.Add(topicFilter!, list);
}
list.Add(route);
}
if (route.MultipleReaders)
MultipleReaders = true;
}
internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{
result = null;
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
var handled = false;
foreach (var route in _routesWithoutTopicFilter)
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
if (thisResult != null)
result ??= thisResult;
handled = true;
}
// Forward to routes with matching topic filter, if any
if (topicFilter != null)
{
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
foreach (var route in matchingTopicRoutes ?? [])
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
handled = true;
if (thisResult != null)
{
result ??= thisResult;
if (!MultipleReaders)
break;
}
}
}
return handled;
}
}
/// <summary>
/// Message router
/// </summary>
public class MessageRouter
{
/// <summary>
/// The routes registered for this router
/// </summary>
public MessageRoute[] Routes { get; }
#if NET8_0_OR_GREATER
// Used for mapping a type identifier to the routes matching it
private FrozenDictionary<string, RouteMapEntry>? _routeMap;
#else
private Dictionary<string, RouteMapEntry>? _routeMap;
#endif
/// <summary>
/// ctor
/// </summary>
private MessageRouter(params MessageRoute[] routes)
{
Routes = routes;
}
/// <summary>
/// Build the route mapping
/// </summary>
public void BuildRouteMap()
{
var newMap = new Dictionary<string, RouteMapEntry>();
foreach (var route in Routes)
{
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
{
typeMap = new RouteMapEntry(route.DeserializationType);
newMap.Add(route.TypeIdentifier, typeMap);
}
typeMap.AddRoute(route.TopicFilter, route);
}
#if NET8_0_OR_GREATER
_routeMap = newMap.ToFrozenDictionary();
#else
_routeMap = newMap;
#endif
}
/// <summary>
/// Get routes matching the type identifier
/// </summary>
public RouteMapEntry? this[string identifier]
{
get => (_routeMap ?? throw new InvalidOperationException("Route map not initialized before use")).TryGetValue(identifier, out var routes) ? routes: null;
}
/// <summary>
/// Create message router without specific message handler
/// </summary>
public static MessageRouter CreateWithoutHandler<T>(string typeIdentifier, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, (string?)null, (con, receiveTime, originalData, msg) => new CallResult<T>(default, null, null), multipleReaders));
}
/// <summary>
/// Create message router without specific message handler
/// </summary>
public static MessageRouter CreateWithoutHandler<T>(string typeIdentifier, string topicFilter, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult<string>(default, null, null), multipleReaders));
}
/// <summary>
/// Create message router without topic filter
/// </summary>
public static MessageRouter CreateWithoutTopicFilter<T>(IEnumerable<string> values, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(values.Select(x => new MessageRoute<T>(x, null, handler, multipleReaders)).ToArray());
}
/// <summary>
/// Create message router without topic filter
/// </summary>
public static MessageRouter CreateWithoutTopicFilter<T>(string typeIdentifier, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, null, handler, multipleReaders));
}
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilter<T>(string typeIdentifier, string topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, handler, multipleReaders));
}
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilter<T>(IEnumerable<string> typeIdentifiers, string topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var type in typeIdentifiers)
routes.Add(new MessageRoute<T>(type, topicFilter, handler, multipleReaders));
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilters<T>(string typeIdentifier, IEnumerable<string> topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler, multipleReaders));
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilters<T>(IEnumerable<string> typeIdentifiers, IEnumerable<string> topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var type in typeIdentifiers)
{
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(type, filter, handler, multipleReaders));
}
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message router with optional topic filter
/// </summary>
public static MessageRouter CreateWithOptionalTopicFilter<T>(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, handler, multipleReaders));
}
/// <summary>
/// Create message router with optional topic filter
/// </summary>
public static MessageRouter CreateWithOptionalTopicFilters<T>(string typeIdentifier, IEnumerable<string>? topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
if (topicFilters?.Count() > 0)
{
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler, multipleReaders));
}
else
{
routes.Add(new MessageRoute<T>(typeIdentifier, null, handler, multipleReaders));
}
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message router with optional topic filter
/// </summary>
public static MessageRouter CreateWithOptionalTopicFilters<T>(IEnumerable<string> typeIdentifiers, IEnumerable<string>? topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var typeIdentifier in typeIdentifiers)
{
if (topicFilters?.Count() > 0)
{
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler, multipleReaders));
}
else
{
routes.Add(new MessageRoute<T>(typeIdentifier, null, handler, multipleReaders));
}
}
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message matcher with specific routes
/// </summary>
public static MessageRouter Create(params MessageRoute[] routes)
{
return new MessageRouter(routes);
}
/// <summary>
/// Whether this matcher contains a specific link
/// </summary>
public bool ContainsCheck(MessageRoute route) => Routes.Any(x => x.TypeIdentifier == route.TypeIdentifier && x.TopicFilter == route.TopicFilter);
}
/// <summary>
/// Message route
/// </summary>
public abstract class MessageRoute
{
/// <summary>
/// Type identifier
/// </summary>
public string TypeIdentifier { get; set; }
/// <summary>
/// Optional topic filter
/// </summary>
public string? TopicFilter { get; set; }
/// <summary>
/// Whether responses to this route might be read by multiple listeners
/// </summary>
public bool MultipleReaders { get; set; } = false;
/// <summary>
/// Deserialization type
/// </summary>
public abstract Type DeserializationType { get; }
/// <summary>
/// ctor
/// </summary>
public MessageRoute(string typeIdentifier, string? topicFilter)
{
TypeIdentifier = typeIdentifier;
TopicFilter = topicFilter;
}
/// <summary>
/// Message handler
/// </summary>
public abstract CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data);
}
/// <summary>
/// Message route
/// </summary>
public class MessageRoute<TMessage> : MessageRoute
{
private Func<SocketConnection, DateTime, string?, TMessage, CallResult?> _handler;
/// <inheritdoc />
public override Type DeserializationType { get; } = typeof(TMessage);
/// <summary>
/// ctor
/// </summary>
internal MessageRoute(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
: base(typeIdentifier, topicFilter)
{
_handler = handler;
MultipleReaders = multipleReaders;
}
/// <summary>
/// Create route without topic filter
/// </summary>
public static MessageRoute<TMessage> CreateWithoutTopicFilter(string typeIdentifier, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRoute<TMessage>(typeIdentifier, null, handler)
{
MultipleReaders = multipleReaders
};
}
/// <summary>
/// Create route with optional topic filter
/// </summary>
public static MessageRoute<TMessage> CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRoute<TMessage>(typeIdentifier, topicFilter, handler)
{
MultipleReaders = multipleReaders
};
}
/// <summary>
/// Create route with topic filter
/// </summary>
public static MessageRoute<TMessage> CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRoute<TMessage>(typeIdentifier, topicFilter, handler)
{
MultipleReaders = multipleReaders
};
}
/// <inheritdoc />
public override CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data)
{
return _handler(connection, receiveTime, originalData, (TMessage)data);
}
}
}

View File

@ -1,6 +1,7 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default; using CryptoExchange.Net.Sockets.Default;
using CryptoExchange.Net.Sockets.Default.Routing;
using CryptoExchange.Net.Sockets.Interfaces; using CryptoExchange.Net.Sockets.Interfaces;
using System; using System;
using System.Threading; using System.Threading;
@ -70,6 +71,7 @@ namespace CryptoExchange.Net.Sockets
{ {
_router = value; _router = value;
_router.BuildRouteMap(); _router.BuildRouteMap();
OnMessageRouterUpdated?.Invoke();
} }
} }
@ -108,6 +110,9 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public Action? OnComplete { get; set; } public Action? OnComplete { get; set; }
/// <inheritdoc />
public event Action? OnMessageRouterUpdated;
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
@ -203,7 +208,7 @@ namespace CryptoExchange.Net.Sockets
if (Result?.Success != false) if (Result?.Success != false)
{ {
// If an error result is already set don't override that // If an error result is already set don't override that
MessageRouter[typeIdentifier].Handle(topicFilter, connection, receiveTime, originalData, message, out var result); MessageRouter[typeIdentifier]!.Handle(topicFilter, connection, receiveTime, originalData, message, out var result);
Result = result; Result = result;
if (Result == null) if (Result == null)
// Null from Handle means it wasn't actually for this query // Null from Handle means it wasn't actually for this query