diff --git a/CryptoExchange.Net/CryptoExchange.Net.csproj b/CryptoExchange.Net/CryptoExchange.Net.csproj index a685e1f..bb052c2 100644 --- a/CryptoExchange.Net/CryptoExchange.Net.csproj +++ b/CryptoExchange.Net/CryptoExchange.Net.csproj @@ -6,9 +6,9 @@ CryptoExchange.Net JKorf CryptoExchange.Net is a base library which is used to implement different cryptocurrency (exchange) API's. It provides a standardized way of implementing different API's, which results in a very similar experience for users of the API implementations. - 11.0.3 - 11.0.3 - 11.0.3 + 11.0.4 + 11.0.4 + 11.0.4 false OKX;OKX.Net;Mexc;Mexc.Net;Kucoin;Kucoin.Net;Kraken;Kraken.Net;Huobi;Huobi.Net;CoinEx;CoinEx.Net;Bybit;Bybit.Net;Bitget;Bitget.Net;Bitfinex;Bitfinex.Net;Binance;Binance.Net;CryptoCurrency;CryptoCurrency Exchange;CryptoExchange.Net git diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index faa769c..a82dc44 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -11,6 +11,7 @@ using System; using System.Collections; using System.Collections.Generic; using System.Collections.ObjectModel; +using System.Data; using System.Diagnostics; using System.Linq; using System.Net.WebSockets; @@ -489,6 +490,14 @@ namespace CryptoExchange.Net.Sockets.Default /// protected internal virtual void HandleStreamMessage2(WebSocketMessageType type, ReadOnlySpan data) { + // Forward message rules: + // | Message Topic | Route Topic Filter | Topics Match | Forward | Description + // | N | N | - | Y | No topic filter applied + // | N | Y | - | N | Route only listens to specific topic + // | Y | N | - | Y | Route listens to all message regardless of topic + // | Y | Y | Y | Y | Route listens to specific message topic + // | Y | Y | N | N | Route listens to different topic + var receiveTime = DateTime.UtcNow; // 1. Decrypt/Preprocess if necessary @@ -525,10 +534,9 @@ namespace CryptoExchange.Net.Sockets.Default foreach (var listener in _listeners) { var routes = listener.MessageRouter[typeIdentifier]; - if (routes.Count > 0) { - deserializationType = routes[0].DeserializationType; - break; - } + deserializationType = routes?.RouteType; + if (deserializationType != null) + break; } if (deserializationType == null) @@ -577,55 +585,18 @@ namespace CryptoExchange.Net.Sockets.Default bool processed = false; foreach (var listener in _listeners) { - var routes = listener.MessageRouter[typeIdentifier]; - - bool isQuery = false; - Query? query = null; - if (listener is Query cquery) + var routeMap = listener.MessageRouter[typeIdentifier]; + if (routeMap != null) { - isQuery = true; - query = cquery; - } + // Could be null if listeners was updated while handling message + var handled = listener.Handle(topicFilter, this, receiveTime, originalData, result, routeMap); + if (!processed) + processed = handled; - var complete = false; - - foreach (var route in routes) - { - // Forward message rules: - // | Message Topic | Route Topic Filter | Topics Match | Forward | Description - // | N | N | - | Y | No topic filter applied - // | N | Y | - | N | Route only listens to specific topic - // | Y | N | - | Y | Route listens to all message regardless of topic - // | Y | Y | Y | Y | Route listens to specific message topic - // | Y | Y | N | N | Route listens to different topic - if (topicFilter == null) - { - if (route.TopicFilter != null) - // No topic on message, but route is filtering on topic - continue; - } - else - { - if (route.TopicFilter != null && !route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal)) - // Message has a topic, and the route has a filter for another topic - continue; - } - - processed = true; - - if (isQuery && query!.Completed) - continue; - - listener.Handle(this, receiveTime, originalData, result, route); - if (isQuery && !route.MultipleReaders) - { - complete = true; + if (!routeMap.MultipleReaders) + // If this was a response to a query and there are no other routes expecting this message we can break here break; - } } - - if (complete) - break; } if (!processed) diff --git a/CryptoExchange.Net/Sockets/Default/Subscription.cs b/CryptoExchange.Net/Sockets/Default/Subscription.cs index d2585ac..d3ad6e2 100644 --- a/CryptoExchange.Net/Sockets/Default/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Default/Subscription.cs @@ -180,10 +180,11 @@ namespace CryptoExchange.Net.Sockets.Default /// /// Handle an update message /// - public CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data, MessageRoute route) + public bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, RouteMapEntry routeMap) { ConnectionInvocations++; TotalInvocations++; + if (SubscriptionQuery != null && !SubscriptionQuery.Completed && SubscriptionQuery.TimeoutBehavior == TimeoutBehavior.Succeed) { // The subscription query is one where it is successful if there is no error returned @@ -192,7 +193,7 @@ namespace CryptoExchange.Net.Sockets.Default SubscriptionQuery.Timeout(); } - return route.Handle(connection, receiveTime, originalData, data); + return routeMap.Handle(topicFilter, connection, receiveTime, originalData, data, out _); } /// diff --git a/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs index 24e5d5b..aa3ef27 100644 --- a/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs @@ -21,6 +21,6 @@ namespace CryptoExchange.Net.Sockets.Interfaces /// /// Handle a message /// - CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route); + bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object result, RouteMapEntry route); } } diff --git a/CryptoExchange.Net/Sockets/MessageRouter.cs b/CryptoExchange.Net/Sockets/MessageRouter.cs index 95f44a4..54add04 100644 --- a/CryptoExchange.Net/Sockets/MessageRouter.cs +++ b/CryptoExchange.Net/Sockets/MessageRouter.cs @@ -9,6 +9,82 @@ using System.Linq; namespace CryptoExchange.Net.Sockets { + public record RouteMapEntry + { + public Type RouteType { get; } + public bool MultipleReaders { get; private set; } + + private List _routesWithoutTopicFilter; + private Dictionary> _routesWithTopicFilter; + + public RouteMapEntry(Type routeType) + { + _routesWithoutTopicFilter = new List(); + _routesWithTopicFilter = new Dictionary>(); + + RouteType = routeType; + } + + public void AddRoute(string? topicFilter, MessageRoute route) + { + if (string.IsNullOrEmpty(topicFilter)) + { + _routesWithoutTopicFilter.Add(route); + } + else + { + if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list)) + { + list = new List(); + _routesWithTopicFilter.Add(topicFilter!, list); + } + + list.Add(route); + } + + if (route.MultipleReaders) + MultipleReaders = true; + } + + internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result) + { + result = null; + + // Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them + var handled = false; + foreach (var route in _routesWithoutTopicFilter) + { + var thisResult = route.Handle(connection, receiveTime, originalData, data); + if (thisResult != null) + result ??= thisResult; + + handled = true; + } + + // Forward to routes with matching topic filter, if any + if (topicFilter != null) + { + _routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes); + foreach (var route in matchingTopicRoutes ?? []) + { + var thisResult = route.Handle(connection, receiveTime, originalData, data); + handled = true; + + if (thisResult != null) + { + result ??= thisResult; + + if (!MultipleReaders) + break; + } + + } + } + + return handled; + } + } + /// /// Message router /// @@ -20,9 +96,10 @@ namespace CryptoExchange.Net.Sockets public MessageRoute[] Routes { get; } #if NET8_0_OR_GREATER - private FrozenDictionary>? _routeMap; + // Used for mapping a type identifier to the routes matching it + private FrozenDictionary? _routeMap; #else - private Dictionary>? _routeMap; + private Dictionary? _routeMap; #endif /// @@ -38,12 +115,16 @@ namespace CryptoExchange.Net.Sockets /// public void BuildRouteMap() { - var newMap = new Dictionary>(); + var newMap = new Dictionary(); foreach (var route in Routes) { - if (!newMap.ContainsKey(route.TypeIdentifier)) - newMap.Add(route.TypeIdentifier, new List()); - newMap[route.TypeIdentifier].Add(route); + if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap)) + { + typeMap = new RouteMapEntry(route.DeserializationType); + newMap.Add(route.TypeIdentifier, typeMap); + } + + typeMap.AddRoute(route.TopicFilter, route); } #if NET8_0_OR_GREATER @@ -56,9 +137,9 @@ namespace CryptoExchange.Net.Sockets /// /// Get routes matching the type identifier /// - public List this[string identifier] + public RouteMapEntry? this[string identifier] { - get => (_routeMap ?? throw new InvalidOperationException("Route map not initialized before use")).TryGetValue(identifier, out var routes) ? routes: []; + get => (_routeMap ?? throw new InvalidOperationException("Route map not initialized before use")).TryGetValue(identifier, out var routes) ? routes: null; } /// diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 49e3f3c..d67e497 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -164,7 +164,7 @@ namespace CryptoExchange.Net.Sockets /// /// Handle a response message /// - public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageRoute route); + public abstract bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object message, RouteMapEntry routeMap); } @@ -194,7 +194,7 @@ namespace CryptoExchange.Net.Sockets } /// - public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageRoute route) + public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object message, RouteMapEntry routeMap) { CurrentResponses++; if (CurrentResponses == RequiredResponses) @@ -203,7 +203,8 @@ namespace CryptoExchange.Net.Sockets if (Result?.Success != false) { // If an error result is already set don't override that - Result = route.Handle(connection, receiveTime, originalData, message); + routeMap.Handle(topicFilter, connection, receiveTime, originalData, message, out var result); + Result = result; if (Result == null) // Null from Handle means it wasn't actually for this query CurrentResponses -= 1; @@ -216,7 +217,7 @@ namespace CryptoExchange.Net.Sockets OnComplete?.Invoke(); } - return Result ?? CallResult.SuccessResult; + return true; } ///