From 8521e556a3f66443b09a4e7f1cc227a545b7a6f8 Mon Sep 17 00:00:00 2001 From: Jkorf Date: Fri, 12 Dec 2025 16:00:38 +0100 Subject: [PATCH] wip --- CryptoExchange.Net/Clients/SocketApiClient.cs | 23 ++++++-- .../SystemTextJson/EnumConverter.cs | 55 +++++++++++-------- .../Objects/Options/SocketExchangeOptions.cs | 16 +++++- .../Sockets/Default/SocketConnection.cs | 27 +++++++-- .../Sockets/Default/Subscription.cs | 5 +- .../Sockets/Default/SystemSubscription.cs | 2 + CryptoExchange.Net/Sockets/Query.cs | 5 ++ 7 files changed, 96 insertions(+), 37 deletions(-) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index c4a0caf..19f6b09 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -135,7 +135,10 @@ namespace CryptoExchange.Net.Clients /// public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions; - public int? MaxSubscriptionsPerConnection { get; set; } + /// + /// The max number of individual subscriptions on a single connection + /// + public int? MaxIndividualSubscriptionsPerConnection { get; set; } #endregion @@ -225,6 +228,9 @@ namespace CryptoExchange.Net.Clients return new CallResult(new NoApiCredentialsError()); } + if (subscription.IndividualSubscriptionCount > MaxIndividualSubscriptionsPerConnection) + return new CallResult(ArgumentError.Invalid("subscriptions", $"Max number of subscriptions in a single call is {MaxIndividualSubscriptionsPerConnection}")); + SocketConnection socketConnection; var released = false; // Wait for a semaphore here, so we only connect 1 socket at a time. @@ -680,22 +686,29 @@ namespace CryptoExchange.Net.Clients connection.DedicatedRequestConnection.Authenticated = authenticated; } + bool maxConnectionsReached = _socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections); if (connection != null) { - if (connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget - || (_socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && _socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) + bool lessThanBatchSubCombineTarget = connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget; + bool lessThanIndividualSubCombineTarget = connection.Subscriptions.Sum(x => x.IndividualSubscriptionCount) < ClientOptions.SocketIndividualSubscriptionCombineTarget; + + if ((lessThanBatchSubCombineTarget && lessThanIndividualSubCombineTarget) + || maxConnectionsReached) { // Use existing socket if it has less than target connections OR it has the least connections and we can't make new // If there is a max subscriptions per connection limit also only use existing if the new subscription doesn't go over the limit - if (MaxSubscriptionsPerConnection == null) + if (MaxIndividualSubscriptionsPerConnection == null) return new CallResult(connection); var currentCount = connection.Subscriptions.Sum(x => x.IndividualSubscriptionCount); - if (currentCount + individualSubscriptionCount <= MaxSubscriptionsPerConnection) + if (currentCount + individualSubscriptionCount <= MaxIndividualSubscriptionsPerConnection) return new CallResult(connection); } } + if (maxConnectionsReached) + return new CallResult(new InvalidOperationError("Max amount of socket connections reached")); + var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false); if (!connectionAddress) { diff --git a/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs index d2f78e6..944138b 100644 --- a/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs +++ b/CryptoExchange.Net/Converters/SystemTextJson/EnumConverter.cs @@ -80,9 +80,11 @@ namespace CryptoExchange.Net.Converters.SystemTextJson } #if NET8_0_OR_GREATER - private static FrozenSet? _mapping = null; + private static FrozenSet? _mappingToEnum = null; + private static FrozenDictionary? _mappingToString = null; #else - private static List? _mapping = null; + private static List? _mappingToEnum = null; + private static Dictionary? _mappingToString = null; #endif private NullableEnumConverter? _nullableEnumConverter = null; @@ -138,8 +140,8 @@ namespace CryptoExchange.Net.Converters.SystemTextJson { isEmptyString = false; var enumType = typeof(T); - if (_mapping == null) - _mapping = AddMapping(); + if (_mappingToEnum == null) + CreateMapping(); var stringValue = reader.TokenType switch { @@ -166,7 +168,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson if (!_unknownValuesWarned.Contains(stringValue)) { _unknownValuesWarned.Add(stringValue!); - LibraryHelpers.StaticLogger?.LogWarning($"Cannot map enum value. EnumType: {enumType.FullName}, Value: {stringValue}, Known values: {string.Join(", ", _mapping.Select(m => m.Value))}. If you think {stringValue} should added please open an issue on the Github repo"); + LibraryHelpers.StaticLogger?.LogWarning($"Cannot map enum value. EnumType: {enumType.FullName}, Value: {stringValue}, Known values: {string.Join(", ", _mappingToEnum!.Select(m => m.Value))}. If you think {stringValue} should added please open an issue on the Github repo"); } } @@ -185,11 +187,11 @@ namespace CryptoExchange.Net.Converters.SystemTextJson private static bool GetValue(Type objectType, string value, out T? result) { - if (_mapping != null) + if (_mappingToEnum != null) { EnumMapping? mapping = null; // Try match on full equals - foreach (var item in _mapping) + foreach (var item in _mappingToEnum) { if (item.StringValue.Equals(value, StringComparison.Ordinal)) mapping = item; @@ -198,7 +200,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson // If not found, try matching ignoring case if (mapping == null) { - foreach (var item in _mapping) + foreach (var item in _mappingToEnum) { if (item.StringValue.Equals(value, StringComparison.OrdinalIgnoreCase)) mapping = item; @@ -247,13 +249,11 @@ namespace CryptoExchange.Net.Converters.SystemTextJson } } -#if NET8_0_OR_GREATER - private static FrozenSet AddMapping() -#else - private static List AddMapping() -#endif + private static void CreateMapping() { - var mapping = new List(); + var mappingToEnum = new List(); + var mappingToString = new Dictionary(); + var enumType = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T); var enumMembers = enumType.GetFields(); foreach (var member in enumMembers) @@ -262,14 +262,21 @@ namespace CryptoExchange.Net.Converters.SystemTextJson foreach (MapAttribute attribute in maps) { foreach (var value in attribute.Values) - mapping.Add(new EnumMapping((T)Enum.Parse(enumType, member.Name), value)); + { + var enumVal = (T)Enum.Parse(enumType, member.Name); + mappingToEnum.Add(new EnumMapping(enumVal, value)); + if (!mappingToString.ContainsKey(enumVal)) + mappingToString.Add(enumVal, value); + } } } #if NET8_0_OR_GREATER - return mapping.ToFrozenSet(); + _mappingToEnum = mappingToEnum.ToFrozenSet(); + _mappingToString = mappingToString.ToFrozenDictionary(); #else - return mapping; + _mappingToEnum = mappingToEnum; + _mappingToString = mappingToString; #endif } @@ -281,10 +288,10 @@ namespace CryptoExchange.Net.Converters.SystemTextJson [return: NotNullIfNotNull("enumValue")] public static string? GetString(T? enumValue) { - if (_mapping == null) - _mapping = AddMapping(); + if (_mappingToString == null) + CreateMapping(); - return enumValue == null ? null : (_mapping.FirstOrDefault(v => v.Value.Equals(enumValue))?.StringValue ?? enumValue.ToString()); + return enumValue == null ? null : (_mappingToString!.TryGetValue(enumValue.Value, out var str) ? str : enumValue.ToString()); } /// @@ -295,12 +302,12 @@ namespace CryptoExchange.Net.Converters.SystemTextJson public static T? ParseString(string value) { var type = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T); - if (_mapping == null) - _mapping = AddMapping(); + if (_mappingToEnum == null) + CreateMapping(); EnumMapping? mapping = null; // Try match on full equals - foreach(var item in _mapping) + foreach(var item in _mappingToEnum!) { if (item.StringValue.Equals(value, StringComparison.Ordinal)) mapping = item; @@ -309,7 +316,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson // If not found, try matching ignoring case if (mapping == null) { - foreach (var item in _mapping) + foreach (var item in _mappingToEnum) { if (item.StringValue.Equals(value, StringComparison.OrdinalIgnoreCase)) mapping = item; diff --git a/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs b/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs index 6ad56ad..8abe09d 100644 --- a/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs +++ b/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs @@ -32,10 +32,24 @@ namespace CryptoExchange.Net.Objects.Options /// /// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket. /// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a - /// single connection will also increase the amount of traffic on that single connection, potentially leading to issues. + /// single connection will also increase the amount of traffic on that single connection, potentially leading to issues or delays. + /// + /// This setting counts each Subscribe request as one instead of counting the individual subscriptions as does + /// /// public int? SocketSubscriptionsCombineTarget { get; set; } + /// + /// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket. + /// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a + /// single connection will also increase the amount of traffic on that single connection, potentially leading to issues or delays. + /// + /// This setting counts the individual subscriptions in a request instead of counting subscriptions in batched request as one as does. + /// + /// Defaults to 20 + /// + public int SocketIndividualSubscriptionCombineTarget { get; set; } = 20; + /// /// The max amount of connections to make to the server. Can be used for API's which only allow a certain number of connections. Changing this to a high value might cause issues. /// diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index f4903df..9ddbc34 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -616,19 +616,34 @@ namespace CryptoExchange.Net.Sockets.Default throw new Exception("Listeners list adjusted, can't continue processing"); } - var subscription = _listeners[i]; - foreach (var route in subscription.MessageRouter.Routes) + var processor = _listeners[i]; + bool isQuery = false; + Query? query = null; + if (processor is Query cquery) + { + isQuery = true; + query = cquery; + } + + foreach (var route in processor.MessageRouter.Routes) { if (route.TypeIdentifier != typeIdentifier) continue; - if (topicFilter == null || route.TopicFilter == null || route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal)) + if (topicFilter == null + || route.TopicFilter == null + || route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal)) { - processed = true; - subscription.Handle(this, receiveTime, originalData, result, route); - } + if (isQuery && query!.Completed) + continue; + processed = true; + processor.Handle(this, receiveTime, originalData, result, route); + } } + + if (processed && isQuery && !query!.MultipleReaders) + break; } } diff --git a/CryptoExchange.Net/Sockets/Default/Subscription.cs b/CryptoExchange.Net/Sockets/Default/Subscription.cs index 700d30f..9305755 100644 --- a/CryptoExchange.Net/Sockets/Default/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Default/Subscription.cs @@ -109,7 +109,10 @@ namespace CryptoExchange.Net.Sockets.Default /// public Query? UnsubscriptionQuery { get; private set; } - public int IndividualSubscriptionCount { get; set; } + /// + /// The number of individual streams in this subscription + /// + public int IndividualSubscriptionCount { get; set; } = 1; /// /// ctor diff --git a/CryptoExchange.Net/Sockets/Default/SystemSubscription.cs b/CryptoExchange.Net/Sockets/Default/SystemSubscription.cs index 0d610aa..ea1379a 100644 --- a/CryptoExchange.Net/Sockets/Default/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/Default/SystemSubscription.cs @@ -16,6 +16,8 @@ namespace CryptoExchange.Net.Sockets.Default public SystemSubscription(ILogger logger, bool authenticated = false) : base(logger, authenticated, false) { Status = SubscriptionStatus.Subscribed; + + IndividualSubscriptionCount = 0; } /// diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index f734f08..e569646 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -89,6 +89,11 @@ namespace CryptoExchange.Net.Sockets /// public bool ExpectsResponse { get; set; } = true; + /// + /// Whether responses to this query might be read by multiple listeners + /// + public bool MultipleReaders { get; set; } = false; + /// /// Wait event for response ///