diff --git a/CryptoExchange.Net.UnitTests/Implementations/TestQuery.cs b/CryptoExchange.Net.UnitTests/Implementations/TestQuery.cs
index 4d762dd..05dd240 100644
--- a/CryptoExchange.Net.UnitTests/Implementations/TestQuery.cs
+++ b/CryptoExchange.Net.UnitTests/Implementations/TestQuery.cs
@@ -2,6 +2,7 @@
using CryptoExchange.Net.Objects.Errors;
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Sockets.Default;
+using CryptoExchange.Net.Sockets.Default.Routing;
using System;
namespace CryptoExchange.Net.UnitTests.Implementations
diff --git a/CryptoExchange.Net.UnitTests/Implementations/TestSubscription.cs b/CryptoExchange.Net.UnitTests/Implementations/TestSubscription.cs
index 6aa3b4b..1ff2a4d 100644
--- a/CryptoExchange.Net.UnitTests/Implementations/TestSubscription.cs
+++ b/CryptoExchange.Net.UnitTests/Implementations/TestSubscription.cs
@@ -2,6 +2,7 @@
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Sockets.Default;
+using CryptoExchange.Net.Sockets.Default.Routing;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
diff --git a/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs
index 4eafc67..4d598a4 100644
--- a/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs
+++ b/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs
@@ -171,6 +171,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
return resultOptimistic.Value;
}
+ var isNumber = reader.TokenType == JsonTokenType.Number;
var stringValue = reader.TokenType switch
{
JsonTokenType.String => reader.GetString(),
@@ -207,7 +208,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
return null;
}
- if (RunOptimistic)
+ if (RunOptimistic && !isNumber)
{
if (!_unknownValuesWarned.Contains(stringValue))
{
diff --git a/CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs b/CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs
new file mode 100644
index 0000000..9ed234e
--- /dev/null
+++ b/CryptoExchange.Net/Sockets/Default/Routing/MessageRoute.cs
@@ -0,0 +1,104 @@
+using CryptoExchange.Net.Objects;
+using System;
+
+namespace CryptoExchange.Net.Sockets.Default.Routing
+{
+ ///
+ /// Message route
+ ///
+ public abstract class MessageRoute
+ {
+ ///
+ /// Type identifier
+ ///
+ public string TypeIdentifier { get; set; }
+ ///
+ /// Optional topic filter
+ ///
+ public string? TopicFilter { get; set; }
+
+ ///
+ /// Whether responses to this route might be read by multiple listeners
+ ///
+ public bool MultipleReaders { get; set; } = false;
+
+ ///
+ /// Deserialization type
+ ///
+ public abstract Type DeserializationType { get; }
+
+ ///
+ /// ctor
+ ///
+ public MessageRoute(string typeIdentifier, string? topicFilter)
+ {
+ TypeIdentifier = typeIdentifier;
+ TopicFilter = topicFilter;
+ }
+
+ ///
+ /// Message handler
+ ///
+ public abstract CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data);
+ }
+
+ ///
+ /// Message route
+ ///
+ public class MessageRoute : MessageRoute
+ {
+ private Func _handler;
+
+ ///
+ public override Type DeserializationType { get; } = typeof(TMessage);
+
+ ///
+ /// ctor
+ ///
+ internal MessageRoute(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false)
+ : base(typeIdentifier, topicFilter)
+ {
+ _handler = handler;
+ MultipleReaders = multipleReaders;
+ }
+
+ ///
+ /// Create route without topic filter
+ ///
+ public static MessageRoute CreateWithoutTopicFilter(string typeIdentifier, Func handler, bool multipleReaders = false)
+ {
+ return new MessageRoute(typeIdentifier, null, handler)
+ {
+ MultipleReaders = multipleReaders
+ };
+ }
+
+ ///
+ /// Create route with optional topic filter
+ ///
+ public static MessageRoute CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false)
+ {
+ return new MessageRoute(typeIdentifier, topicFilter, handler)
+ {
+ MultipleReaders = multipleReaders
+ };
+ }
+
+ ///
+ /// Create route with topic filter
+ ///
+ public static MessageRoute CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler, bool multipleReaders = false)
+ {
+ return new MessageRoute(typeIdentifier, topicFilter, handler)
+ {
+ MultipleReaders = multipleReaders
+ };
+ }
+
+ ///
+ public override CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data)
+ {
+ return _handler(connection, receiveTime, originalData, (TMessage)data);
+ }
+ }
+}
diff --git a/CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs b/CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs
new file mode 100644
index 0000000..6b2542c
--- /dev/null
+++ b/CryptoExchange.Net/Sockets/Default/Routing/MessageRouter.cs
@@ -0,0 +1,185 @@
+using CryptoExchange.Net.Objects;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace CryptoExchange.Net.Sockets.Default.Routing
+{
+ ///
+ /// Message router
+ ///
+ public class MessageRouter
+ {
+ private RoutingSubTable? _routingTable;
+
+ ///
+ /// The routes registered for this router
+ ///
+ public MessageRoute[] Routes { get; }
+
+ ///
+ /// ctor
+ ///
+ private MessageRouter(params MessageRoute[] routes)
+ {
+ Routes = routes;
+ }
+
+ ///
+ /// Build the route mapping
+ ///
+ public void BuildRouteMap()
+ {
+ _routingTable = new RoutingSubTable(Routes);
+ }
+
+ ///
+ /// Get routes matching the type identifier
+ ///
+ internal RoutingSubTableEntry? this[string identifier]
+ {
+ get => (_routingTable ?? throw new InvalidOperationException("Route map not initialized before use"))[identifier];
+ }
+
+ ///
+ /// Create message router without specific message handler
+ ///
+ public static MessageRouter CreateWithoutHandler(string typeIdentifier, bool multipleReaders = false)
+ {
+ return new MessageRouter(new MessageRoute(typeIdentifier, null, (con, receiveTime, originalData, msg) => new CallResult(default, null, null), multipleReaders));
+ }
+
+ ///
+ /// Create message router without specific message handler
+ ///
+ public static MessageRouter CreateWithoutHandler(string typeIdentifier, string topicFilter, bool multipleReaders = false)
+ {
+ return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult(default, null, null), multipleReaders));
+ }
+
+ ///
+ /// Create message router without topic filter
+ ///
+ public static MessageRouter CreateWithoutTopicFilter(IEnumerable values, Func handler, bool multipleReaders = false)
+ {
+ return new MessageRouter(values.Select(x => new MessageRoute(x, null, handler, multipleReaders)).ToArray());
+ }
+
+ ///
+ /// Create message router without topic filter
+ ///
+ public static MessageRouter CreateWithoutTopicFilter(string typeIdentifier, Func handler, bool multipleReaders = false)
+ {
+ return new MessageRouter(new MessageRoute(typeIdentifier, null, handler, multipleReaders));
+ }
+
+ ///
+ /// Create message router with topic filter
+ ///
+ public static MessageRouter CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler, bool multipleReaders = false)
+ {
+ return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler, multipleReaders));
+ }
+
+ ///
+ /// Create message router with topic filter
+ ///
+ public static MessageRouter CreateWithTopicFilter(IEnumerable typeIdentifiers, string topicFilter, Func handler, bool multipleReaders = false)
+ {
+ var routes = new List();
+ foreach (var type in typeIdentifiers)
+ routes.Add(new MessageRoute(type, topicFilter, handler, multipleReaders));
+
+ return new MessageRouter(routes.ToArray());
+ }
+
+ ///
+ /// Create message router with topic filter
+ ///
+ public static MessageRouter CreateWithTopicFilters(string typeIdentifier, IEnumerable topicFilters, Func handler, bool multipleReaders = false)
+ {
+ var routes = new List();
+ foreach (var filter in topicFilters)
+ routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders));
+
+ return new MessageRouter(routes.ToArray());
+ }
+
+ ///
+ /// Create message router with topic filter
+ ///
+ public static MessageRouter CreateWithTopicFilters(IEnumerable typeIdentifiers, IEnumerable topicFilters, Func handler, bool multipleReaders = false)
+ {
+ var routes = new List();
+ foreach (var type in typeIdentifiers)
+ {
+ foreach (var filter in topicFilters)
+ routes.Add(new MessageRoute(type, filter, handler, multipleReaders));
+ }
+
+ return new MessageRouter(routes.ToArray());
+ }
+
+ ///
+ /// Create message router with optional topic filter
+ ///
+ public static MessageRouter CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false)
+ {
+ return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler, multipleReaders));
+ }
+
+ ///
+ /// Create message router with optional topic filter
+ ///
+ public static MessageRouter CreateWithOptionalTopicFilters(string typeIdentifier, IEnumerable? topicFilters, Func handler, bool multipleReaders = false)
+ {
+ var routes = new List();
+ if (topicFilters?.Count() > 0)
+ {
+ foreach (var filter in topicFilters)
+ routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders));
+ }
+ else
+ {
+ routes.Add(new MessageRoute(typeIdentifier, null, handler, multipleReaders));
+ }
+
+ return new MessageRouter(routes.ToArray());
+ }
+
+ ///
+ /// Create message router with optional topic filter
+ ///
+ public static MessageRouter CreateWithOptionalTopicFilters(IEnumerable typeIdentifiers, IEnumerable? topicFilters, Func handler, bool multipleReaders = false)
+ {
+ var routes = new List();
+ foreach (var typeIdentifier in typeIdentifiers)
+ {
+ if (topicFilters?.Count() > 0)
+ {
+ foreach (var filter in topicFilters)
+ routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders));
+ }
+ else
+ {
+ routes.Add(new MessageRoute(typeIdentifier, null, handler, multipleReaders));
+ }
+ }
+
+ return new MessageRouter(routes.ToArray());
+ }
+
+ ///
+ /// Create message matcher with specific routes
+ ///
+ public static MessageRouter Create(params MessageRoute[] routes)
+ {
+ return new MessageRouter(routes);
+ }
+
+ ///
+ /// Whether this matcher contains a specific link
+ ///
+ public bool ContainsCheck(MessageRoute route) => Routes.Any(x => x.TypeIdentifier == route.TypeIdentifier && x.TopicFilter == route.TopicFilter);
+ }
+}
diff --git a/CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs b/CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs
new file mode 100644
index 0000000..caf5b3f
--- /dev/null
+++ b/CryptoExchange.Net/Sockets/Default/Routing/RoutingSubTable.cs
@@ -0,0 +1,142 @@
+using CryptoExchange.Net.Objects;
+using System;
+#if NET8_0_OR_GREATER
+using System.Collections.Frozen;
+#endif
+using System.Collections.Generic;
+
+namespace CryptoExchange.Net.Sockets.Default.Routing
+{
+ internal class RoutingSubTable
+ {
+#if NET8_0_OR_GREATER
+ // Used for mapping a type identifier to the routes matching it
+ private FrozenDictionary _routeMap;
+#else
+ private Dictionary _routeMap;
+#endif
+
+ public RoutingSubTable(IEnumerable routes)
+ {
+ var newMap = new Dictionary();
+ foreach (var route in routes)
+ {
+ if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
+ {
+ typeMap = new RoutingSubTableEntry(route.DeserializationType);
+ newMap.Add(route.TypeIdentifier, typeMap);
+ }
+
+ typeMap.AddRoute(route.TopicFilter, route);
+ }
+
+ foreach(var subEntry in newMap.Values)
+ subEntry.Build();
+
+#if NET8_0_OR_GREATER
+ _routeMap = newMap.ToFrozenDictionary();
+#else
+ _routeMap = newMap;
+#endif
+ }
+
+ ///
+ /// Get routes matching the type identifier
+ ///
+ public RoutingSubTableEntry? this[string identifier]
+ {
+ get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null;
+ }
+ }
+
+ internal record RoutingSubTableEntry
+ {
+ public Type DeserializationType { get; }
+ public bool MultipleReaders { get; private set; }
+
+ private List _routesWithoutTopicFilter;
+ private Dictionary> _routesWithTopicFilter;
+#if NET8_0_OR_GREATER
+ // Used for mapping a type identifier to the routes matching it
+ private FrozenDictionary>? _routesWithTopicFilterFrozen;
+#endif
+
+ public RoutingSubTableEntry(Type routeType)
+ {
+ _routesWithoutTopicFilter = new List();
+ _routesWithTopicFilter = new Dictionary>();
+
+ DeserializationType = routeType;
+ }
+
+ public void AddRoute(string? topicFilter, MessageRoute route)
+ {
+ if (string.IsNullOrEmpty(topicFilter))
+ {
+ _routesWithoutTopicFilter.Add(route);
+ }
+ else
+ {
+ if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
+ {
+ list = new List();
+ _routesWithTopicFilter.Add(topicFilter!, list);
+ }
+
+ list.Add(route);
+ }
+
+ if (route.MultipleReaders)
+ MultipleReaders = true;
+ }
+
+ public void Build()
+ {
+#if NET8_0_OR_GREATER
+ _routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary();
+#endif
+ }
+
+ internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
+ {
+ result = null;
+
+ // Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
+ var handled = false;
+ foreach (var route in _routesWithoutTopicFilter)
+ {
+ var thisResult = route.Handle(connection, receiveTime, originalData, data);
+ if (thisResult != null)
+ result ??= thisResult;
+
+ handled = true;
+ }
+
+ // Forward to routes with matching topic filter, if any
+ if (topicFilter != null)
+ {
+#if NET8_0_OR_GREATER
+ _routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
+#else
+ _routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
+#endif
+ foreach (var route in matchingTopicRoutes ?? [])
+ {
+ var thisResult = route.Handle(connection, receiveTime, originalData, data);
+ handled = true;
+
+ if (thisResult != null)
+ {
+ result ??= thisResult;
+
+ if (!MultipleReaders)
+ break;
+ }
+
+ }
+ }
+
+ return handled;
+ }
+ }
+}
diff --git a/CryptoExchange.Net/Sockets/Default/Routing/RoutingTable.cs b/CryptoExchange.Net/Sockets/Default/Routing/RoutingTable.cs
new file mode 100644
index 0000000..7ef699c
--- /dev/null
+++ b/CryptoExchange.Net/Sockets/Default/Routing/RoutingTable.cs
@@ -0,0 +1,96 @@
+using CryptoExchange.Net.Sockets.Interfaces;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace CryptoExchange.Net.Sockets.Default.Routing
+{
+ ///
+ /// Routing table
+ ///
+ public class RoutingTable
+ {
+ private Dictionary _routeTableEntries;
+
+ ///
+ /// Create routing table for provided processors
+ ///
+ public RoutingTable(IEnumerable processors)
+ {
+ _routeTableEntries = new Dictionary();
+ foreach (var entry in processors)
+ {
+ foreach (var route in entry.MessageRouter.Routes)
+ {
+ if (!_routeTableEntries.ContainsKey(route.TypeIdentifier))
+ _routeTableEntries.Add(route.TypeIdentifier, new RoutingTableEntry(route.DeserializationType));
+
+ _routeTableEntries[route.TypeIdentifier].Handlers.Add(entry);
+ }
+ }
+ }
+
+ ///
+ /// Get route table entry for a type identifier
+ ///
+ public RoutingTableEntry? GetRouteTableEntry(string typeIdentifier)
+ {
+ return _routeTableEntries.TryGetValue(typeIdentifier, out var entry) ? entry : null;
+ }
+
+ ///
+ public override string ToString()
+ {
+ var sb = new StringBuilder();
+ foreach (var entry in _routeTableEntries)
+ {
+ sb.AppendLine($"{entry.Key}, {entry.Value.DeserializationType.Name}");
+ foreach(var item in entry.Value.Handlers)
+ {
+ sb.AppendLine($" - Processor {item.GetType().Name}");
+ foreach(var route in item.MessageRouter.Routes)
+ {
+ if (route.TypeIdentifier == entry.Key)
+ {
+ if (route.TopicFilter == null)
+ sb.AppendLine($" - Route without topic filter");
+ else
+ sb.AppendLine($" - Route with topic filter {route.TopicFilter}");
+ }
+ }
+ }
+ }
+
+ return sb.ToString();
+ }
+ }
+
+ ///
+ /// Routing table entry
+ ///
+ public record RoutingTableEntry
+ {
+ ///
+ /// Whether the deserialization type is string
+ ///
+ public bool IsStringOutput { get; set; }
+ ///
+ /// The deserialization type
+ ///
+ public Type DeserializationType { get; set; }
+ ///
+ /// Message processors
+ ///
+ public List Handlers { get; set; }
+
+ ///
+ /// ctor
+ ///
+ public RoutingTableEntry(Type deserializationType)
+ {
+ IsStringOutput = deserializationType == typeof(string);
+ DeserializationType = deserializationType;
+ Handlers = new List();
+ }
+ }
+}
diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs
index b1fa7db..d69f66b 100644
--- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs
+++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs
@@ -5,13 +5,11 @@ using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets.Default.Interfaces;
+using CryptoExchange.Net.Sockets.Default.Routing;
using CryptoExchange.Net.Sockets.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Collections;
-#if NET8_0_OR_GREATER
-using System.Collections.Frozen;
-#endif
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Data;
@@ -267,8 +265,7 @@ namespace CryptoExchange.Net.Sockets.Default
private readonly object _listenersLock = new object();
#endif
-
- private RouteTable _routeTable = new RouteTable([]);
+ private RoutingTable _routeTable = new RoutingTable([]);
private ReadOnlyCollection _listeners;
private readonly ILogger _logger;
@@ -552,7 +549,7 @@ namespace CryptoExchange.Net.Sockets.Default
object result;
try
{
- if (routingEntry.RouteType == typeof(string))
+ if (routingEntry.IsStringOutput)
{
#if NETSTANDARD2_0
result = Encoding.UTF8.GetString(data.ToArray());
@@ -562,7 +559,7 @@ namespace CryptoExchange.Net.Sockets.Default
}
else
{
- result = messageConverter.Deserialize(data, routingEntry.RouteType);
+ result = messageConverter.Deserialize(data, routingEntry.DeserializationType);
}
}
catch(Exception ex)
@@ -1145,15 +1142,32 @@ namespace CryptoExchange.Net.Sockets.Default
});
}
+ private void BuildRoutingTable()
+ {
+ _routeTable = new RoutingTable(_listeners);
+ }
+
private void AddMessageProcessor(IMessageProcessor processor)
{
lock (_listenersLock)
{
var updatedList = new List(_listeners);
updatedList.Add(processor);
- _routeTable = new RouteTable(updatedList);
+ processor.OnMessageRouterUpdated += () =>
+ {
+ BuildRoutingTable();
+#if DEBUG
+ _logger.LogTrace("MessageRouter updated, new routing table:\r\n" + _routeTable.ToString());
+#endif
+ };
_listeners = updatedList.AsReadOnly();
-
+ if (processor.MessageRouter.Routes.Length > 0)
+ {
+ BuildRoutingTable();
+#if DEBUG
+ _logger.LogTrace("Processor added, new routing table:\r\n" + _routeTable.ToString());
+#endif
+ }
}
}
@@ -1162,9 +1176,15 @@ namespace CryptoExchange.Net.Sockets.Default
lock (_listenersLock)
{
var updatedList = new List(_listeners);
- updatedList.Remove(processor);
- _routeTable = new RouteTable(updatedList);
+ if (!updatedList.Remove(processor))
+ return; // If nothing removed nothing has changed
+
+ processor.OnMessageRouterUpdated -= BuildRoutingTable;
_listeners = updatedList.AsReadOnly();
+ BuildRoutingTable();
+#if DEBUG
+ _logger.LogTrace("Processor removed, new routing table:\r\n" + _routeTable.ToString());
+#endif
}
}
@@ -1173,13 +1193,24 @@ namespace CryptoExchange.Net.Sockets.Default
lock (_listenersLock)
{
var updatedList = new List(_listeners);
+ var anyRemoved = false;
foreach (var processor in processors)
- updatedList.Remove(processor);
- _routeTable = new RouteTable(updatedList);
+ {
+ processor.OnMessageRouterUpdated -= BuildRoutingTable;
+ if (updatedList.Remove(processor))
+ anyRemoved = true;
+ }
+
+ if (!anyRemoved)
+ return; // If nothing removed nothing has changed
+
_listeners = updatedList.AsReadOnly();
+ BuildRoutingTable();
+#if DEBUG
+ _logger.LogTrace("Processors removed, new routing table:\r\n" + _routeTable.ToString());
+#endif
}
}
-
}
}
diff --git a/CryptoExchange.Net/Sockets/Default/Subscription.cs b/CryptoExchange.Net/Sockets/Default/Subscription.cs
index 04dde72..05053f7 100644
--- a/CryptoExchange.Net/Sockets/Default/Subscription.cs
+++ b/CryptoExchange.Net/Sockets/Default/Subscription.cs
@@ -1,5 +1,6 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
+using CryptoExchange.Net.Sockets.Default.Routing;
using CryptoExchange.Net.Sockets.Interfaces;
using Microsoft.Extensions.Logging;
using System;
@@ -82,6 +83,7 @@ namespace CryptoExchange.Net.Sockets.Default
{
_router = value;
_router.BuildRouteMap();
+ OnMessageRouterUpdated?.Invoke();
}
}
@@ -119,6 +121,9 @@ namespace CryptoExchange.Net.Sockets.Default
///
public int IndividualSubscriptionCount { get; set; } = 1;
+ ///
+ public event Action? OnMessageRouterUpdated;
+
///
/// ctor
///
@@ -193,9 +198,7 @@ namespace CryptoExchange.Net.Sockets.Default
SubscriptionQuery.Timeout();
}
- return MessageRouter[typeIdentifier].Handle(topicFilter, connection, receiveTime, originalData, data, out _);
-
- //return routeMap.Handle(topicFilter, connection, receiveTime, originalData, data, out _);
+ return MessageRouter[typeIdentifier]?.Handle(topicFilter, connection, receiveTime, originalData, data, out _) ?? false;
}
///
diff --git a/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs
index 2b59419..ab807ca 100644
--- a/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs
+++ b/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs
@@ -1,6 +1,7 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default;
+using CryptoExchange.Net.Sockets.Default.Routing;
using System;
namespace CryptoExchange.Net.Sockets.Interfaces
@@ -19,6 +20,10 @@ namespace CryptoExchange.Net.Sockets.Interfaces
///
public MessageRouter MessageRouter { get; }
///
+ /// Event when the message router for this processor has been changed
+ ///
+ public event Action OnMessageRouterUpdated;
+ ///
/// Handle a message
///
//bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route);
diff --git a/CryptoExchange.Net/Sockets/MessageRouter.cs b/CryptoExchange.Net/Sockets/MessageRouter.cs
deleted file mode 100644
index 23253e6..0000000
--- a/CryptoExchange.Net/Sockets/MessageRouter.cs
+++ /dev/null
@@ -1,425 +0,0 @@
-using CryptoExchange.Net.Objects;
-using CryptoExchange.Net.Sockets.Default;
-using CryptoExchange.Net.Sockets.Interfaces;
-using System;
-#if NET8_0_OR_GREATER
-using System.Collections.Frozen;
-#endif
-using System.Collections.Generic;
-using System.Linq;
-
-namespace CryptoExchange.Net.Sockets
-{
- public class RouteTable
- {
- private Dictionary _routeTableEntries;
-
- public RouteTable(IEnumerable processors)
- {
- _routeTableEntries = new Dictionary();
- foreach (var entry in processors)
- {
- foreach(var route in entry.MessageRouter.Routes)
- {
- if (!_routeTableEntries.ContainsKey(route.TypeIdentifier)) {
- _routeTableEntries.Add(route.TypeIdentifier, new RouteTableEntry()
- {
- RouteType = route.DeserializationType,
- Handlers = new List()
- });
- }
-
- _routeTableEntries[route.TypeIdentifier].Handlers.Add(entry);
- }
-
- }
- }
-
- public RouteTableEntry? GetRouteTableEntry(string typeIdentifier)
- {
- return _routeTableEntries.TryGetValue(typeIdentifier, out var entry) ? entry : null;
- }
- }
-
- public record RouteTableEntry
- {
- public Type RouteType { get; set; }
- public List Handlers { get; set; }
-
- }
-
- public record RouteMapEntry
- {
- public Type RouteType { get; }
- public bool MultipleReaders { get; private set; }
-
- private List _routesWithoutTopicFilter;
- private Dictionary> _routesWithTopicFilter;
-
- public RouteMapEntry(Type routeType)
- {
- _routesWithoutTopicFilter = new List();
- _routesWithTopicFilter = new Dictionary>();
-
- RouteType = routeType;
- }
-
- public void AddRoute(string? topicFilter, MessageRoute route)
- {
- if (string.IsNullOrEmpty(topicFilter))
- {
- _routesWithoutTopicFilter.Add(route);
- }
- else
- {
- if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
- {
- list = new List();
- _routesWithTopicFilter.Add(topicFilter!, list);
- }
-
- list.Add(route);
- }
-
- if (route.MultipleReaders)
- MultipleReaders = true;
- }
-
- internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
- {
- result = null;
-
- // Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
- var handled = false;
- foreach (var route in _routesWithoutTopicFilter)
- {
- var thisResult = route.Handle(connection, receiveTime, originalData, data);
- if (thisResult != null)
- result ??= thisResult;
-
- handled = true;
- }
-
- // Forward to routes with matching topic filter, if any
- if (topicFilter != null)
- {
- _routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
- foreach (var route in matchingTopicRoutes ?? [])
- {
- var thisResult = route.Handle(connection, receiveTime, originalData, data);
- handled = true;
-
- if (thisResult != null)
- {
- result ??= thisResult;
-
- if (!MultipleReaders)
- break;
- }
-
- }
- }
-
- return handled;
- }
- }
-
- ///
- /// Message router
- ///
- public class MessageRouter
- {
- ///
- /// The routes registered for this router
- ///
- public MessageRoute[] Routes { get; }
-
-#if NET8_0_OR_GREATER
- // Used for mapping a type identifier to the routes matching it
- private FrozenDictionary? _routeMap;
-#else
- private Dictionary? _routeMap;
-#endif
-
- ///
- /// ctor
- ///
- private MessageRouter(params MessageRoute[] routes)
- {
- Routes = routes;
- }
-
- ///
- /// Build the route mapping
- ///
- public void BuildRouteMap()
- {
- var newMap = new Dictionary();
- foreach (var route in Routes)
- {
- if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
- {
- typeMap = new RouteMapEntry(route.DeserializationType);
- newMap.Add(route.TypeIdentifier, typeMap);
- }
-
- typeMap.AddRoute(route.TopicFilter, route);
- }
-
-#if NET8_0_OR_GREATER
- _routeMap = newMap.ToFrozenDictionary();
-#else
- _routeMap = newMap;
-#endif
- }
-
- ///
- /// Get routes matching the type identifier
- ///
- public RouteMapEntry? this[string identifier]
- {
- get => (_routeMap ?? throw new InvalidOperationException("Route map not initialized before use")).TryGetValue(identifier, out var routes) ? routes: null;
- }
-
- ///
- /// Create message router without specific message handler
- ///
- public static MessageRouter CreateWithoutHandler(string typeIdentifier, bool multipleReaders = false)
- {
- return new MessageRouter(new MessageRoute(typeIdentifier, (string?)null, (con, receiveTime, originalData, msg) => new CallResult(default, null, null), multipleReaders));
- }
-
- ///
- /// Create message router without specific message handler
- ///
- public static MessageRouter CreateWithoutHandler(string typeIdentifier, string topicFilter, bool multipleReaders = false)
- {
- return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult(default, null, null), multipleReaders));
- }
-
- ///
- /// Create message router without topic filter
- ///
- public static MessageRouter CreateWithoutTopicFilter(IEnumerable values, Func handler, bool multipleReaders = false)
- {
- return new MessageRouter(values.Select(x => new MessageRoute(x, null, handler, multipleReaders)).ToArray());
- }
-
- ///
- /// Create message router without topic filter
- ///
- public static MessageRouter CreateWithoutTopicFilter(string typeIdentifier, Func handler, bool multipleReaders = false)
- {
- return new MessageRouter(new MessageRoute(typeIdentifier, null, handler, multipleReaders));
- }
-
- ///
- /// Create message router with topic filter
- ///
- public static MessageRouter CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler, bool multipleReaders = false)
- {
- return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler, multipleReaders));
- }
-
- ///
- /// Create message router with topic filter
- ///
- public static MessageRouter CreateWithTopicFilter(IEnumerable typeIdentifiers, string topicFilter, Func handler, bool multipleReaders = false)
- {
- var routes = new List();
- foreach (var type in typeIdentifiers)
- routes.Add(new MessageRoute(type, topicFilter, handler, multipleReaders));
-
- return new MessageRouter(routes.ToArray());
- }
-
- ///
- /// Create message router with topic filter
- ///
- public static MessageRouter CreateWithTopicFilters(string typeIdentifier, IEnumerable topicFilters, Func handler, bool multipleReaders = false)
- {
- var routes = new List();
- foreach (var filter in topicFilters)
- routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders));
-
- return new MessageRouter(routes.ToArray());
- }
-
- ///
- /// Create message router with topic filter
- ///
- public static MessageRouter CreateWithTopicFilters(IEnumerable typeIdentifiers, IEnumerable topicFilters, Func handler, bool multipleReaders = false)
- {
- var routes = new List();
- foreach (var type in typeIdentifiers)
- {
- foreach (var filter in topicFilters)
- routes.Add(new MessageRoute(type, filter, handler, multipleReaders));
- }
-
- return new MessageRouter(routes.ToArray());
- }
-
- ///
- /// Create message router with optional topic filter
- ///
- public static MessageRouter CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false)
- {
- return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler, multipleReaders));
- }
-
- ///
- /// Create message router with optional topic filter
- ///
- public static MessageRouter CreateWithOptionalTopicFilters(string typeIdentifier, IEnumerable? topicFilters, Func handler, bool multipleReaders = false)
- {
- var routes = new List();
- if (topicFilters?.Count() > 0)
- {
- foreach (var filter in topicFilters)
- routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders));
- }
- else
- {
- routes.Add(new MessageRoute(typeIdentifier, null, handler, multipleReaders));
- }
-
- return new MessageRouter(routes.ToArray());
- }
-
- ///
- /// Create message router with optional topic filter
- ///
- public static MessageRouter CreateWithOptionalTopicFilters(IEnumerable typeIdentifiers, IEnumerable? topicFilters, Func handler, bool multipleReaders = false)
- {
- var routes = new List();
- foreach (var typeIdentifier in typeIdentifiers)
- {
- if (topicFilters?.Count() > 0)
- {
- foreach (var filter in topicFilters)
- routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders));
- }
- else
- {
- routes.Add(new MessageRoute(typeIdentifier, null, handler, multipleReaders));
- }
- }
-
- return new MessageRouter(routes.ToArray());
- }
-
- ///
- /// Create message matcher with specific routes
- ///
- public static MessageRouter Create(params MessageRoute[] routes)
- {
- return new MessageRouter(routes);
- }
-
- ///
- /// Whether this matcher contains a specific link
- ///
- public bool ContainsCheck(MessageRoute route) => Routes.Any(x => x.TypeIdentifier == route.TypeIdentifier && x.TopicFilter == route.TopicFilter);
- }
-
- ///
- /// Message route
- ///
- public abstract class MessageRoute
- {
- ///
- /// Type identifier
- ///
- public string TypeIdentifier { get; set; }
- ///
- /// Optional topic filter
- ///
- public string? TopicFilter { get; set; }
-
- ///
- /// Whether responses to this route might be read by multiple listeners
- ///
- public bool MultipleReaders { get; set; } = false;
-
- ///
- /// Deserialization type
- ///
- public abstract Type DeserializationType { get; }
-
- ///
- /// ctor
- ///
- public MessageRoute(string typeIdentifier, string? topicFilter)
- {
- TypeIdentifier = typeIdentifier;
- TopicFilter = topicFilter;
- }
-
- ///
- /// Message handler
- ///
- public abstract CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data);
- }
-
- ///
- /// Message route
- ///
- public class MessageRoute : MessageRoute
- {
- private Func _handler;
-
- ///
- public override Type DeserializationType { get; } = typeof(TMessage);
-
- ///
- /// ctor
- ///
- internal MessageRoute(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false)
- : base(typeIdentifier, topicFilter)
- {
- _handler = handler;
- MultipleReaders = multipleReaders;
- }
-
- ///
- /// Create route without topic filter
- ///
- public static MessageRoute CreateWithoutTopicFilter(string typeIdentifier, Func handler, bool multipleReaders = false)
- {
- return new MessageRoute(typeIdentifier, null, handler)
- {
- MultipleReaders = multipleReaders
- };
- }
-
- ///
- /// Create route with optional topic filter
- ///
- public static MessageRoute CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false)
- {
- return new MessageRoute(typeIdentifier, topicFilter, handler)
- {
- MultipleReaders = multipleReaders
- };
- }
-
- ///
- /// Create route with topic filter
- ///
- public static MessageRoute CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler, bool multipleReaders = false)
- {
- return new MessageRoute(typeIdentifier, topicFilter, handler)
- {
- MultipleReaders = multipleReaders
- };
- }
-
- ///
- public override CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data)
- {
- return _handler(connection, receiveTime, originalData, (TMessage)data);
- }
- }
-
-}
diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs
index ec283cb..e3865bd 100644
--- a/CryptoExchange.Net/Sockets/Query.cs
+++ b/CryptoExchange.Net/Sockets/Query.cs
@@ -1,6 +1,7 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default;
+using CryptoExchange.Net.Sockets.Default.Routing;
using CryptoExchange.Net.Sockets.Interfaces;
using System;
using System.Threading;
@@ -70,6 +71,7 @@ namespace CryptoExchange.Net.Sockets
{
_router = value;
_router.BuildRouteMap();
+ OnMessageRouterUpdated?.Invoke();
}
}
@@ -108,6 +110,9 @@ namespace CryptoExchange.Net.Sockets
///
public Action? OnComplete { get; set; }
+ ///
+ public event Action? OnMessageRouterUpdated;
+
///
/// ctor
///
@@ -203,7 +208,7 @@ namespace CryptoExchange.Net.Sockets
if (Result?.Success != false)
{
// If an error result is already set don't override that
- MessageRouter[typeIdentifier].Handle(topicFilter, connection, receiveTime, originalData, message, out var result);
+ MessageRouter[typeIdentifier]!.Handle(topicFilter, connection, receiveTime, originalData, message, out var result);
Result = result;
if (Result == null)
// Null from Handle means it wasn't actually for this query