diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs index 330d997..d5829a6 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs @@ -20,7 +20,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets public CallResult DoHandleMessage(SocketConnection connection, DateTime receiveTime, string? originalData, T message) { - _handler.Invoke(new DataEvent(message, receiveTime, originalData)); + _handler.Invoke(new DataEvent("Test", message, receiveTime, originalData)); return new CallResult(null); } diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscriptionWithResponseCheck.cs b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscriptionWithResponseCheck.cs index 4778728..d1c6ddb 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscriptionWithResponseCheck.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscriptionWithResponseCheck.cs @@ -22,7 +22,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets public CallResult DoHandleMessage(SocketConnection connection, DateTime receiveTime, string? originalData, T message) { - _handler.Invoke(new DataEvent(message, receiveTime, originalData)); + _handler.Invoke(new DataEvent("Test", message, receiveTime, originalData)); return new CallResult(null); } diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 19f6b09..31ee886 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -627,6 +627,7 @@ namespace CryptoExchange.Net.Clients /// Whether a dedicated request connection should be returned /// Cancellation token /// The subscription topic, can be provided when multiple of the same topics are not allowed on a connection + /// The number of individual subscriptions in this subscribe request /// protected virtual async Task> GetSocketConnection( string address, diff --git a/CryptoExchange.Net/Converters/SystemTextJson/ArrayConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/ArrayConverter.cs index d3d611a..51793cb 100644 --- a/CryptoExchange.Net/Converters/SystemTextJson/ArrayConverter.cs +++ b/CryptoExchange.Net/Converters/SystemTextJson/ArrayConverter.cs @@ -21,8 +21,9 @@ namespace CryptoExchange.Net.Converters.SystemTextJson public class ArrayConverter : JsonConverter where T : new() #endif { - private static readonly Lazy> _typePropertyInfo = new Lazy>(CacheTypeAttributes, LazyThreadSafetyMode.PublicationOnly); - + //private static readonly Lazy> _typePropertyInfo = new Lazy>(CacheTypeAttributes, LazyThreadSafetyMode.PublicationOnly); + private static SortedDictionary>? _typePropertyInfo; + /// #if NET5_0_OR_GREATER [UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")] @@ -36,54 +37,59 @@ namespace CryptoExchange.Net.Converters.SystemTextJson return; } + if (_typePropertyInfo == null) + _typePropertyInfo = CacheTypeAttributes(); + writer.WriteStartArray(); - - var ordered = _typePropertyInfo.Value.Where(x => x.ArrayProperty != null).OrderBy(p => p.ArrayProperty.Index); var last = -1; - foreach (var prop in ordered) + foreach (var indexProps in _typePropertyInfo) { - if (prop.ArrayProperty.Index == last) - continue; - - while (prop.ArrayProperty.Index != last + 1) + foreach (var prop in indexProps.Value) { - writer.WriteNullValue(); - last += 1; - } + if (prop.ArrayProperty.Index == last) + // Don't write the same index twice + continue; - last = prop.ArrayProperty.Index; - - var objValue = prop.PropertyInfo.GetValue(value); - if (objValue == null) - { - writer.WriteNullValue(); - continue; - } - - JsonSerializerOptions? typeOptions = null; - if (prop.JsonConverter != null) - { - typeOptions = new JsonSerializerOptions + while (prop.ArrayProperty.Index != last + 1) { - NumberHandling = JsonNumberHandling.AllowReadingFromString | JsonNumberHandling.AllowNamedFloatingPointLiterals, - PropertyNameCaseInsensitive = false, - TypeInfoResolver = options.TypeInfoResolver, - }; - typeOptions.Converters.Add(prop.JsonConverter); - } + writer.WriteNullValue(); + last += 1; + } - if (prop.JsonConverter == null && IsSimple(prop.PropertyInfo.PropertyType)) - { - if (prop.TargetType == typeof(string)) - writer.WriteStringValue(Convert.ToString(objValue, CultureInfo.InvariantCulture)); - else if (prop.TargetType == typeof(bool)) - writer.WriteBooleanValue((bool)objValue); + last = prop.ArrayProperty.Index; + + var objValue = prop.PropertyInfo.GetValue(value); + if (objValue == null) + { + writer.WriteNullValue(); + continue; + } + + JsonSerializerOptions? typeOptions = null; + if (prop.JsonConverter != null) + { + typeOptions = new JsonSerializerOptions + { + NumberHandling = JsonNumberHandling.AllowReadingFromString | JsonNumberHandling.AllowNamedFloatingPointLiterals, + PropertyNameCaseInsensitive = false, + TypeInfoResolver = options.TypeInfoResolver, + }; + typeOptions.Converters.Add(prop.JsonConverter); + } + + if (prop.JsonConverter == null && IsSimple(prop.PropertyInfo.PropertyType)) + { + if (prop.TargetType == typeof(string)) + writer.WriteStringValue(Convert.ToString(objValue, CultureInfo.InvariantCulture)); + else if (prop.TargetType == typeof(bool)) + writer.WriteBooleanValue((bool)objValue); + else + writer.WriteRawValue(Convert.ToString(objValue, CultureInfo.InvariantCulture)!); + } else - writer.WriteRawValue(Convert.ToString(objValue, CultureInfo.InvariantCulture)!); - } - else - { - JsonSerializer.Serialize(writer, objValue, typeOptions ?? options); + { + JsonSerializer.Serialize(writer, objValue, typeOptions ?? options); + } } } @@ -112,14 +118,17 @@ namespace CryptoExchange.Net.Converters.SystemTextJson if (reader.TokenType != JsonTokenType.StartArray) throw new CeDeserializationException("Not an array"); + + if (_typePropertyInfo == null) + _typePropertyInfo = CacheTypeAttributes(); + int index = 0; while (reader.Read()) { if (reader.TokenType == JsonTokenType.EndArray) break; - var indexAttributes = _typePropertyInfo.Value.Where(a => a.ArrayProperty.Index == index); - if (!indexAttributes.Any()) + if(!_typePropertyInfo.TryGetValue(index, out var indexAttributes)) { index++; continue; @@ -191,12 +200,12 @@ namespace CryptoExchange.Net.Converters.SystemTextJson #if NET5_0_OR_GREATER [UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")] [UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2026:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")] - private static List CacheTypeAttributes() + private static SortedDictionary> CacheTypeAttributes() #else - private static List CacheTypeAttributes() + private static SortedDictionary> CacheTypeAttributes() #endif { - var attributes = new List(); + var result = new SortedDictionary>(); var properties = typeof(T).GetProperties(); foreach (var property in properties) { @@ -206,7 +215,13 @@ namespace CryptoExchange.Net.Converters.SystemTextJson var targetType = Nullable.GetUnderlyingType(property.PropertyType) ?? property.PropertyType; var converterType = property.GetCustomAttribute()?.ConverterType ?? targetType.GetCustomAttribute()?.ConverterType; - attributes.Add(new ArrayPropertyInfo + if (!result.TryGetValue(att.Index, out var indexList)) + { + indexList = new List(); + result[att.Index] = indexList; + } + + indexList.Add(new ArrayPropertyInfo { ArrayProperty = att, PropertyInfo = property, @@ -216,7 +231,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson }); } - return attributes; + return result; } private class ArrayPropertyInfo diff --git a/CryptoExchange.Net/Objects/Sockets/DataEvent.cs b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs index fd16678..8780b99 100644 --- a/CryptoExchange.Net/Objects/Sockets/DataEvent.cs +++ b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs @@ -28,6 +28,11 @@ namespace CryptoExchange.Net.Objects.Sockets /// public string? Symbol { get; set; } + /// + /// The exchange name + /// + public string Exchange { get; set; } + /// /// The original data that was received, only available when OutputOriginalData is set to true in the client options /// @@ -42,9 +47,11 @@ namespace CryptoExchange.Net.Objects.Sockets /// ctor /// public DataEvent( + string exchange, DateTime receiveTimestamp, string? originalData) { + Exchange = exchange; OriginalData = originalData; ReceiveTime = receiveTimestamp; } @@ -68,9 +75,10 @@ namespace CryptoExchange.Net.Objects.Sockets /// ctor /// public DataEvent( + string exchange, T data, DateTime receiveTimestamp, - string? originalData): base(receiveTimestamp, originalData) + string? originalData): base(exchange, receiveTimestamp, originalData) { Data = data; } @@ -118,13 +126,15 @@ namespace CryptoExchange.Net.Objects.Sockets } /// - /// Copy the DataEvent to a new data type + /// Create a new DataEvent of the new type /// - public ExchangeEvent AsExchangeEvent(string exchange, K data) + public DataEvent ToType(TNew data) { - return new ExchangeEvent(exchange, this, data) + return new DataEvent(Exchange, data, ReceiveTime, OriginalData) { - DataTime = DataTime + StreamId = StreamId, + UpdateType = UpdateType, + Symbol = Symbol }; } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/Futures/IFuturesOrderSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/Futures/IFuturesOrderSocketClient.cs index dfd4270..26d1529 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/Futures/IFuturesOrderSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/Futures/IFuturesOrderSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToFuturesOrderUpdatesAsync(SubscribeFuturesOrderRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToFuturesOrderUpdatesAsync(SubscribeFuturesOrderRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/Futures/IPositionSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/Futures/IPositionSocketClient.cs index ce99896..05697e3 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/Futures/IPositionSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/Futures/IPositionSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToPositionUpdatesAsync(SubscribePositionRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToPositionUpdatesAsync(SubscribePositionRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/IBalanceSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/IBalanceSocketClient.cs index 9f2611d..5591079 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/IBalanceSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/IBalanceSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToBalanceUpdatesAsync(SubscribeBalancesRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToBalanceUpdatesAsync(SubscribeBalancesRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/IBookTickerSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/IBookTickerSocketClient.cs index ad2c575..46fcf29 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/IBookTickerSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/IBookTickerSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToBookTickerUpdatesAsync(SubscribeBookTickerRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToBookTickerUpdatesAsync(SubscribeBookTickerRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/IKlineSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/IKlineSocketClient.cs index 9a8c520..1133940 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/IKlineSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/IKlineSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToKlineUpdatesAsync(SubscribeKlineRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToKlineUpdatesAsync(SubscribeKlineRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/IOrderBookSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/IOrderBookSocketClient.cs index 4cab64f..9e039f2 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/IOrderBookSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/IOrderBookSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToOrderBookUpdatesAsync(SubscribeOrderBookRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToOrderBookUpdatesAsync(SubscribeOrderBookRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITickerSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITickerSocketClient.cs index 89ce996..c441972 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITickerSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITickerSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToTickerUpdatesAsync(SubscribeTickerRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToTickerUpdatesAsync(SubscribeTickerRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITickersSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITickersSocketClient.cs index d642c1d..9221b09 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITickersSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITickersSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToAllTickersUpdatesAsync(SubscribeAllTickersRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToAllTickersUpdatesAsync(SubscribeAllTickersRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITradeSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITradeSocketClient.cs index 78721ef..2572fc6 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITradeSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/ITradeSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToTradeUpdatesAsync(SubscribeTradeRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToTradeUpdatesAsync(SubscribeTradeRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/IUserTradeSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/IUserTradeSocketClient.cs index b6d2b9d..50ccefa 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/IUserTradeSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/IUserTradeSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToUserTradeUpdatesAsync(SubscribeUserTradeRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToUserTradeUpdatesAsync(SubscribeUserTradeRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Interfaces/Socket/Spot/ISpotOrderSocketClient.cs b/CryptoExchange.Net/SharedApis/Interfaces/Socket/Spot/ISpotOrderSocketClient.cs index 20a59d6..b07f784 100644 --- a/CryptoExchange.Net/SharedApis/Interfaces/Socket/Spot/ISpotOrderSocketClient.cs +++ b/CryptoExchange.Net/SharedApis/Interfaces/Socket/Spot/ISpotOrderSocketClient.cs @@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis /// Update handler /// Cancellation token, can be used to stop the updates /// - Task> SubscribeToSpotOrderUpdatesAsync(SubscribeSpotOrderRequest request, Action> handler, CancellationToken ct = default); + Task> SubscribeToSpotOrderUpdatesAsync(SubscribeSpotOrderRequest request, Action> handler, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/SharedApis/Models/ExchangeEvent.cs b/CryptoExchange.Net/SharedApis/Models/ExchangeEvent.cs deleted file mode 100644 index 7a13ba3..0000000 --- a/CryptoExchange.Net/SharedApis/Models/ExchangeEvent.cs +++ /dev/null @@ -1,34 +0,0 @@ -using CryptoExchange.Net.Objects.Sockets; - -namespace CryptoExchange.Net.SharedApis -{ - /// - /// An update event for a specific exchange - /// - /// Type of the data - public class ExchangeEvent : DataEvent - { - /// - /// The exchange - /// - public string Exchange { get; } - - /// - /// ctor - /// - public ExchangeEvent(string exchange, DataEvent baseEvent, T data) : - base(data, - baseEvent.ReceiveTime, - baseEvent.OriginalData) - { - StreamId = baseEvent.StreamId; - Symbol = baseEvent.Symbol; - UpdateType = baseEvent.UpdateType; - DataTime = baseEvent.DataTime; - Exchange = exchange; - } - - /// - public override string ToString() => $"{Exchange} - " + base.ToString(); - } -} diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index 9ddbc34..86808fb 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -625,6 +625,8 @@ namespace CryptoExchange.Net.Sockets.Default query = cquery; } + var complete = false; + foreach (var route in processor.MessageRouter.Routes) { if (route.TypeIdentifier != typeIdentifier) @@ -639,10 +641,16 @@ namespace CryptoExchange.Net.Sockets.Default processed = true; processor.Handle(this, receiveTime, originalData, result, route); + + if (isQuery && !route.MultipleReaders) + { + complete = true; + break; + } } } - if (processed && isQuery && !query!.MultipleReaders) + if (complete) break; } } @@ -1054,7 +1062,9 @@ namespace CryptoExchange.Net.Sockets.Default public virtual ValueTask SendAsync(int requestId, T obj, int weight) { if (_serializer is IByteMessageSerializer byteSerializer) + { return SendBytesAsync(requestId, byteSerializer.Serialize(obj), weight); + } else if (_serializer is IStringMessageSerializer stringSerializer) { if (obj is string str) diff --git a/CryptoExchange.Net/Sockets/Default/Subscription.cs b/CryptoExchange.Net/Sockets/Default/Subscription.cs index 9305755..c41b084 100644 --- a/CryptoExchange.Net/Sockets/Default/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Default/Subscription.cs @@ -190,7 +190,7 @@ namespace CryptoExchange.Net.Sockets.Default /// /// Handle an update message /// - public CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data, MessageRoute route) + public CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data, MessageRoute route) { ConnectionInvocations++; TotalInvocations++; diff --git a/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs index 79dea44..c98f846 100644 --- a/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Sockets/Interfaces/IMessageProcessor.cs @@ -29,7 +29,7 @@ namespace CryptoExchange.Net.Sockets.Interfaces /// /// Handle a message /// - CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route); + CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route); /// /// Deserialize a message into object of type /// diff --git a/CryptoExchange.Net/Sockets/MessageRouter.cs b/CryptoExchange.Net/Sockets/MessageRouter.cs index 0e44f76..4c29763 100644 --- a/CryptoExchange.Net/Sockets/MessageRouter.cs +++ b/CryptoExchange.Net/Sockets/MessageRouter.cs @@ -27,51 +27,51 @@ namespace CryptoExchange.Net.Sockets /// /// Create message router without specific message handler /// - public static MessageRouter CreateWithoutHandler(string typeIdentifier) + public static MessageRouter CreateWithoutHandler(string typeIdentifier, bool multipleReaders = false) { - return new MessageRouter(new MessageRoute(typeIdentifier, (string?)null, (con, receiveTime, originalData, msg) => new CallResult(default, null, null))); + return new MessageRouter(new MessageRoute(typeIdentifier, (string?)null, (con, receiveTime, originalData, msg) => new CallResult(default, null, null), multipleReaders)); } /// /// Create message router without specific message handler /// - public static MessageRouter CreateWithoutHandler(string typeIdentifier, string topicFilter) + public static MessageRouter CreateWithoutHandler(string typeIdentifier, string topicFilter, bool multipleReaders = false) { - return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult(default, null, null))); + return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult(default, null, null), multipleReaders)); } /// /// Create message router without topic filter /// - public static MessageRouter CreateWithoutTopicFilter(IEnumerable values, Func handler) + public static MessageRouter CreateWithoutTopicFilter(IEnumerable values, Func handler, bool multipleReaders = false) { - return new MessageRouter(values.Select(x => new MessageRoute(x, (string?)null, handler)).ToArray()); + return new MessageRouter(values.Select(x => new MessageRoute(x, null, handler, multipleReaders)).ToArray()); } /// /// Create message router without topic filter /// - public static MessageRouter CreateWithoutTopicFilter(string typeIdentifier, Func handler) + public static MessageRouter CreateWithoutTopicFilter(string typeIdentifier, Func handler, bool multipleReaders = false) { - return new MessageRouter(new MessageRoute(typeIdentifier, (string?)null, handler)); + return new MessageRouter(new MessageRoute(typeIdentifier, null, handler, multipleReaders)); } /// /// Create message router with topic filter /// - public static MessageRouter CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler) + public static MessageRouter CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler, bool multipleReaders = false) { - return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler)); + return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler, multipleReaders)); } /// /// Create message router with topic filter /// - public static MessageRouter CreateWithTopicFilter(IEnumerable typeIdentifiers, string topicFilter, Func handler) + public static MessageRouter CreateWithTopicFilter(IEnumerable typeIdentifiers, string topicFilter, Func handler, bool multipleReaders = false) { var routes = new List(); foreach (var type in typeIdentifiers) - routes.Add(new MessageRoute(type, topicFilter, handler)); + routes.Add(new MessageRoute(type, topicFilter, handler, multipleReaders)); return new MessageRouter(routes.ToArray()); } @@ -79,11 +79,11 @@ namespace CryptoExchange.Net.Sockets /// /// Create message router with topic filter /// - public static MessageRouter CreateWithTopicFilters(string typeIdentifier, IEnumerable topicFilters, Func handler) + public static MessageRouter CreateWithTopicFilters(string typeIdentifier, IEnumerable topicFilters, Func handler, bool multipleReaders = false) { var routes = new List(); foreach (var filter in topicFilters) - routes.Add(new MessageRoute(typeIdentifier, filter, handler)); + routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders)); return new MessageRouter(routes.ToArray()); } @@ -91,13 +91,13 @@ namespace CryptoExchange.Net.Sockets /// /// Create message router with topic filter /// - public static MessageRouter CreateWithTopicFilters(IEnumerable typeIdentifiers, IEnumerable topicFilters, Func handler) + public static MessageRouter CreateWithTopicFilters(IEnumerable typeIdentifiers, IEnumerable topicFilters, Func handler, bool multipleReaders = false) { var routes = new List(); foreach (var type in typeIdentifiers) { foreach (var filter in topicFilters) - routes.Add(new MessageRoute(type, filter, handler)); + routes.Add(new MessageRoute(type, filter, handler, multipleReaders)); } return new MessageRouter(routes.ToArray()); @@ -106,25 +106,25 @@ namespace CryptoExchange.Net.Sockets /// /// Create message router with optional topic filter /// - public static MessageRouter CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler) + public static MessageRouter CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false) { - return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler)); + return new MessageRouter(new MessageRoute(typeIdentifier, topicFilter, handler, multipleReaders)); } /// /// Create message router with optional topic filter /// - public static MessageRouter CreateWithOptionalTopicFilters(string typeIdentifier, IEnumerable? topicFilters, Func handler) + public static MessageRouter CreateWithOptionalTopicFilters(string typeIdentifier, IEnumerable? topicFilters, Func handler, bool multipleReaders = false) { var routes = new List(); if (topicFilters?.Count() > 0) { foreach (var filter in topicFilters) - routes.Add(new MessageRoute(typeIdentifier, filter, handler)); + routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders)); } else { - routes.Add(new MessageRoute(typeIdentifier, null, handler)); + routes.Add(new MessageRoute(typeIdentifier, null, handler, multipleReaders)); } return new MessageRouter(routes.ToArray()); @@ -133,7 +133,7 @@ namespace CryptoExchange.Net.Sockets /// /// Create message router with optional topic filter /// - public static MessageRouter CreateWithOptionalTopicFilters(IEnumerable typeIdentifiers, IEnumerable? topicFilters, Func handler) + public static MessageRouter CreateWithOptionalTopicFilters(IEnumerable typeIdentifiers, IEnumerable? topicFilters, Func handler, bool multipleReaders = false) { var routes = new List(); foreach (var typeIdentifier in typeIdentifiers) @@ -141,11 +141,11 @@ namespace CryptoExchange.Net.Sockets if (topicFilters?.Count() > 0) { foreach (var filter in topicFilters) - routes.Add(new MessageRoute(typeIdentifier, filter, handler)); + routes.Add(new MessageRoute(typeIdentifier, filter, handler, multipleReaders)); } else { - routes.Add(new MessageRoute(typeIdentifier, null, handler)); + routes.Add(new MessageRoute(typeIdentifier, null, handler, multipleReaders)); } } @@ -179,6 +179,12 @@ namespace CryptoExchange.Net.Sockets /// Optional topic filter /// public string? TopicFilter { get; set; } + + /// + /// Whether responses to this route might be read by multiple listeners + /// + public bool MultipleReaders { get; set; } = false; + /// /// Deserialization type /// @@ -196,7 +202,7 @@ namespace CryptoExchange.Net.Sockets /// /// Message handler /// - public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data); + public abstract CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data); } /// @@ -204,46 +210,56 @@ namespace CryptoExchange.Net.Sockets /// public class MessageRoute : MessageRoute { - private Func _handler; + private Func _handler; /// - public override Type DeserializationType => typeof(TMessage); + public override Type DeserializationType { get; } = typeof(TMessage); /// /// ctor /// - internal MessageRoute(string typeIdentifier, string? topicFilter, Func handler) + internal MessageRoute(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false) : base(typeIdentifier, topicFilter) { _handler = handler; + MultipleReaders = multipleReaders; } /// /// Create route without topic filter /// - public static MessageRoute CreateWithoutTopicFilter(string typeIdentifier, Func handler) + public static MessageRoute CreateWithoutTopicFilter(string typeIdentifier, Func handler, bool multipleReaders = false) { - return new MessageRoute(typeIdentifier, null, handler); + return new MessageRoute(typeIdentifier, null, handler) + { + MultipleReaders = multipleReaders + }; } /// /// Create route with optional topic filter /// - public static MessageRoute CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler) + public static MessageRoute CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func handler, bool multipleReaders = false) { - return new MessageRoute(typeIdentifier, topicFilter, handler); + return new MessageRoute(typeIdentifier, topicFilter, handler) + { + MultipleReaders = multipleReaders + }; } /// /// Create route with topic filter /// - public static MessageRoute CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler) + public static MessageRoute CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func handler, bool multipleReaders = false) { - return new MessageRoute(typeIdentifier, topicFilter, handler); + return new MessageRoute(typeIdentifier, topicFilter, handler) + { + MultipleReaders = multipleReaders + }; } /// - public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data) + public override CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data) { return _handler(connection, receiveTime, originalData, (TMessage)data); } diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index e569646..3796768 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -89,11 +89,6 @@ namespace CryptoExchange.Net.Sockets /// public bool ExpectsResponse { get; set; } = true; - /// - /// Whether responses to this query might be read by multiple listeners - /// - public bool MultipleReaders { get; set; } = false; - /// /// Wait event for response /// @@ -205,16 +200,18 @@ namespace CryptoExchange.Net.Sockets /// public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageRoute route) { - if (!PreCheckMessage(connection, message)) - return CallResult.SuccessResult; - CurrentResponses++; if (CurrentResponses == RequiredResponses) Response = message; if (Result?.Success != false) + { // If an error result is already set don't override that Result = route.Handle(connection, receiveTime, originalData, message); + if (Result == null) + // Null from Handle means it wasn't actually for this query + CurrentResponses -= 1; + } if (CurrentResponses == RequiredResponses) { @@ -223,7 +220,7 @@ namespace CryptoExchange.Net.Sockets OnComplete?.Invoke(); } - return Result; + return Result ?? CallResult.SuccessResult; } ///