1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 18:00:26 +00:00
This commit is contained in:
Jkorf 2025-12-12 16:00:38 +01:00
parent f49e48b18d
commit 8521e556a3
7 changed files with 96 additions and 37 deletions

View File

@ -135,7 +135,10 @@ namespace CryptoExchange.Net.Clients
/// <inheritdoc /> /// <inheritdoc />
public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions; public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions;
public int? MaxSubscriptionsPerConnection { get; set; } /// <summary>
/// The max number of individual subscriptions on a single connection
/// </summary>
public int? MaxIndividualSubscriptionsPerConnection { get; set; }
#endregion #endregion
@ -225,6 +228,9 @@ namespace CryptoExchange.Net.Clients
return new CallResult<UpdateSubscription>(new NoApiCredentialsError()); return new CallResult<UpdateSubscription>(new NoApiCredentialsError());
} }
if (subscription.IndividualSubscriptionCount > MaxIndividualSubscriptionsPerConnection)
return new CallResult<UpdateSubscription>(ArgumentError.Invalid("subscriptions", $"Max number of subscriptions in a single call is {MaxIndividualSubscriptionsPerConnection}"));
SocketConnection socketConnection; SocketConnection socketConnection;
var released = false; var released = false;
// Wait for a semaphore here, so we only connect 1 socket at a time. // Wait for a semaphore here, so we only connect 1 socket at a time.
@ -680,22 +686,29 @@ namespace CryptoExchange.Net.Clients
connection.DedicatedRequestConnection.Authenticated = authenticated; connection.DedicatedRequestConnection.Authenticated = authenticated;
} }
bool maxConnectionsReached = _socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections);
if (connection != null) if (connection != null)
{ {
if (connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget bool lessThanBatchSubCombineTarget = connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget;
|| (_socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && _socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) bool lessThanIndividualSubCombineTarget = connection.Subscriptions.Sum(x => x.IndividualSubscriptionCount) < ClientOptions.SocketIndividualSubscriptionCombineTarget;
if ((lessThanBatchSubCombineTarget && lessThanIndividualSubCombineTarget)
|| maxConnectionsReached)
{ {
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new // Use existing socket if it has less than target connections OR it has the least connections and we can't make new
// If there is a max subscriptions per connection limit also only use existing if the new subscription doesn't go over the limit // If there is a max subscriptions per connection limit also only use existing if the new subscription doesn't go over the limit
if (MaxSubscriptionsPerConnection == null) if (MaxIndividualSubscriptionsPerConnection == null)
return new CallResult<SocketConnection>(connection); return new CallResult<SocketConnection>(connection);
var currentCount = connection.Subscriptions.Sum(x => x.IndividualSubscriptionCount); var currentCount = connection.Subscriptions.Sum(x => x.IndividualSubscriptionCount);
if (currentCount + individualSubscriptionCount <= MaxSubscriptionsPerConnection) if (currentCount + individualSubscriptionCount <= MaxIndividualSubscriptionsPerConnection)
return new CallResult<SocketConnection>(connection); return new CallResult<SocketConnection>(connection);
} }
} }
if (maxConnectionsReached)
return new CallResult<SocketConnection>(new InvalidOperationError("Max amount of socket connections reached"));
var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false); var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false);
if (!connectionAddress) if (!connectionAddress)
{ {

View File

@ -80,9 +80,11 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
} }
#if NET8_0_OR_GREATER #if NET8_0_OR_GREATER
private static FrozenSet<EnumMapping>? _mapping = null; private static FrozenSet<EnumMapping>? _mappingToEnum = null;
private static FrozenDictionary<T, string>? _mappingToString = null;
#else #else
private static List<EnumMapping>? _mapping = null; private static List<EnumMapping>? _mappingToEnum = null;
private static Dictionary<T, string>? _mappingToString = null;
#endif #endif
private NullableEnumConverter? _nullableEnumConverter = null; private NullableEnumConverter? _nullableEnumConverter = null;
@ -138,8 +140,8 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
{ {
isEmptyString = false; isEmptyString = false;
var enumType = typeof(T); var enumType = typeof(T);
if (_mapping == null) if (_mappingToEnum == null)
_mapping = AddMapping(); CreateMapping();
var stringValue = reader.TokenType switch var stringValue = reader.TokenType switch
{ {
@ -166,7 +168,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
if (!_unknownValuesWarned.Contains(stringValue)) if (!_unknownValuesWarned.Contains(stringValue))
{ {
_unknownValuesWarned.Add(stringValue!); _unknownValuesWarned.Add(stringValue!);
LibraryHelpers.StaticLogger?.LogWarning($"Cannot map enum value. EnumType: {enumType.FullName}, Value: {stringValue}, Known values: {string.Join(", ", _mapping.Select(m => m.Value))}. If you think {stringValue} should added please open an issue on the Github repo"); LibraryHelpers.StaticLogger?.LogWarning($"Cannot map enum value. EnumType: {enumType.FullName}, Value: {stringValue}, Known values: {string.Join(", ", _mappingToEnum!.Select(m => m.Value))}. If you think {stringValue} should added please open an issue on the Github repo");
} }
} }
@ -185,11 +187,11 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
private static bool GetValue(Type objectType, string value, out T? result) private static bool GetValue(Type objectType, string value, out T? result)
{ {
if (_mapping != null) if (_mappingToEnum != null)
{ {
EnumMapping? mapping = null; EnumMapping? mapping = null;
// Try match on full equals // Try match on full equals
foreach (var item in _mapping) foreach (var item in _mappingToEnum)
{ {
if (item.StringValue.Equals(value, StringComparison.Ordinal)) if (item.StringValue.Equals(value, StringComparison.Ordinal))
mapping = item; mapping = item;
@ -198,7 +200,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
// If not found, try matching ignoring case // If not found, try matching ignoring case
if (mapping == null) if (mapping == null)
{ {
foreach (var item in _mapping) foreach (var item in _mappingToEnum)
{ {
if (item.StringValue.Equals(value, StringComparison.OrdinalIgnoreCase)) if (item.StringValue.Equals(value, StringComparison.OrdinalIgnoreCase))
mapping = item; mapping = item;
@ -247,13 +249,11 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
} }
} }
#if NET8_0_OR_GREATER private static void CreateMapping()
private static FrozenSet<EnumMapping> AddMapping()
#else
private static List<EnumMapping> AddMapping()
#endif
{ {
var mapping = new List<EnumMapping>(); var mappingToEnum = new List<EnumMapping>();
var mappingToString = new Dictionary<T, string>();
var enumType = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T); var enumType = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T);
var enumMembers = enumType.GetFields(); var enumMembers = enumType.GetFields();
foreach (var member in enumMembers) foreach (var member in enumMembers)
@ -262,14 +262,21 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
foreach (MapAttribute attribute in maps) foreach (MapAttribute attribute in maps)
{ {
foreach (var value in attribute.Values) foreach (var value in attribute.Values)
mapping.Add(new EnumMapping((T)Enum.Parse(enumType, member.Name), value)); {
var enumVal = (T)Enum.Parse(enumType, member.Name);
mappingToEnum.Add(new EnumMapping(enumVal, value));
if (!mappingToString.ContainsKey(enumVal))
mappingToString.Add(enumVal, value);
}
} }
} }
#if NET8_0_OR_GREATER #if NET8_0_OR_GREATER
return mapping.ToFrozenSet(); _mappingToEnum = mappingToEnum.ToFrozenSet();
_mappingToString = mappingToString.ToFrozenDictionary();
#else #else
return mapping; _mappingToEnum = mappingToEnum;
_mappingToString = mappingToString;
#endif #endif
} }
@ -281,10 +288,10 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
[return: NotNullIfNotNull("enumValue")] [return: NotNullIfNotNull("enumValue")]
public static string? GetString(T? enumValue) public static string? GetString(T? enumValue)
{ {
if (_mapping == null) if (_mappingToString == null)
_mapping = AddMapping(); CreateMapping();
return enumValue == null ? null : (_mapping.FirstOrDefault(v => v.Value.Equals(enumValue))?.StringValue ?? enumValue.ToString()); return enumValue == null ? null : (_mappingToString!.TryGetValue(enumValue.Value, out var str) ? str : enumValue.ToString());
} }
/// <summary> /// <summary>
@ -295,12 +302,12 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
public static T? ParseString(string value) public static T? ParseString(string value)
{ {
var type = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T); var type = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T);
if (_mapping == null) if (_mappingToEnum == null)
_mapping = AddMapping(); CreateMapping();
EnumMapping? mapping = null; EnumMapping? mapping = null;
// Try match on full equals // Try match on full equals
foreach(var item in _mapping) foreach(var item in _mappingToEnum!)
{ {
if (item.StringValue.Equals(value, StringComparison.Ordinal)) if (item.StringValue.Equals(value, StringComparison.Ordinal))
mapping = item; mapping = item;
@ -309,7 +316,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
// If not found, try matching ignoring case // If not found, try matching ignoring case
if (mapping == null) if (mapping == null)
{ {
foreach (var item in _mapping) foreach (var item in _mappingToEnum)
{ {
if (item.StringValue.Equals(value, StringComparison.OrdinalIgnoreCase)) if (item.StringValue.Equals(value, StringComparison.OrdinalIgnoreCase))
mapping = item; mapping = item;

View File

@ -32,10 +32,24 @@ namespace CryptoExchange.Net.Objects.Options
/// <summary> /// <summary>
/// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket. /// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket.
/// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a /// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a
/// single connection will also increase the amount of traffic on that single connection, potentially leading to issues. /// single connection will also increase the amount of traffic on that single connection, potentially leading to issues or delays.
/// <para>
/// This setting counts each Subscribe request as one instead of counting the individual subscriptions as <see cref="SocketIndividualSubscriptionCombineTarget"/> does
/// </para>
/// </summary> /// </summary>
public int? SocketSubscriptionsCombineTarget { get; set; } public int? SocketSubscriptionsCombineTarget { get; set; }
/// <summary>
/// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket.
/// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a
/// single connection will also increase the amount of traffic on that single connection, potentially leading to issues or delays.
/// <para>
/// This setting counts the individual subscriptions in a request instead of counting subscriptions in batched request as one as <see cref="SocketSubscriptionsCombineTarget"/> does.
/// </para>
/// <para>Defaults to 20</para>
/// </summary>
public int SocketIndividualSubscriptionCombineTarget { get; set; } = 20;
/// <summary> /// <summary>
/// The max amount of connections to make to the server. Can be used for API's which only allow a certain number of connections. Changing this to a high value might cause issues. /// The max amount of connections to make to the server. Can be used for API's which only allow a certain number of connections. Changing this to a high value might cause issues.
/// </summary> /// </summary>

View File

@ -616,19 +616,34 @@ namespace CryptoExchange.Net.Sockets.Default
throw new Exception("Listeners list adjusted, can't continue processing"); throw new Exception("Listeners list adjusted, can't continue processing");
} }
var subscription = _listeners[i]; var processor = _listeners[i];
foreach (var route in subscription.MessageRouter.Routes) bool isQuery = false;
Query? query = null;
if (processor is Query cquery)
{
isQuery = true;
query = cquery;
}
foreach (var route in processor.MessageRouter.Routes)
{ {
if (route.TypeIdentifier != typeIdentifier) if (route.TypeIdentifier != typeIdentifier)
continue; continue;
if (topicFilter == null || route.TopicFilter == null || route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal)) if (topicFilter == null
|| route.TopicFilter == null
|| route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal))
{ {
processed = true; if (isQuery && query!.Completed)
subscription.Handle(this, receiveTime, originalData, result, route); continue;
}
processed = true;
processor.Handle(this, receiveTime, originalData, result, route);
}
} }
if (processed && isQuery && !query!.MultipleReaders)
break;
} }
} }

View File

@ -109,7 +109,10 @@ namespace CryptoExchange.Net.Sockets.Default
/// </summary> /// </summary>
public Query? UnsubscriptionQuery { get; private set; } public Query? UnsubscriptionQuery { get; private set; }
public int IndividualSubscriptionCount { get; set; } /// <summary>
/// The number of individual streams in this subscription
/// </summary>
public int IndividualSubscriptionCount { get; set; } = 1;
/// <summary> /// <summary>
/// ctor /// ctor

View File

@ -16,6 +16,8 @@ namespace CryptoExchange.Net.Sockets.Default
public SystemSubscription(ILogger logger, bool authenticated = false) : base(logger, authenticated, false) public SystemSubscription(ILogger logger, bool authenticated = false) : base(logger, authenticated, false)
{ {
Status = SubscriptionStatus.Subscribed; Status = SubscriptionStatus.Subscribed;
IndividualSubscriptionCount = 0;
} }
/// <inheritdoc /> /// <inheritdoc />

View File

@ -89,6 +89,11 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public bool ExpectsResponse { get; set; } = true; public bool ExpectsResponse { get; set; } = true;
/// <summary>
/// Whether responses to this query might be read by multiple listeners
/// </summary>
public bool MultipleReaders { get; set; } = false;
/// <summary> /// <summary>
/// Wait event for response /// Wait event for response
/// </summary> /// </summary>