From f66730ca8cadf2b98e9de3da9929d34647edafab Mon Sep 17 00:00:00 2001 From: Jkorf Date: Wed, 8 Apr 2026 09:33:45 +0200 Subject: [PATCH] wip --- .../Sockets/Default/Routing/MessageRoute.cs | 10 + .../Sockets/Default/Routing/MessageRouter.cs | 21 +- .../Default/Routing/ProcessorRouter.cs | 103 ++++++++ .../Sockets/Default/Routing/QueryRouter.cs | 95 +++++++ .../Default/Routing/RoutingSubTable.cs | 240 +++++++++--------- .../Default/Routing/SubscriptionRouter.cs | 76 ++++++ .../Sockets/Default/Subscription.cs | 4 +- CryptoExchange.Net/Sockets/Query.cs | 4 +- 8 files changed, 423 insertions(+), 130 deletions(-) create mode 100644 CryptoExchange.Net/Sockets/Default/Routing/ProcessorRouter.cs create mode 100644 CryptoExchange.Net/Sockets/Default/Routing/QueryRouter.cs create mode 100644 CryptoExchange.Net/Sockets/Default/Routing/SubscriptionRouter.cs diff --git a/CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs b/CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs index 9ed234e..5f99aea 100644 --- a/CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs +++ b/CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs @@ -101,4 +101,14 @@ namespace CryptoExchange.Net.Sockets.Default.Routing return _handler(connection, receiveTime, originalData, (TMessage)data); } } + + //public class SubscriptionRoute : MessageRoute + //{ + + //} + + //public class QuertyRoute : MessageRoute + //{ + + //} } diff --git a/CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs b/CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs index 6b2542c..cb81b7b 100644 --- a/CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs +++ b/CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs @@ -10,7 +10,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing /// public class MessageRouter { - private RoutingSubTable? _routingTable; + private ProcessorRouter? _routingTable; /// /// The routes registered for this router @@ -28,17 +28,26 @@ namespace CryptoExchange.Net.Sockets.Default.Routing /// /// Build the route mapping /// - public void BuildRouteMap() + public void BuildQueryRouteMap() { - _routingTable = new RoutingSubTable(Routes); + _routingTable = new QueryRouter(Routes); } /// - /// Get routes matching the type identifier + /// Build the route mapping /// - internal RoutingSubTableEntry? this[string identifier] + public void BuildSubscriptionRouteMap() { - get => (_routingTable ?? throw new InvalidOperationException("Route map not initialized before use"))[identifier]; + _routingTable = new SubscriptionRouter(Routes); + } + + public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result) + { + var routeCollection = _routingTable.GetRoutes(typeIdentifier); + if (routeCollection == null) + throw new InvalidOperationException($"No routes for {typeIdentifier} message type"); + + return routeCollection.Handle(topicFilter, connection, receiveTime, originalData, data, out result); } /// diff --git a/CryptoExchange.Net/Sockets/Default/Routing/ProcessorRouter.cs b/CryptoExchange.Net/Sockets/Default/Routing/ProcessorRouter.cs new file mode 100644 index 0000000..f388a6b --- /dev/null +++ b/CryptoExchange.Net/Sockets/Default/Routing/ProcessorRouter.cs @@ -0,0 +1,103 @@ +using CryptoExchange.Net.Objects; +using System; +using System.Collections.Generic; +using System.Text; +#if NET8_0_OR_GREATER +using System.Collections.Frozen; +#endif + +namespace CryptoExchange.Net.Sockets.Default.Routing +{ + internal abstract class ProcessorRouter + { + public abstract RouteCollection? GetRoutes(string identifier); + } + + internal abstract class ProcessorRouter : ProcessorRouter + where T : RouteCollection + { +#if NET8_0_OR_GREATER + private FrozenDictionary _routeMap; +#else + private Dictionary _routeMap; +#endif + + public ProcessorRouter(IEnumerable routes) + { + var map = BuildFromRoutes(routes); +#if NET8_0_OR_GREATER + _routeMap = map.ToFrozenDictionary(); +#else + _routeMap = map; +#endif + } + + public abstract Dictionary BuildFromRoutes(IEnumerable routes); + + public override RouteCollection? GetRoutes(string identifier) => this[identifier]; + + /// + /// Get routes matching the type identifier + /// + public T? this[string identifier] + { + get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null; + } + } + + internal abstract class RouteCollection + { + protected List _routesWithoutTopicFilter; + protected Dictionary> _routesWithTopicFilter; +#if NET8_0_OR_GREATER + protected FrozenDictionary>? _routesWithTopicFilterFrozen; +#endif + + public Type DeserializationType { get; } + + public RouteCollection(Type routeType) + { + _routesWithoutTopicFilter = new List(); + _routesWithTopicFilter = new Dictionary>(); + + DeserializationType = routeType; + } + + public virtual void AddRoute(string? topicFilter, MessageRoute route) + { + if (string.IsNullOrEmpty(topicFilter)) + { + _routesWithoutTopicFilter.Add(route); + } + else + { + if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list)) + { + list = new List(); + _routesWithTopicFilter.Add(topicFilter!, list); + } + + list.Add(route); + } + } + + public void Build() + { +#if NET8_0_OR_GREATER + _routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary(); +#endif + } + + protected List? GetRoutesWithMatchingTopicFilter(string topicFilter) + { +#if NET8_0_OR_GREATER + _routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes); +#else + _routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes); +#endif + return matchingTopicRoutes; + } + + public abstract bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result); + } +} diff --git a/CryptoExchange.Net/Sockets/Default/Routing/QueryRouter.cs b/CryptoExchange.Net/Sockets/Default/Routing/QueryRouter.cs new file mode 100644 index 0000000..6d7c778 --- /dev/null +++ b/CryptoExchange.Net/Sockets/Default/Routing/QueryRouter.cs @@ -0,0 +1,95 @@ +using CryptoExchange.Net.Objects; +using System; +using System.Collections.Generic; +using System.Text; +#if NET8_0_OR_GREATER +using System.Collections.Frozen; +#endif + +namespace CryptoExchange.Net.Sockets.Default.Routing +{ + internal class QueryRouter : ProcessorRouter + { + public QueryRouter(IEnumerable routes) : base(routes) + { + } + + public override Dictionary BuildFromRoutes(IEnumerable routes) + { + var newMap = new Dictionary(); + foreach (var route in routes) + { + if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap)) + { + typeMap = new QueryRouteCollection(route.DeserializationType); + newMap.Add(route.TypeIdentifier, typeMap); + } + + typeMap.AddRoute(route.TopicFilter, route); + } + + foreach (var subEntry in newMap.Values) + subEntry.Build(); + + return newMap; + } + } + + internal class QueryRouteCollection : RouteCollection + { + public bool MultipleReaders { get; private set; } + + public QueryRouteCollection(Type routeType) : base(routeType) + { + } + + public override void AddRoute(string? topicFilter, MessageRoute route) + { + base.AddRoute(topicFilter, route); + + if (route.MultipleReaders) + MultipleReaders = true; + } + + public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result) + { + result = null; + + // Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them + var handled = false; + foreach (var route in _routesWithoutTopicFilter) + { + var thisResult = route.Handle(connection, receiveTime, originalData, data); + if (thisResult != null) + result ??= thisResult; + + handled = true; + } + + // Forward to routes with matching topic filter, if any + if (topicFilter != null) + { +#if NET8_0_OR_GREATER + _routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes); +#else + _routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes); +#endif + foreach (var route in matchingTopicRoutes ?? []) + { + var thisResult = route.Handle(connection, receiveTime, originalData, data); + handled = true; + + if (thisResult != null) + { + result ??= thisResult; + + if (!MultipleReaders) + break; + } + } + } + + return handled; + } + } +} \ No newline at end of file diff --git a/CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs b/CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs index 5a83104..8df8810 100644 --- a/CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs +++ b/CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs @@ -1,142 +1,142 @@ -using CryptoExchange.Net.Objects; -using System; -#if NET8_0_OR_GREATER -using System.Collections.Frozen; -#endif -using System.Collections.Generic; +//using CryptoExchange.Net.Objects; +//using System; +//#if NET8_0_OR_GREATER +//using System.Collections.Frozen; +//#endif +//using System.Collections.Generic; -namespace CryptoExchange.Net.Sockets.Default.Routing -{ - internal class RoutingSubTable - { -#if NET8_0_OR_GREATER - // Used for mapping a type identifier to the routes matching it - private FrozenDictionary _routeMap; -#else - private Dictionary _routeMap; -#endif +//namespace CryptoExchange.Net.Sockets.Default.Routing +//{ +// internal class RoutingSubTable +// { +//#if NET8_0_OR_GREATER +// // Used for mapping a type identifier to the routes matching it +// private FrozenDictionary _routeMap; +//#else +// private Dictionary _routeMap; +//#endif - public RoutingSubTable(IEnumerable routes) - { - var newMap = new Dictionary(); - foreach (var route in routes) - { - if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap)) - { - typeMap = new RoutingSubTableEntry(route.DeserializationType); - newMap.Add(route.TypeIdentifier, typeMap); - } +// public RoutingSubTable(IEnumerable routes) +// { +// var newMap = new Dictionary(); +// foreach (var route in routes) +// { +// if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap)) +// { +// typeMap = new RoutingSubTableEntry(route.DeserializationType); +// newMap.Add(route.TypeIdentifier, typeMap); +// } - typeMap.AddRoute(route.TopicFilter, route); - } +// typeMap.AddRoute(route.TopicFilter, route); +// } - foreach(var subEntry in newMap.Values) - subEntry.Build(); +// foreach(var subEntry in newMap.Values) +// subEntry.Build(); -#if NET8_0_OR_GREATER - _routeMap = newMap.ToFrozenDictionary(); -#else - _routeMap = newMap; -#endif - } +//#if NET8_0_OR_GREATER +// _routeMap = newMap.ToFrozenDictionary(); +//#else +// _routeMap = newMap; +//#endif +// } - /// - /// Get routes matching the type identifier - /// - public RoutingSubTableEntry? this[string identifier] - { - get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null; - } - } +// /// +// /// Get routes matching the type identifier +// /// +// public RoutingSubTableEntry? this[string identifier] +// { +// get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null; +// } +// } - internal record RoutingSubTableEntry - { - public Type DeserializationType { get; } - public bool MultipleReaders { get; private set; } +// internal record RoutingSubTableEntry +// { +// public Type DeserializationType { get; } +// public bool MultipleReaders { get; private set; } - private List _routesWithoutTopicFilter; - private Dictionary> _routesWithTopicFilter; -#if NET8_0_OR_GREATER - private FrozenDictionary>? _routesWithTopicFilterFrozen; -#endif +// private List _routesWithoutTopicFilter; +// private Dictionary> _routesWithTopicFilter; +//#if NET8_0_OR_GREATER +// private FrozenDictionary>? _routesWithTopicFilterFrozen; +//#endif - public RoutingSubTableEntry(Type routeType) - { - _routesWithoutTopicFilter = new List(); - _routesWithTopicFilter = new Dictionary>(); +// public RoutingSubTableEntry(Type routeType) +// { +// _routesWithoutTopicFilter = new List(); +// _routesWithTopicFilter = new Dictionary>(); - DeserializationType = routeType; - } +// DeserializationType = routeType; +// } - public void AddRoute(string? topicFilter, MessageRoute route) - { - if (string.IsNullOrEmpty(topicFilter)) - { - _routesWithoutTopicFilter.Add(route); - } - else - { - if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list)) - { - list = new List(); - _routesWithTopicFilter.Add(topicFilter!, list); - } +// public void AddRoute(string? topicFilter, MessageRoute route) +// { +// if (string.IsNullOrEmpty(topicFilter)) +// { +// _routesWithoutTopicFilter.Add(route); +// } +// else +// { +// if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list)) +// { +// list = new List(); +// _routesWithTopicFilter.Add(topicFilter!, list); +// } - list.Add(route); - } +// list.Add(route); +// } - if (route.MultipleReaders) - MultipleReaders = true; - } +// if (route.MultipleReaders) +// MultipleReaders = true; +// } - public void Build() - { -#if NET8_0_OR_GREATER - _routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary(); -#endif - } +// public void Build() +// { +//#if NET8_0_OR_GREATER +// _routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary(); +//#endif +// } - internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result) - { - result = null; +// internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result) +// { +// result = null; - // Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them - var handled = false; - foreach (var route in _routesWithoutTopicFilter) - { - var thisResult = route.Handle(connection, receiveTime, originalData, data); - if (thisResult != null) - result ??= thisResult; +// // Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them +// var handled = false; +// foreach (var route in _routesWithoutTopicFilter) +// { +// var thisResult = route.Handle(connection, receiveTime, originalData, data); +// if (thisResult != null) +// result ??= thisResult; - handled = true; - } +// handled = true; +// } - // Forward to routes with matching topic filter, if any - if (topicFilter != null) - { -#if NET8_0_OR_GREATER - _routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes); -#else - _routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes); -#endif - foreach (var route in matchingTopicRoutes ?? []) - { - var thisResult = route.Handle(connection, receiveTime, originalData, data); - handled = true; +// // Forward to routes with matching topic filter, if any +// if (topicFilter != null) +// { +//#if NET8_0_OR_GREATER +// _routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes); +//#else +// _routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes); +//#endif +// foreach (var route in matchingTopicRoutes ?? []) +// { +// var thisResult = route.Handle(connection, receiveTime, originalData, data); +// handled = true; - if (thisResult != null) - { - result ??= thisResult; +// if (thisResult != null) +// { +// result ??= thisResult; -#warning MultipleReaders is only for queries, subscriptions should always have multiple readers = true. Maybe create different RoutingSubTable implementations for Queries and Subscriptions? - if (!MultipleReaders) - break; - } +//#warning MultipleReaders is only for queries, subscriptions should always have multiple readers = true. Maybe create different RoutingSubTable implementations for Queries and Subscriptions? +// if (!MultipleReaders) +// break; +// } - } - } +// } +// } - return handled; - } - } -} +// return handled; +// } +// } +//} diff --git a/CryptoExchange.Net/Sockets/Default/Routing/SubscriptionRouter.cs b/CryptoExchange.Net/Sockets/Default/Routing/SubscriptionRouter.cs new file mode 100644 index 0000000..ff67f6d --- /dev/null +++ b/CryptoExchange.Net/Sockets/Default/Routing/SubscriptionRouter.cs @@ -0,0 +1,76 @@ +using CryptoExchange.Net.Objects; +using System; +using System.Collections.Generic; +using System.Text; +#if NET8_0_OR_GREATER +using System.Collections.Frozen; +#endif + +namespace CryptoExchange.Net.Sockets.Default.Routing +{ + internal class SubscriptionRouter : ProcessorRouter + { + public SubscriptionRouter(IEnumerable routes) : base(routes) + { + } + + public override Dictionary BuildFromRoutes(IEnumerable routes) + { + var newMap = new Dictionary(); + foreach (var route in routes) + { + if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap)) + { + typeMap = new SubscriptionRouteCollection(route.DeserializationType); + newMap.Add(route.TypeIdentifier, typeMap); + } + + typeMap.AddRoute(route.TopicFilter, route); + } + + foreach (var subEntry in newMap.Values) + subEntry.Build(); + + return newMap; + } + } + + internal class SubscriptionRouteCollection : RouteCollection + { + public SubscriptionRouteCollection(Type routeType) : base(routeType) + { + } + + public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result) + { + result = null; + + // Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them + var handled = false; + foreach (var route in _routesWithoutTopicFilter) + { + var thisResult = route.Handle(connection, receiveTime, originalData, data); + if (thisResult != null) + result ??= thisResult; + + handled = true; + } + + // Forward to routes with matching topic filter, if any + if (topicFilter != null) + { + var matchingTopicRoutes = GetRoutesWithMatchingTopicFilter(topicFilter); + foreach (var route in matchingTopicRoutes ?? []) + { + var thisResult = route.Handle(connection, receiveTime, originalData, data); + handled = true; + + if (thisResult != null) + result ??= thisResult; + } + } + + return handled; + } + } +} \ No newline at end of file diff --git a/CryptoExchange.Net/Sockets/Default/Subscription.cs b/CryptoExchange.Net/Sockets/Default/Subscription.cs index 05053f7..1b3a305 100644 --- a/CryptoExchange.Net/Sockets/Default/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Default/Subscription.cs @@ -82,7 +82,7 @@ namespace CryptoExchange.Net.Sockets.Default set { _router = value; - _router.BuildRouteMap(); + _router.BuildSubscriptionRouteMap(); OnMessageRouterUpdated?.Invoke(); } } @@ -198,7 +198,7 @@ namespace CryptoExchange.Net.Sockets.Default SubscriptionQuery.Timeout(); } - return MessageRouter[typeIdentifier]?.Handle(topicFilter, connection, receiveTime, originalData, data, out _) ?? false; + return MessageRouter.Handle(typeIdentifier, topicFilter, connection, receiveTime, originalData, data, out _); } /// diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 3d7154b..8ceae2a 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -70,7 +70,7 @@ namespace CryptoExchange.Net.Sockets set { _router = value; - _router.BuildRouteMap(); + _router.BuildQueryRouteMap(); OnMessageRouterUpdated?.Invoke(); } } @@ -211,7 +211,7 @@ namespace CryptoExchange.Net.Sockets if (Result?.Success != false) { // If an error result is already set don't override that - MessageRouter[typeIdentifier]!.Handle(topicFilter, connection, receiveTime, originalData, message, out var result); + MessageRouter.Handle(typeIdentifier, topicFilter, connection, receiveTime, originalData, message, out var result); Result = result; if (Result == null) // Null from Handle means it wasn't actually for this query