From e4fd67517bad7a19219e272e7a0f1ef35c4d5a56 Mon Sep 17 00:00:00 2001 From: Jkorf Date: Tue, 7 Apr 2026 15:34:32 +0200 Subject: [PATCH] wip --- .../Implementations/TestQuery.cs | 1 + .../Implementations/TestSubscription.cs | 1 + .../SystemTextJson/EnumConverter.cs | 3 +- .../Sockets/Default/Routing/MessageRoute.cs | 104 +++++ .../Sockets/Default/Routing/MessageRouter.cs | 185 ++++++++ .../Default/Routing/RoutingSubTable.cs | 142 ++++++ .../Sockets/Default/Routing/RoutingTable.cs | 96 ++++ .../Sockets/Default/SocketConnection.cs | 59 ++- .../Sockets/Default/Subscription.cs | 9 +- .../Sockets/Interfaces/IMessageProcessor.cs | 5 + CryptoExchange.Net/Sockets/MessageRouter.cs | 425 ------------------ CryptoExchange.Net/Sockets/Query.cs | 7 +- 12 files changed, 593 insertions(+), 444 deletions(-) create mode 100644 CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs create mode 100644 CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs create mode 100644 CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs create mode 100644 CryptoExchange.Net/Sockets/Default/Routing/RoutingTable.cs delete mode 100644 CryptoExchange.Net/Sockets/MessageRouter.cs diff --git a/CryptoExchange.Net.UnitTests/Implementations/TestQuery.cs b/CryptoExchange.Net.UnitTests/Implementations/TestQuery.cs index 4d762dd..05dd240 100644 --- a/CryptoExchange.Net.UnitTests/Implementations/TestQuery.cs +++ b/CryptoExchange.Net.UnitTests/Implementations/TestQuery.cs @@ -2,6 +2,7 @@ using CryptoExchange.Net.Objects.Errors; using CryptoExchange.Net.Sockets; using CryptoExchange.Net.Sockets.Default; +using CryptoExchange.Net.Sockets.Default.Routing; using System; namespace CryptoExchange.Net.UnitTests.Implementations diff --git a/CryptoExchange.Net.UnitTests/Implementations/TestSubscription.cs b/CryptoExchange.Net.UnitTests/Implementations/TestSubscription.cs index 6aa3b4b..1ff2a4d 100644 --- a/CryptoExchange.Net.UnitTests/Implementations/TestSubscription.cs +++ b/CryptoExchange.Net.UnitTests/Implementations/TestSubscription.cs @@ -2,6 +2,7 @@ using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; using CryptoExchange.Net.Sockets.Default; +using CryptoExchange.Net.Sockets.Default.Routing; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; diff --git a/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs index 4eafc67..4d598a4 100644 --- a/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs +++ b/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs @@ -171,6 +171,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson return resultOptimistic.Value; } + var isNumber = reader.TokenType == JsonTokenType.Number; var stringValue = reader.TokenType switch { JsonTokenType.String => reader.GetString(), @@ -207,7 +208,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson return null; } - if (RunOptimistic) + if (RunOptimistic && !isNumber) { if (!_unknownValuesWarned.Contains(stringValue)) { diff --git a/CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs b/CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs new file mode 100644 index 0000000..9ed234e --- /dev/null +++ b/CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs @@ -0,0 +1,104 @@ +using CryptoExchange.Net.Objects; +using System; + +namespace CryptoExchange.Net.Sockets.Default.Routing +{ + /// + /// Message route + /// + public abstract class MessageRoute + { + /// + /// Type identifier + /// + public string TypeIdentifier { get; set; } + /// + /// Optional topic filter + /// + public string? TopicFilter { get; set; } + + /// + /// Whether responses to this route might be read by multiple listeners + /// + public bool MultipleReaders { get; set; } = false; + + /// + /// Deserialization type + /// + public abstract Type DeserializationType { get; } + + /// + /// ctor + /// + public MessageRoute(string typeIdentifier, string? topicFilter) + { + TypeIdentifier = typeIdentifier; + TopicFilter = topicFilter; + } + + /// + /// Message handler + /// + public abstract CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data); + } + + /// + /// Message route + /// + public class MessageRoute : MessageRoute + { + private Func _handler; + + /// + public override Type DeserializationType { get; } = typeof(TMessage); + + /// + /// ctor + /// + internal MessageRoute(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false) + : base(typeIdentifier, topicFilter) + { + _handler = handler; + MultipleReaders = multipleReaders; + } + + /// + /// Create route without topic filter + /// + public static MessageRoute CreateWithoutTopicFilter(string typeIdentifier, Func handler, bool multipleReaders = false) + { + return new MessageRoute(typeIdentifier, null, handler) + { + MultipleReaders = multipleReaders + }; + } + + /// + /// Create route with optional topic filter + /// + public static MessageRoute CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false) + { + return new MessageRoute(typeIdentifier, topicFilter, handler) + { + MultipleReaders = multipleReaders + }; + } + + /// + /// Create route with topic filter + /// + public static MessageRoute CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler, bool multipleReaders = false) + { + return new MessageRoute(typeIdentifier, topicFilter, handler) + { + MultipleReaders = multipleReaders + }; + } + + /// + public override CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data) + { + return _handler(connection, receiveTime, originalData, (TMessage)data); + } + } +} diff --git a/CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs b/CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs new file mode 100644 index 0000000..6b2542c --- /dev/null +++ b/CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs @@ -0,0 +1,185 @@ +using CryptoExchange.Net.Objects; +using System; +using System.Collections.Generic; +using System.Linq; + +namespace CryptoExchange.Net.Sockets.Default.Routing +{ + /// + /// Message router + /// + public class MessageRouter + { + private RoutingSubTable? _routingTable; + + /// + /// The routes registered for this router + /// + public MessageRoute[] Routes { get; } + + /// + /// ctor + /// + private MessageRouter(params MessageRoute[] routes) + { + Routes = routes; + } + + /// + /// Build the route mapping + /// + public void BuildRouteMap() + { + _routingTable = new RoutingSubTable(Routes); + } + + /// + /// Get routes matching the type identifier + /// + internal RoutingSubTableEntry? this[string identifier] + { + get => (_routingTable ?? throw new InvalidOperationException("Route map not initialized before use"))[identifier]; + } + + /// + /// Create message router without specific message handler + /// + public static MessageRouter CreateWithoutHandler(string typeIdentifier, bool multipleReaders = false) + { + return new MessageRouter(new MessageRoute(typeIdentifier, null, (con, receiveTime, originalData, msg) => new CallResult(default, null, null), multipleReaders)); + } + + /// + /// Create message router without specific message handler + /// + public static MessageRouter CreateWithoutHandler(string typeIdentifier, string topicFilter, bool multipleReaders = false) + { + return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult(default, null, null), multipleReaders)); + } + + /// + /// Create message router without topic filter + /// + public static MessageRouter CreateWithoutTopicFilter(IEnumerable values, Func handler, bool multipleReaders = false) + { + return new MessageRouter(values.Select(x => new MessageRoute(x, null, handler, multipleReaders)).ToArray()); + } + + /// + /// Create message router without topic filter + /// + public static MessageRouter CreateWithoutTopicFilter(string typeIdentifier, Func handler, bool multipleReaders = false) + { + return new MessageRouter(new MessageRoute(typeIdentifier, null, handler, multipleReaders)); + } + + /// + /// Create message router with topic filter + /// + public static MessageRouter CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler, bool multipleReaders = false) + { + return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler, multipleReaders)); + } + + /// + /// Create message router with topic filter + /// + public static MessageRouter CreateWithTopicFilter(IEnumerable typeIdentifiers, string topicFilter, Func handler, bool multipleReaders = false) + { + var routes = new List(); + foreach (var type in typeIdentifiers) + routes.Add(new MessageRoute(type, topicFilter, handler, multipleReaders)); + + return new MessageRouter(routes.ToArray()); + } + + /// + /// Create message router with topic filter + /// + public static MessageRouter CreateWithTopicFilters(string typeIdentifier, IEnumerable topicFilters, Func handler, bool multipleReaders = false) + { + var routes = new List(); + foreach (var filter in topicFilters) + routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders)); + + return new MessageRouter(routes.ToArray()); + } + + /// + /// Create message router with topic filter + /// + public static MessageRouter CreateWithTopicFilters(IEnumerable typeIdentifiers, IEnumerable topicFilters, Func handler, bool multipleReaders = false) + { + var routes = new List(); + foreach (var type in typeIdentifiers) + { + foreach (var filter in topicFilters) + routes.Add(new MessageRoute(type, filter, handler, multipleReaders)); + } + + return new MessageRouter(routes.ToArray()); + } + + /// + /// Create message router with optional topic filter + /// + public static MessageRouter CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false) + { + return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler, multipleReaders)); + } + + /// + /// Create message router with optional topic filter + /// + public static MessageRouter CreateWithOptionalTopicFilters(string typeIdentifier, IEnumerable? topicFilters, Func handler, bool multipleReaders = false) + { + var routes = new List(); + if (topicFilters?.Count() > 0) + { + foreach (var filter in topicFilters) + routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders)); + } + else + { + routes.Add(new MessageRoute(typeIdentifier, null, handler, multipleReaders)); + } + + return new MessageRouter(routes.ToArray()); + } + + /// + /// Create message router with optional topic filter + /// + public static MessageRouter CreateWithOptionalTopicFilters(IEnumerable typeIdentifiers, IEnumerable? topicFilters, Func handler, bool multipleReaders = false) + { + var routes = new List(); + foreach (var typeIdentifier in typeIdentifiers) + { + if (topicFilters?.Count() > 0) + { + foreach (var filter in topicFilters) + routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders)); + } + else + { + routes.Add(new MessageRoute(typeIdentifier, null, handler, multipleReaders)); + } + } + + return new MessageRouter(routes.ToArray()); + } + + /// + /// Create message matcher with specific routes + /// + public static MessageRouter Create(params MessageRoute[] routes) + { + return new MessageRouter(routes); + } + + /// + /// Whether this matcher contains a specific link + /// + public bool ContainsCheck(MessageRoute route) => Routes.Any(x => x.TypeIdentifier == route.TypeIdentifier && x.TopicFilter == route.TopicFilter); + } +} diff --git a/CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs b/CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs new file mode 100644 index 0000000..caf5b3f --- /dev/null +++ b/CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs @@ -0,0 +1,142 @@ +using CryptoExchange.Net.Objects; +using System; +#if NET8_0_OR_GREATER +using System.Collections.Frozen; +#endif +using System.Collections.Generic; + +namespace CryptoExchange.Net.Sockets.Default.Routing +{ + internal class RoutingSubTable + { +#if NET8_0_OR_GREATER + // Used for mapping a type identifier to the routes matching it + private FrozenDictionary _routeMap; +#else + private Dictionary _routeMap; +#endif + + public RoutingSubTable(IEnumerable routes) + { + var newMap = new Dictionary(); + foreach (var route in routes) + { + if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap)) + { + typeMap = new RoutingSubTableEntry(route.DeserializationType); + newMap.Add(route.TypeIdentifier, typeMap); + } + + typeMap.AddRoute(route.TopicFilter, route); + } + + foreach(var subEntry in newMap.Values) + subEntry.Build(); + +#if NET8_0_OR_GREATER + _routeMap = newMap.ToFrozenDictionary(); +#else + _routeMap = newMap; +#endif + } + + /// + /// Get routes matching the type identifier + /// + public RoutingSubTableEntry? this[string identifier] + { + get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null; + } + } + + internal record RoutingSubTableEntry + { + public Type DeserializationType { get; } + public bool MultipleReaders { get; private set; } + + private List _routesWithoutTopicFilter; + private Dictionary> _routesWithTopicFilter; +#if NET8_0_OR_GREATER + // Used for mapping a type identifier to the routes matching it + private FrozenDictionary>? _routesWithTopicFilterFrozen; +#endif + + public RoutingSubTableEntry(Type routeType) + { + _routesWithoutTopicFilter = new List(); + _routesWithTopicFilter = new Dictionary>(); + + DeserializationType = routeType; + } + + public void AddRoute(string? topicFilter, MessageRoute route) + { + if (string.IsNullOrEmpty(topicFilter)) + { + _routesWithoutTopicFilter.Add(route); + } + else + { + if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list)) + { + list = new List(); + _routesWithTopicFilter.Add(topicFilter!, list); + } + + list.Add(route); + } + + if (route.MultipleReaders) + MultipleReaders = true; + } + + public void Build() + { +#if NET8_0_OR_GREATER + _routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary(); +#endif + } + + internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result) + { + result = null; + + // Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them + var handled = false; + foreach (var route in _routesWithoutTopicFilter) + { + var thisResult = route.Handle(connection, receiveTime, originalData, data); + if (thisResult != null) + result ??= thisResult; + + handled = true; + } + + // Forward to routes with matching topic filter, if any + if (topicFilter != null) + { +#if NET8_0_OR_GREATER + _routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes); +#else + _routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes); +#endif + foreach (var route in matchingTopicRoutes ?? []) + { + var thisResult = route.Handle(connection, receiveTime, originalData, data); + handled = true; + + if (thisResult != null) + { + result ??= thisResult; + + if (!MultipleReaders) + break; + } + + } + } + + return handled; + } + } +} diff --git a/CryptoExchange.Net/Sockets/Default/Routing/RoutingTable.cs b/CryptoExchange.Net/Sockets/Default/Routing/RoutingTable.cs new file mode 100644 index 0000000..7ef699c --- /dev/null +++ b/CryptoExchange.Net/Sockets/Default/Routing/RoutingTable.cs @@ -0,0 +1,96 @@ +using CryptoExchange.Net.Sockets.Interfaces; +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Sockets.Default.Routing +{ + /// + /// Routing table + /// + public class RoutingTable + { + private Dictionary _routeTableEntries; + + /// + /// Create routing table for provided processors + /// + public RoutingTable(IEnumerable processors) + { + _routeTableEntries = new Dictionary(); + foreach (var entry in processors) + { + foreach (var route in entry.MessageRouter.Routes) + { + if (!_routeTableEntries.ContainsKey(route.TypeIdentifier)) + _routeTableEntries.Add(route.TypeIdentifier, new RoutingTableEntry(route.DeserializationType)); + + _routeTableEntries[route.TypeIdentifier].Handlers.Add(entry); + } + } + } + + /// + /// Get route table entry for a type identifier + /// + public RoutingTableEntry? GetRouteTableEntry(string typeIdentifier) + { + return _routeTableEntries.TryGetValue(typeIdentifier, out var entry) ? entry : null; + } + + /// + public override string ToString() + { + var sb = new StringBuilder(); + foreach (var entry in _routeTableEntries) + { + sb.AppendLine($"{entry.Key}, {entry.Value.DeserializationType.Name}"); + foreach(var item in entry.Value.Handlers) + { + sb.AppendLine($" - Processor {item.GetType().Name}"); + foreach(var route in item.MessageRouter.Routes) + { + if (route.TypeIdentifier == entry.Key) + { + if (route.TopicFilter == null) + sb.AppendLine($" - Route without topic filter"); + else + sb.AppendLine($" - Route with topic filter {route.TopicFilter}"); + } + } + } + } + + return sb.ToString(); + } + } + + /// + /// Routing table entry + /// + public record RoutingTableEntry + { + /// + /// Whether the deserialization type is string + /// + public bool IsStringOutput { get; set; } + /// + /// The deserialization type + /// + public Type DeserializationType { get; set; } + /// + /// Message processors + /// + public List Handlers { get; set; } + + /// + /// ctor + /// + public RoutingTableEntry(Type deserializationType) + { + IsStringOutput = deserializationType == typeof(string); + DeserializationType = deserializationType; + Handlers = new List(); + } + } +} diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index b1fa7db..d69f66b 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -5,13 +5,11 @@ using CryptoExchange.Net.Logging.Extensions; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets.Default.Interfaces; +using CryptoExchange.Net.Sockets.Default.Routing; using CryptoExchange.Net.Sockets.Interfaces; using Microsoft.Extensions.Logging; using System; using System.Collections; -#if NET8_0_OR_GREATER -using System.Collections.Frozen; -#endif using System.Collections.Generic; using System.Collections.ObjectModel; using System.Data; @@ -267,8 +265,7 @@ namespace CryptoExchange.Net.Sockets.Default private readonly object _listenersLock = new object(); #endif - - private RouteTable _routeTable = new RouteTable([]); + private RoutingTable _routeTable = new RoutingTable([]); private ReadOnlyCollection _listeners; private readonly ILogger _logger; @@ -552,7 +549,7 @@ namespace CryptoExchange.Net.Sockets.Default object result; try { - if (routingEntry.RouteType == typeof(string)) + if (routingEntry.IsStringOutput) { #if NETSTANDARD2_0 result = Encoding.UTF8.GetString(data.ToArray()); @@ -562,7 +559,7 @@ namespace CryptoExchange.Net.Sockets.Default } else { - result = messageConverter.Deserialize(data, routingEntry.RouteType); + result = messageConverter.Deserialize(data, routingEntry.DeserializationType); } } catch(Exception ex) @@ -1145,15 +1142,32 @@ namespace CryptoExchange.Net.Sockets.Default }); } + private void BuildRoutingTable() + { + _routeTable = new RoutingTable(_listeners); + } + private void AddMessageProcessor(IMessageProcessor processor) { lock (_listenersLock) { var updatedList = new List(_listeners); updatedList.Add(processor); - _routeTable = new RouteTable(updatedList); + processor.OnMessageRouterUpdated += () => + { + BuildRoutingTable(); +#if DEBUG + _logger.LogTrace("MessageRouter updated, new routing table:\r\n" + _routeTable.ToString()); +#endif + }; _listeners = updatedList.AsReadOnly(); - + if (processor.MessageRouter.Routes.Length > 0) + { + BuildRoutingTable(); +#if DEBUG + _logger.LogTrace("Processor added, new routing table:\r\n" + _routeTable.ToString()); +#endif + } } } @@ -1162,9 +1176,15 @@ namespace CryptoExchange.Net.Sockets.Default lock (_listenersLock) { var updatedList = new List(_listeners); - updatedList.Remove(processor); - _routeTable = new RouteTable(updatedList); + if (!updatedList.Remove(processor)) + return; // If nothing removed nothing has changed + + processor.OnMessageRouterUpdated -= BuildRoutingTable; _listeners = updatedList.AsReadOnly(); + BuildRoutingTable(); +#if DEBUG + _logger.LogTrace("Processor removed, new routing table:\r\n" + _routeTable.ToString()); +#endif } } @@ -1173,13 +1193,24 @@ namespace CryptoExchange.Net.Sockets.Default lock (_listenersLock) { var updatedList = new List(_listeners); + var anyRemoved = false; foreach (var processor in processors) - updatedList.Remove(processor); - _routeTable = new RouteTable(updatedList); + { + processor.OnMessageRouterUpdated -= BuildRoutingTable; + if (updatedList.Remove(processor)) + anyRemoved = true; + } + + if (!anyRemoved) + return; // If nothing removed nothing has changed + _listeners = updatedList.AsReadOnly(); + BuildRoutingTable(); +#if DEBUG + _logger.LogTrace("Processors removed, new routing table:\r\n" + _routeTable.ToString()); +#endif } } - } } diff --git a/CryptoExchange.Net/Sockets/Default/Subscription.cs b/CryptoExchange.Net/Sockets/Default/Subscription.cs index 04dde72..05053f7 100644 --- a/CryptoExchange.Net/Sockets/Default/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Default/Subscription.cs @@ -1,5 +1,6 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Sockets.Default.Routing; using CryptoExchange.Net.Sockets.Interfaces; using Microsoft.Extensions.Logging; using System; @@ -82,6 +83,7 @@ namespace CryptoExchange.Net.Sockets.Default { _router = value; _router.BuildRouteMap(); + OnMessageRouterUpdated?.Invoke(); } } @@ -119,6 +121,9 @@ namespace CryptoExchange.Net.Sockets.Default /// public int IndividualSubscriptionCount { get; set; } = 1; + /// + public event Action? OnMessageRouterUpdated; + /// /// ctor /// @@ -193,9 +198,7 @@ namespace CryptoExchange.Net.Sockets.Default SubscriptionQuery.Timeout(); } - return MessageRouter[typeIdentifier].Handle(topicFilter, connection, receiveTime, originalData, data, out _); - - //return routeMap.Handle(topicFilter, connection, receiveTime, originalData, data, out _); + return MessageRouter[typeIdentifier]?.Handle(topicFilter, connection, receiveTime, originalData, data, out _) ?? false; } /// diff --git a/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs index 2b59419..ab807ca 100644 --- a/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Sockets.Default; +using CryptoExchange.Net.Sockets.Default.Routing; using System; namespace CryptoExchange.Net.Sockets.Interfaces @@ -19,6 +20,10 @@ namespace CryptoExchange.Net.Sockets.Interfaces /// public MessageRouter MessageRouter { get; } /// + /// Event when the message router for this processor has been changed + /// + public event Action OnMessageRouterUpdated; + /// /// Handle a message /// //bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route); diff --git a/CryptoExchange.Net/Sockets/MessageRouter.cs b/CryptoExchange.Net/Sockets/MessageRouter.cs deleted file mode 100644 index 23253e6..0000000 --- a/CryptoExchange.Net/Sockets/MessageRouter.cs +++ /dev/null @@ -1,425 +0,0 @@ -using CryptoExchange.Net.Objects; -using CryptoExchange.Net.Sockets.Default; -using CryptoExchange.Net.Sockets.Interfaces; -using System; -#if NET8_0_OR_GREATER -using System.Collections.Frozen; -#endif -using System.Collections.Generic; -using System.Linq; - -namespace CryptoExchange.Net.Sockets -{ - public class RouteTable - { - private Dictionary _routeTableEntries; - - public RouteTable(IEnumerable processors) - { - _routeTableEntries = new Dictionary(); - foreach (var entry in processors) - { - foreach(var route in entry.MessageRouter.Routes) - { - if (!_routeTableEntries.ContainsKey(route.TypeIdentifier)) { - _routeTableEntries.Add(route.TypeIdentifier, new RouteTableEntry() - { - RouteType = route.DeserializationType, - Handlers = new List() - }); - } - - _routeTableEntries[route.TypeIdentifier].Handlers.Add(entry); - } - - } - } - - public RouteTableEntry? GetRouteTableEntry(string typeIdentifier) - { - return _routeTableEntries.TryGetValue(typeIdentifier, out var entry) ? entry : null; - } - } - - public record RouteTableEntry - { - public Type RouteType { get; set; } - public List Handlers { get; set; } - - } - - public record RouteMapEntry - { - public Type RouteType { get; } - public bool MultipleReaders { get; private set; } - - private List _routesWithoutTopicFilter; - private Dictionary> _routesWithTopicFilter; - - public RouteMapEntry(Type routeType) - { - _routesWithoutTopicFilter = new List(); - _routesWithTopicFilter = new Dictionary>(); - - RouteType = routeType; - } - - public void AddRoute(string? topicFilter, MessageRoute route) - { - if (string.IsNullOrEmpty(topicFilter)) - { - _routesWithoutTopicFilter.Add(route); - } - else - { - if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list)) - { - list = new List(); - _routesWithTopicFilter.Add(topicFilter!, list); - } - - list.Add(route); - } - - if (route.MultipleReaders) - MultipleReaders = true; - } - - internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result) - { - result = null; - - // Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them - var handled = false; - foreach (var route in _routesWithoutTopicFilter) - { - var thisResult = route.Handle(connection, receiveTime, originalData, data); - if (thisResult != null) - result ??= thisResult; - - handled = true; - } - - // Forward to routes with matching topic filter, if any - if (topicFilter != null) - { - _routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes); - foreach (var route in matchingTopicRoutes ?? []) - { - var thisResult = route.Handle(connection, receiveTime, originalData, data); - handled = true; - - if (thisResult != null) - { - result ??= thisResult; - - if (!MultipleReaders) - break; - } - - } - } - - return handled; - } - } - - /// - /// Message router - /// - public class MessageRouter - { - /// - /// The routes registered for this router - /// - public MessageRoute[] Routes { get; } - -#if NET8_0_OR_GREATER - // Used for mapping a type identifier to the routes matching it - private FrozenDictionary? _routeMap; -#else - private Dictionary? _routeMap; -#endif - - /// - /// ctor - /// - private MessageRouter(params MessageRoute[] routes) - { - Routes = routes; - } - - /// - /// Build the route mapping - /// - public void BuildRouteMap() - { - var newMap = new Dictionary(); - foreach (var route in Routes) - { - if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap)) - { - typeMap = new RouteMapEntry(route.DeserializationType); - newMap.Add(route.TypeIdentifier, typeMap); - } - - typeMap.AddRoute(route.TopicFilter, route); - } - -#if NET8_0_OR_GREATER - _routeMap = newMap.ToFrozenDictionary(); -#else - _routeMap = newMap; -#endif - } - - /// - /// Get routes matching the type identifier - /// - public RouteMapEntry? this[string identifier] - { - get => (_routeMap ?? throw new InvalidOperationException("Route map not initialized before use")).TryGetValue(identifier, out var routes) ? routes: null; - } - - /// - /// Create message router without specific message handler - /// - public static MessageRouter CreateWithoutHandler(string typeIdentifier, bool multipleReaders = false) - { - return new MessageRouter(new MessageRoute(typeIdentifier, (string?)null, (con, receiveTime, originalData, msg) => new CallResult(default, null, null), multipleReaders)); - } - - /// - /// Create message router without specific message handler - /// - public static MessageRouter CreateWithoutHandler(string typeIdentifier, string topicFilter, bool multipleReaders = false) - { - return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult(default, null, null), multipleReaders)); - } - - /// - /// Create message router without topic filter - /// - public static MessageRouter CreateWithoutTopicFilter(IEnumerable values, Func handler, bool multipleReaders = false) - { - return new MessageRouter(values.Select(x => new MessageRoute(x, null, handler, multipleReaders)).ToArray()); - } - - /// - /// Create message router without topic filter - /// - public static MessageRouter CreateWithoutTopicFilter(string typeIdentifier, Func handler, bool multipleReaders = false) - { - return new MessageRouter(new MessageRoute(typeIdentifier, null, handler, multipleReaders)); - } - - /// - /// Create message router with topic filter - /// - public static MessageRouter CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler, bool multipleReaders = false) - { - return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler, multipleReaders)); - } - - /// - /// Create message router with topic filter - /// - public static MessageRouter CreateWithTopicFilter(IEnumerable typeIdentifiers, string topicFilter, Func handler, bool multipleReaders = false) - { - var routes = new List(); - foreach (var type in typeIdentifiers) - routes.Add(new MessageRoute(type, topicFilter, handler, multipleReaders)); - - return new MessageRouter(routes.ToArray()); - } - - /// - /// Create message router with topic filter - /// - public static MessageRouter CreateWithTopicFilters(string typeIdentifier, IEnumerable topicFilters, Func handler, bool multipleReaders = false) - { - var routes = new List(); - foreach (var filter in topicFilters) - routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders)); - - return new MessageRouter(routes.ToArray()); - } - - /// - /// Create message router with topic filter - /// - public static MessageRouter CreateWithTopicFilters(IEnumerable typeIdentifiers, IEnumerable topicFilters, Func handler, bool multipleReaders = false) - { - var routes = new List(); - foreach (var type in typeIdentifiers) - { - foreach (var filter in topicFilters) - routes.Add(new MessageRoute(type, filter, handler, multipleReaders)); - } - - return new MessageRouter(routes.ToArray()); - } - - /// - /// Create message router with optional topic filter - /// - public static MessageRouter CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false) - { - return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler, multipleReaders)); - } - - /// - /// Create message router with optional topic filter - /// - public static MessageRouter CreateWithOptionalTopicFilters(string typeIdentifier, IEnumerable? topicFilters, Func handler, bool multipleReaders = false) - { - var routes = new List(); - if (topicFilters?.Count() > 0) - { - foreach (var filter in topicFilters) - routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders)); - } - else - { - routes.Add(new MessageRoute(typeIdentifier, null, handler, multipleReaders)); - } - - return new MessageRouter(routes.ToArray()); - } - - /// - /// Create message router with optional topic filter - /// - public static MessageRouter CreateWithOptionalTopicFilters(IEnumerable typeIdentifiers, IEnumerable? topicFilters, Func handler, bool multipleReaders = false) - { - var routes = new List(); - foreach (var typeIdentifier in typeIdentifiers) - { - if (topicFilters?.Count() > 0) - { - foreach (var filter in topicFilters) - routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders)); - } - else - { - routes.Add(new MessageRoute(typeIdentifier, null, handler, multipleReaders)); - } - } - - return new MessageRouter(routes.ToArray()); - } - - /// - /// Create message matcher with specific routes - /// - public static MessageRouter Create(params MessageRoute[] routes) - { - return new MessageRouter(routes); - } - - /// - /// Whether this matcher contains a specific link - /// - public bool ContainsCheck(MessageRoute route) => Routes.Any(x => x.TypeIdentifier == route.TypeIdentifier && x.TopicFilter == route.TopicFilter); - } - - /// - /// Message route - /// - public abstract class MessageRoute - { - /// - /// Type identifier - /// - public string TypeIdentifier { get; set; } - /// - /// Optional topic filter - /// - public string? TopicFilter { get; set; } - - /// - /// Whether responses to this route might be read by multiple listeners - /// - public bool MultipleReaders { get; set; } = false; - - /// - /// Deserialization type - /// - public abstract Type DeserializationType { get; } - - /// - /// ctor - /// - public MessageRoute(string typeIdentifier, string? topicFilter) - { - TypeIdentifier = typeIdentifier; - TopicFilter = topicFilter; - } - - /// - /// Message handler - /// - public abstract CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data); - } - - /// - /// Message route - /// - public class MessageRoute : MessageRoute - { - private Func _handler; - - /// - public override Type DeserializationType { get; } = typeof(TMessage); - - /// - /// ctor - /// - internal MessageRoute(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false) - : base(typeIdentifier, topicFilter) - { - _handler = handler; - MultipleReaders = multipleReaders; - } - - /// - /// Create route without topic filter - /// - public static MessageRoute CreateWithoutTopicFilter(string typeIdentifier, Func handler, bool multipleReaders = false) - { - return new MessageRoute(typeIdentifier, null, handler) - { - MultipleReaders = multipleReaders - }; - } - - /// - /// Create route with optional topic filter - /// - public static MessageRoute CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false) - { - return new MessageRoute(typeIdentifier, topicFilter, handler) - { - MultipleReaders = multipleReaders - }; - } - - /// - /// Create route with topic filter - /// - public static MessageRoute CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler, bool multipleReaders = false) - { - return new MessageRoute(typeIdentifier, topicFilter, handler) - { - MultipleReaders = multipleReaders - }; - } - - /// - public override CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data) - { - return _handler(connection, receiveTime, originalData, (TMessage)data); - } - } - -} diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index ec283cb..e3865bd 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Sockets.Default; +using CryptoExchange.Net.Sockets.Default.Routing; using CryptoExchange.Net.Sockets.Interfaces; using System; using System.Threading; @@ -70,6 +71,7 @@ namespace CryptoExchange.Net.Sockets { _router = value; _router.BuildRouteMap(); + OnMessageRouterUpdated?.Invoke(); } } @@ -108,6 +110,9 @@ namespace CryptoExchange.Net.Sockets /// public Action? OnComplete { get; set; } + /// + public event Action? OnMessageRouterUpdated; + /// /// ctor /// @@ -203,7 +208,7 @@ namespace CryptoExchange.Net.Sockets if (Result?.Success != false) { // If an error result is already set don't override that - MessageRouter[typeIdentifier].Handle(topicFilter, connection, receiveTime, originalData, message, out var result); + MessageRouter[typeIdentifier]!.Handle(topicFilter, connection, receiveTime, originalData, message, out var result); Result = result; if (Result == null) // Null from Handle means it wasn't actually for this query