1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-04-12 16:13:12 +00:00
This commit is contained in:
Jkorf 2026-04-08 09:33:45 +02:00
parent 61c66300af
commit f66730ca8c
8 changed files with 423 additions and 130 deletions

View File

@ -101,4 +101,14 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
return _handler(connection, receiveTime, originalData, (TMessage)data); return _handler(connection, receiveTime, originalData, (TMessage)data);
} }
} }
//public class SubscriptionRoute<TMessage> : MessageRoute<TMessage>
//{
//}
//public class QuertyRoute<TMessage> : MessageRoute<TMessage>
//{
//}
} }

View File

@ -10,7 +10,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
/// </summary> /// </summary>
public class MessageRouter public class MessageRouter
{ {
private RoutingSubTable? _routingTable; private ProcessorRouter? _routingTable;
/// <summary> /// <summary>
/// The routes registered for this router /// The routes registered for this router
@ -28,17 +28,26 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
/// <summary> /// <summary>
/// Build the route mapping /// Build the route mapping
/// </summary> /// </summary>
public void BuildRouteMap() public void BuildQueryRouteMap()
{ {
_routingTable = new RoutingSubTable(Routes); _routingTable = new QueryRouter(Routes);
} }
/// <summary> /// <summary>
/// Get routes matching the type identifier /// Build the route mapping
/// </summary> /// </summary>
internal RoutingSubTableEntry? this[string identifier] public void BuildSubscriptionRouteMap()
{ {
get => (_routingTable ?? throw new InvalidOperationException("Route map not initialized before use"))[identifier]; _routingTable = new SubscriptionRouter(Routes);
}
public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{
var routeCollection = _routingTable.GetRoutes(typeIdentifier);
if (routeCollection == null)
throw new InvalidOperationException($"No routes for {typeIdentifier} message type");
return routeCollection.Handle(topicFilter, connection, receiveTime, originalData, data, out result);
} }
/// <summary> /// <summary>

View File

@ -0,0 +1,103 @@
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Text;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
namespace CryptoExchange.Net.Sockets.Default.Routing
{
internal abstract class ProcessorRouter
{
public abstract RouteCollection? GetRoutes(string identifier);
}
internal abstract class ProcessorRouter<T> : ProcessorRouter
where T : RouteCollection
{
#if NET8_0_OR_GREATER
private FrozenDictionary<string, T> _routeMap;
#else
private Dictionary<string, T> _routeMap;
#endif
public ProcessorRouter(IEnumerable<MessageRoute> routes)
{
var map = BuildFromRoutes(routes);
#if NET8_0_OR_GREATER
_routeMap = map.ToFrozenDictionary();
#else
_routeMap = map;
#endif
}
public abstract Dictionary<string, T> BuildFromRoutes(IEnumerable<MessageRoute> routes);
public override RouteCollection? GetRoutes(string identifier) => this[identifier];
/// <summary>
/// Get routes matching the type identifier
/// </summary>
public T? this[string identifier]
{
get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null;
}
}
internal abstract class RouteCollection
{
protected List<MessageRoute> _routesWithoutTopicFilter;
protected Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
#if NET8_0_OR_GREATER
protected FrozenDictionary<string, List<MessageRoute>>? _routesWithTopicFilterFrozen;
#endif
public Type DeserializationType { get; }
public RouteCollection(Type routeType)
{
_routesWithoutTopicFilter = new List<MessageRoute>();
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
DeserializationType = routeType;
}
public virtual void AddRoute(string? topicFilter, MessageRoute route)
{
if (string.IsNullOrEmpty(topicFilter))
{
_routesWithoutTopicFilter.Add(route);
}
else
{
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
{
list = new List<MessageRoute>();
_routesWithTopicFilter.Add(topicFilter!, list);
}
list.Add(route);
}
}
public void Build()
{
#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary();
#endif
}
protected List<MessageRoute>? GetRoutesWithMatchingTopicFilter(string topicFilter)
{
#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
#else
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
#endif
return matchingTopicRoutes;
}
public abstract bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result);
}
}

View File

@ -0,0 +1,95 @@
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Text;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
namespace CryptoExchange.Net.Sockets.Default.Routing
{
internal class QueryRouter : ProcessorRouter<QueryRouteCollection>
{
public QueryRouter(IEnumerable<MessageRoute> routes) : base(routes)
{
}
public override Dictionary<string, QueryRouteCollection> BuildFromRoutes(IEnumerable<MessageRoute> routes)
{
var newMap = new Dictionary<string, QueryRouteCollection>();
foreach (var route in routes)
{
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
{
typeMap = new QueryRouteCollection(route.DeserializationType);
newMap.Add(route.TypeIdentifier, typeMap);
}
typeMap.AddRoute(route.TopicFilter, route);
}
foreach (var subEntry in newMap.Values)
subEntry.Build();
return newMap;
}
}
internal class QueryRouteCollection : RouteCollection
{
public bool MultipleReaders { get; private set; }
public QueryRouteCollection(Type routeType) : base(routeType)
{
}
public override void AddRoute(string? topicFilter, MessageRoute route)
{
base.AddRoute(topicFilter, route);
if (route.MultipleReaders)
MultipleReaders = true;
}
public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{
result = null;
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
var handled = false;
foreach (var route in _routesWithoutTopicFilter)
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
if (thisResult != null)
result ??= thisResult;
handled = true;
}
// Forward to routes with matching topic filter, if any
if (topicFilter != null)
{
#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
#else
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
#endif
foreach (var route in matchingTopicRoutes ?? [])
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
handled = true;
if (thisResult != null)
{
result ??= thisResult;
if (!MultipleReaders)
break;
}
}
}
return handled;
}
}
}

View File

@ -1,142 +1,142 @@
using CryptoExchange.Net.Objects; //using CryptoExchange.Net.Objects;
using System; //using System;
#if NET8_0_OR_GREATER //#if NET8_0_OR_GREATER
using System.Collections.Frozen; //using System.Collections.Frozen;
#endif //#endif
using System.Collections.Generic; //using System.Collections.Generic;
namespace CryptoExchange.Net.Sockets.Default.Routing //namespace CryptoExchange.Net.Sockets.Default.Routing
{ //{
internal class RoutingSubTable // internal class RoutingSubTable
{ // {
#if NET8_0_OR_GREATER //#if NET8_0_OR_GREATER
// Used for mapping a type identifier to the routes matching it // // Used for mapping a type identifier to the routes matching it
private FrozenDictionary<string, RoutingSubTableEntry> _routeMap; // private FrozenDictionary<string, RoutingSubTableEntry> _routeMap;
#else //#else
private Dictionary<string, RoutingSubTableEntry> _routeMap; // private Dictionary<string, RoutingSubTableEntry> _routeMap;
#endif //#endif
public RoutingSubTable(IEnumerable<MessageRoute> routes) // public RoutingSubTable(IEnumerable<MessageRoute> routes)
{ // {
var newMap = new Dictionary<string, RoutingSubTableEntry>(); // var newMap = new Dictionary<string, RoutingSubTableEntry>();
foreach (var route in routes) // foreach (var route in routes)
{ // {
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap)) // if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
{ // {
typeMap = new RoutingSubTableEntry(route.DeserializationType); // typeMap = new RoutingSubTableEntry(route.DeserializationType);
newMap.Add(route.TypeIdentifier, typeMap); // newMap.Add(route.TypeIdentifier, typeMap);
} // }
typeMap.AddRoute(route.TopicFilter, route); // typeMap.AddRoute(route.TopicFilter, route);
} // }
foreach(var subEntry in newMap.Values) // foreach(var subEntry in newMap.Values)
subEntry.Build(); // subEntry.Build();
#if NET8_0_OR_GREATER //#if NET8_0_OR_GREATER
_routeMap = newMap.ToFrozenDictionary(); // _routeMap = newMap.ToFrozenDictionary();
#else //#else
_routeMap = newMap; // _routeMap = newMap;
#endif //#endif
} // }
/// <summary> // /// <summary>
/// Get routes matching the type identifier // /// Get routes matching the type identifier
/// </summary> // /// </summary>
public RoutingSubTableEntry? this[string identifier] // public RoutingSubTableEntry? this[string identifier]
{ // {
get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null; // get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null;
} // }
} // }
internal record RoutingSubTableEntry // internal record RoutingSubTableEntry
{ // {
public Type DeserializationType { get; } // public Type DeserializationType { get; }
public bool MultipleReaders { get; private set; } // public bool MultipleReaders { get; private set; }
private List<MessageRoute> _routesWithoutTopicFilter; // private List<MessageRoute> _routesWithoutTopicFilter;
private Dictionary<string, List<MessageRoute>> _routesWithTopicFilter; // private Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
#if NET8_0_OR_GREATER //#if NET8_0_OR_GREATER
private FrozenDictionary<string, List<MessageRoute>>? _routesWithTopicFilterFrozen; // private FrozenDictionary<string, List<MessageRoute>>? _routesWithTopicFilterFrozen;
#endif //#endif
public RoutingSubTableEntry(Type routeType) // public RoutingSubTableEntry(Type routeType)
{ // {
_routesWithoutTopicFilter = new List<MessageRoute>(); // _routesWithoutTopicFilter = new List<MessageRoute>();
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>(); // _routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
DeserializationType = routeType; // DeserializationType = routeType;
} // }
public void AddRoute(string? topicFilter, MessageRoute route) // public void AddRoute(string? topicFilter, MessageRoute route)
{ // {
if (string.IsNullOrEmpty(topicFilter)) // if (string.IsNullOrEmpty(topicFilter))
{ // {
_routesWithoutTopicFilter.Add(route); // _routesWithoutTopicFilter.Add(route);
} // }
else // else
{ // {
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list)) // if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
{ // {
list = new List<MessageRoute>(); // list = new List<MessageRoute>();
_routesWithTopicFilter.Add(topicFilter!, list); // _routesWithTopicFilter.Add(topicFilter!, list);
} // }
list.Add(route); // list.Add(route);
} // }
if (route.MultipleReaders) // if (route.MultipleReaders)
MultipleReaders = true; // MultipleReaders = true;
} // }
public void Build() // public void Build()
{ // {
#if NET8_0_OR_GREATER //#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary(); // _routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary();
#endif //#endif
} // }
internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result) // internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{ // {
result = null; // result = null;
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them // // Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
var handled = false; // var handled = false;
foreach (var route in _routesWithoutTopicFilter) // foreach (var route in _routesWithoutTopicFilter)
{ // {
var thisResult = route.Handle(connection, receiveTime, originalData, data); // var thisResult = route.Handle(connection, receiveTime, originalData, data);
if (thisResult != null) // if (thisResult != null)
result ??= thisResult; // result ??= thisResult;
handled = true; // handled = true;
} // }
// Forward to routes with matching topic filter, if any // // Forward to routes with matching topic filter, if any
if (topicFilter != null) // if (topicFilter != null)
{ // {
#if NET8_0_OR_GREATER //#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes); // _routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
#else //#else
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes); // _routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
#endif //#endif
foreach (var route in matchingTopicRoutes ?? []) // foreach (var route in matchingTopicRoutes ?? [])
{ // {
var thisResult = route.Handle(connection, receiveTime, originalData, data); // var thisResult = route.Handle(connection, receiveTime, originalData, data);
handled = true; // handled = true;
if (thisResult != null) // if (thisResult != null)
{ // {
result ??= thisResult; // result ??= thisResult;
#warning MultipleReaders is only for queries, subscriptions should always have multiple readers = true. Maybe create different RoutingSubTable implementations for Queries and Subscriptions? //#warning MultipleReaders is only for queries, subscriptions should always have multiple readers = true. Maybe create different RoutingSubTable implementations for Queries and Subscriptions?
if (!MultipleReaders) // if (!MultipleReaders)
break; // break;
} // }
} // }
} // }
return handled; // return handled;
} // }
} // }
} //}

View File

@ -0,0 +1,76 @@
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Text;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
namespace CryptoExchange.Net.Sockets.Default.Routing
{
internal class SubscriptionRouter : ProcessorRouter<SubscriptionRouteCollection>
{
public SubscriptionRouter(IEnumerable<MessageRoute> routes) : base(routes)
{
}
public override Dictionary<string, SubscriptionRouteCollection> BuildFromRoutes(IEnumerable<MessageRoute> routes)
{
var newMap = new Dictionary<string, SubscriptionRouteCollection>();
foreach (var route in routes)
{
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
{
typeMap = new SubscriptionRouteCollection(route.DeserializationType);
newMap.Add(route.TypeIdentifier, typeMap);
}
typeMap.AddRoute(route.TopicFilter, route);
}
foreach (var subEntry in newMap.Values)
subEntry.Build();
return newMap;
}
}
internal class SubscriptionRouteCollection : RouteCollection
{
public SubscriptionRouteCollection(Type routeType) : base(routeType)
{
}
public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{
result = null;
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
var handled = false;
foreach (var route in _routesWithoutTopicFilter)
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
if (thisResult != null)
result ??= thisResult;
handled = true;
}
// Forward to routes with matching topic filter, if any
if (topicFilter != null)
{
var matchingTopicRoutes = GetRoutesWithMatchingTopicFilter(topicFilter);
foreach (var route in matchingTopicRoutes ?? [])
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
handled = true;
if (thisResult != null)
result ??= thisResult;
}
}
return handled;
}
}
}

View File

@ -82,7 +82,7 @@ namespace CryptoExchange.Net.Sockets.Default
set set
{ {
_router = value; _router = value;
_router.BuildRouteMap(); _router.BuildSubscriptionRouteMap();
OnMessageRouterUpdated?.Invoke(); OnMessageRouterUpdated?.Invoke();
} }
} }
@ -198,7 +198,7 @@ namespace CryptoExchange.Net.Sockets.Default
SubscriptionQuery.Timeout(); SubscriptionQuery.Timeout();
} }
return MessageRouter[typeIdentifier]?.Handle(topicFilter, connection, receiveTime, originalData, data, out _) ?? false; return MessageRouter.Handle(typeIdentifier, topicFilter, connection, receiveTime, originalData, data, out _);
} }
/// <summary> /// <summary>

View File

@ -70,7 +70,7 @@ namespace CryptoExchange.Net.Sockets
set set
{ {
_router = value; _router = value;
_router.BuildRouteMap(); _router.BuildQueryRouteMap();
OnMessageRouterUpdated?.Invoke(); OnMessageRouterUpdated?.Invoke();
} }
} }
@ -211,7 +211,7 @@ namespace CryptoExchange.Net.Sockets
if (Result?.Success != false) if (Result?.Success != false)
{ {
// If an error result is already set don't override that // If an error result is already set don't override that
MessageRouter[typeIdentifier]!.Handle(topicFilter, connection, receiveTime, originalData, message, out var result); MessageRouter.Handle(typeIdentifier, topicFilter, connection, receiveTime, originalData, message, out var result);
Result = result; Result = result;
if (Result == null) if (Result == null)
// Null from Handle means it wasn't actually for this query // Null from Handle means it wasn't actually for this query