1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 14:13:46 +00:00
This commit is contained in:
JKorf 2025-12-14 16:34:25 +01:00
parent 8521e556a3
commit b49300ec1d
22 changed files with 163 additions and 148 deletions

View File

@ -20,7 +20,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
public CallResult DoHandleMessage(SocketConnection connection, DateTime receiveTime, string? originalData, T message)
{
_handler.Invoke(new DataEvent<T>(message, receiveTime, originalData));
_handler.Invoke(new DataEvent<T>("Test", message, receiveTime, originalData));
return new CallResult(null);
}

View File

@ -22,7 +22,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
public CallResult DoHandleMessage(SocketConnection connection, DateTime receiveTime, string? originalData, T message)
{
_handler.Invoke(new DataEvent<T>(message, receiveTime, originalData));
_handler.Invoke(new DataEvent<T>("Test", message, receiveTime, originalData));
return new CallResult(null);
}

View File

@ -627,6 +627,7 @@ namespace CryptoExchange.Net.Clients
/// <param name="dedicatedRequestConnection">Whether a dedicated request connection should be returned</param>
/// <param name="ct">Cancellation token</param>
/// <param name="topic">The subscription topic, can be provided when multiple of the same topics are not allowed on a connection</param>
/// <param name="individualSubscriptionCount">The number of individual subscriptions in this subscribe request</param>
/// <returns></returns>
protected virtual async Task<CallResult<SocketConnection>> GetSocketConnection(
string address,

View File

@ -21,8 +21,9 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
public class ArrayConverter<T> : JsonConverter<T> where T : new()
#endif
{
private static readonly Lazy<List<ArrayPropertyInfo>> _typePropertyInfo = new Lazy<List<ArrayPropertyInfo>>(CacheTypeAttributes, LazyThreadSafetyMode.PublicationOnly);
//private static readonly Lazy<List<ArrayPropertyInfo>> _typePropertyInfo = new Lazy<List<ArrayPropertyInfo>>(CacheTypeAttributes, LazyThreadSafetyMode.PublicationOnly);
private static SortedDictionary<int, List<ArrayPropertyInfo>>? _typePropertyInfo;
/// <inheritdoc />
#if NET5_0_OR_GREATER
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
@ -36,54 +37,59 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
return;
}
if (_typePropertyInfo == null)
_typePropertyInfo = CacheTypeAttributes();
writer.WriteStartArray();
var ordered = _typePropertyInfo.Value.Where(x => x.ArrayProperty != null).OrderBy(p => p.ArrayProperty.Index);
var last = -1;
foreach (var prop in ordered)
foreach (var indexProps in _typePropertyInfo)
{
if (prop.ArrayProperty.Index == last)
continue;
while (prop.ArrayProperty.Index != last + 1)
foreach (var prop in indexProps.Value)
{
writer.WriteNullValue();
last += 1;
}
if (prop.ArrayProperty.Index == last)
// Don't write the same index twice
continue;
last = prop.ArrayProperty.Index;
var objValue = prop.PropertyInfo.GetValue(value);
if (objValue == null)
{
writer.WriteNullValue();
continue;
}
JsonSerializerOptions? typeOptions = null;
if (prop.JsonConverter != null)
{
typeOptions = new JsonSerializerOptions
while (prop.ArrayProperty.Index != last + 1)
{
NumberHandling = JsonNumberHandling.AllowReadingFromString | JsonNumberHandling.AllowNamedFloatingPointLiterals,
PropertyNameCaseInsensitive = false,
TypeInfoResolver = options.TypeInfoResolver,
};
typeOptions.Converters.Add(prop.JsonConverter);
}
writer.WriteNullValue();
last += 1;
}
if (prop.JsonConverter == null && IsSimple(prop.PropertyInfo.PropertyType))
{
if (prop.TargetType == typeof(string))
writer.WriteStringValue(Convert.ToString(objValue, CultureInfo.InvariantCulture));
else if (prop.TargetType == typeof(bool))
writer.WriteBooleanValue((bool)objValue);
last = prop.ArrayProperty.Index;
var objValue = prop.PropertyInfo.GetValue(value);
if (objValue == null)
{
writer.WriteNullValue();
continue;
}
JsonSerializerOptions? typeOptions = null;
if (prop.JsonConverter != null)
{
typeOptions = new JsonSerializerOptions
{
NumberHandling = JsonNumberHandling.AllowReadingFromString | JsonNumberHandling.AllowNamedFloatingPointLiterals,
PropertyNameCaseInsensitive = false,
TypeInfoResolver = options.TypeInfoResolver,
};
typeOptions.Converters.Add(prop.JsonConverter);
}
if (prop.JsonConverter == null && IsSimple(prop.PropertyInfo.PropertyType))
{
if (prop.TargetType == typeof(string))
writer.WriteStringValue(Convert.ToString(objValue, CultureInfo.InvariantCulture));
else if (prop.TargetType == typeof(bool))
writer.WriteBooleanValue((bool)objValue);
else
writer.WriteRawValue(Convert.ToString(objValue, CultureInfo.InvariantCulture)!);
}
else
writer.WriteRawValue(Convert.ToString(objValue, CultureInfo.InvariantCulture)!);
}
else
{
JsonSerializer.Serialize(writer, objValue, typeOptions ?? options);
{
JsonSerializer.Serialize(writer, objValue, typeOptions ?? options);
}
}
}
@ -112,14 +118,17 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
if (reader.TokenType != JsonTokenType.StartArray)
throw new CeDeserializationException("Not an array");
if (_typePropertyInfo == null)
_typePropertyInfo = CacheTypeAttributes();
int index = 0;
while (reader.Read())
{
if (reader.TokenType == JsonTokenType.EndArray)
break;
var indexAttributes = _typePropertyInfo.Value.Where(a => a.ArrayProperty.Index == index);
if (!indexAttributes.Any())
if(!_typePropertyInfo.TryGetValue(index, out var indexAttributes))
{
index++;
continue;
@ -191,12 +200,12 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
#if NET5_0_OR_GREATER
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2026:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
private static List<ArrayPropertyInfo> CacheTypeAttributes()
private static SortedDictionary<int, List<ArrayPropertyInfo>> CacheTypeAttributes()
#else
private static List<ArrayPropertyInfo> CacheTypeAttributes()
private static SortedDictionary<int, List<ArrayPropertyInfo>> CacheTypeAttributes()
#endif
{
var attributes = new List<ArrayPropertyInfo>();
var result = new SortedDictionary<int, List<ArrayPropertyInfo>>();
var properties = typeof(T).GetProperties();
foreach (var property in properties)
{
@ -206,7 +215,13 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
var targetType = Nullable.GetUnderlyingType(property.PropertyType) ?? property.PropertyType;
var converterType = property.GetCustomAttribute<JsonConverterAttribute>()?.ConverterType ?? targetType.GetCustomAttribute<JsonConverterAttribute>()?.ConverterType;
attributes.Add(new ArrayPropertyInfo
if (!result.TryGetValue(att.Index, out var indexList))
{
indexList = new List<ArrayPropertyInfo>();
result[att.Index] = indexList;
}
indexList.Add(new ArrayPropertyInfo
{
ArrayProperty = att,
PropertyInfo = property,
@ -216,7 +231,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
});
}
return attributes;
return result;
}
private class ArrayPropertyInfo

View File

@ -28,6 +28,11 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary>
public string? Symbol { get; set; }
/// <summary>
/// The exchange name
/// </summary>
public string Exchange { get; set; }
/// <summary>
/// The original data that was received, only available when OutputOriginalData is set to true in the client options
/// </summary>
@ -42,9 +47,11 @@ namespace CryptoExchange.Net.Objects.Sockets
/// ctor
/// </summary>
public DataEvent(
string exchange,
DateTime receiveTimestamp,
string? originalData)
{
Exchange = exchange;
OriginalData = originalData;
ReceiveTime = receiveTimestamp;
}
@ -68,9 +75,10 @@ namespace CryptoExchange.Net.Objects.Sockets
/// ctor
/// </summary>
public DataEvent(
string exchange,
T data,
DateTime receiveTimestamp,
string? originalData): base(receiveTimestamp, originalData)
string? originalData): base(exchange, receiveTimestamp, originalData)
{
Data = data;
}
@ -118,13 +126,15 @@ namespace CryptoExchange.Net.Objects.Sockets
}
/// <summary>
/// Copy the DataEvent to a new data type
/// Create a new DataEvent of the new type
/// </summary>
public ExchangeEvent<K> AsExchangeEvent<K>(string exchange, K data)
public DataEvent<TNew> ToType<TNew>(TNew data)
{
return new ExchangeEvent<K>(exchange, this, data)
return new DataEvent<TNew>(Exchange, data, ReceiveTime, OriginalData)
{
DataTime = DataTime
StreamId = StreamId,
UpdateType = UpdateType,
Symbol = Symbol
};
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToFuturesOrderUpdatesAsync(SubscribeFuturesOrderRequest request, Action<ExchangeEvent<SharedFuturesOrder[]>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToFuturesOrderUpdatesAsync(SubscribeFuturesOrderRequest request, Action<DataEvent<SharedFuturesOrder[]>> handler, CancellationToken ct = default);
}
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToPositionUpdatesAsync(SubscribePositionRequest request, Action<ExchangeEvent<SharedPosition[]>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToPositionUpdatesAsync(SubscribePositionRequest request, Action<DataEvent<SharedPosition[]>> handler, CancellationToken ct = default);
}
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToBalanceUpdatesAsync(SubscribeBalancesRequest request, Action<ExchangeEvent<SharedBalance[]>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToBalanceUpdatesAsync(SubscribeBalancesRequest request, Action<DataEvent<SharedBalance[]>> handler, CancellationToken ct = default);
}
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToBookTickerUpdatesAsync(SubscribeBookTickerRequest request, Action<ExchangeEvent<SharedBookTicker>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToBookTickerUpdatesAsync(SubscribeBookTickerRequest request, Action<DataEvent<SharedBookTicker>> handler, CancellationToken ct = default);
}
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToKlineUpdatesAsync(SubscribeKlineRequest request, Action<ExchangeEvent<SharedKline>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToKlineUpdatesAsync(SubscribeKlineRequest request, Action<DataEvent<SharedKline>> handler, CancellationToken ct = default);
}
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToOrderBookUpdatesAsync(SubscribeOrderBookRequest request, Action<ExchangeEvent<SharedOrderBook>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToOrderBookUpdatesAsync(SubscribeOrderBookRequest request, Action<DataEvent<SharedOrderBook>> handler, CancellationToken ct = default);
}
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToTickerUpdatesAsync(SubscribeTickerRequest request, Action<ExchangeEvent<SharedSpotTicker>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToTickerUpdatesAsync(SubscribeTickerRequest request, Action<DataEvent<SharedSpotTicker>> handler, CancellationToken ct = default);
}
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToAllTickersUpdatesAsync(SubscribeAllTickersRequest request, Action<ExchangeEvent<SharedSpotTicker[]>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToAllTickersUpdatesAsync(SubscribeAllTickersRequest request, Action<DataEvent<SharedSpotTicker[]>> handler, CancellationToken ct = default);
}
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToTradeUpdatesAsync(SubscribeTradeRequest request, Action<ExchangeEvent<SharedTrade[]>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToTradeUpdatesAsync(SubscribeTradeRequest request, Action<DataEvent<SharedTrade[]>> handler, CancellationToken ct = default);
}
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToUserTradeUpdatesAsync(SubscribeUserTradeRequest request, Action<ExchangeEvent<SharedUserTrade[]>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToUserTradeUpdatesAsync(SubscribeUserTradeRequest request, Action<DataEvent<SharedUserTrade[]>> handler, CancellationToken ct = default);
}
}

View File

@ -22,6 +22,6 @@ namespace CryptoExchange.Net.SharedApis
/// <param name="handler">Update handler</param>
/// <param name="ct">Cancellation token, can be used to stop the updates</param>
/// <returns></returns>
Task<ExchangeResult<UpdateSubscription>> SubscribeToSpotOrderUpdatesAsync(SubscribeSpotOrderRequest request, Action<ExchangeEvent<SharedSpotOrder[]>> handler, CancellationToken ct = default);
Task<ExchangeResult<UpdateSubscription>> SubscribeToSpotOrderUpdatesAsync(SubscribeSpotOrderRequest request, Action<DataEvent<SharedSpotOrder[]>> handler, CancellationToken ct = default);
}
}

View File

@ -1,34 +0,0 @@
using CryptoExchange.Net.Objects.Sockets;
namespace CryptoExchange.Net.SharedApis
{
/// <summary>
/// An update event for a specific exchange
/// </summary>
/// <typeparam name="T">Type of the data</typeparam>
public class ExchangeEvent<T> : DataEvent<T>
{
/// <summary>
/// The exchange
/// </summary>
public string Exchange { get; }
/// <summary>
/// ctor
/// </summary>
public ExchangeEvent(string exchange, DataEvent baseEvent, T data) :
base(data,
baseEvent.ReceiveTime,
baseEvent.OriginalData)
{
StreamId = baseEvent.StreamId;
Symbol = baseEvent.Symbol;
UpdateType = baseEvent.UpdateType;
DataTime = baseEvent.DataTime;
Exchange = exchange;
}
/// <inheritdoc />
public override string ToString() => $"{Exchange} - " + base.ToString();
}
}

View File

@ -625,6 +625,8 @@ namespace CryptoExchange.Net.Sockets.Default
query = cquery;
}
var complete = false;
foreach (var route in processor.MessageRouter.Routes)
{
if (route.TypeIdentifier != typeIdentifier)
@ -639,10 +641,16 @@ namespace CryptoExchange.Net.Sockets.Default
processed = true;
processor.Handle(this, receiveTime, originalData, result, route);
if (isQuery && !route.MultipleReaders)
{
complete = true;
break;
}
}
}
if (processed && isQuery && !query!.MultipleReaders)
if (complete)
break;
}
}
@ -1054,7 +1062,9 @@ namespace CryptoExchange.Net.Sockets.Default
public virtual ValueTask<CallResult> SendAsync<T>(int requestId, T obj, int weight)
{
if (_serializer is IByteMessageSerializer byteSerializer)
{
return SendBytesAsync(requestId, byteSerializer.Serialize(obj), weight);
}
else if (_serializer is IStringMessageSerializer stringSerializer)
{
if (obj is string str)

View File

@ -190,7 +190,7 @@ namespace CryptoExchange.Net.Sockets.Default
/// <summary>
/// Handle an update message
/// </summary>
public CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data, MessageRoute route)
public CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data, MessageRoute route)
{
ConnectionInvocations++;
TotalInvocations++;

View File

@ -29,7 +29,7 @@ namespace CryptoExchange.Net.Sockets.Interfaces
/// <summary>
/// Handle a message
/// </summary>
CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route);
CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route);
/// <summary>
/// Deserialize a message into object of type
/// </summary>

View File

@ -27,51 +27,51 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Create message router without specific message handler
/// </summary>
public static MessageRouter CreateWithoutHandler<T>(string typeIdentifier)
public static MessageRouter CreateWithoutHandler<T>(string typeIdentifier, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, (string?)null, (con, receiveTime, originalData, msg) => new CallResult<T>(default, null, null)));
return new MessageRouter(new MessageRoute<T>(typeIdentifier, (string?)null, (con, receiveTime, originalData, msg) => new CallResult<T>(default, null, null), multipleReaders));
}
/// <summary>
/// Create message router without specific message handler
/// </summary>
public static MessageRouter CreateWithoutHandler<T>(string typeIdentifier, string topicFilter)
public static MessageRouter CreateWithoutHandler<T>(string typeIdentifier, string topicFilter, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult<string>(default, null, null)));
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, (con, receiveTime, originalData, msg) => new CallResult<string>(default, null, null), multipleReaders));
}
/// <summary>
/// Create message router without topic filter
/// </summary>
public static MessageRouter CreateWithoutTopicFilter<T>(IEnumerable<string> values, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
public static MessageRouter CreateWithoutTopicFilter<T>(IEnumerable<string> values, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(values.Select(x => new MessageRoute<T>(x, (string?)null, handler)).ToArray());
return new MessageRouter(values.Select(x => new MessageRoute<T>(x, null, handler, multipleReaders)).ToArray());
}
/// <summary>
/// Create message router without topic filter
/// </summary>
public static MessageRouter CreateWithoutTopicFilter<T>(string typeIdentifier, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
public static MessageRouter CreateWithoutTopicFilter<T>(string typeIdentifier, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, (string?)null, handler));
return new MessageRouter(new MessageRoute<T>(typeIdentifier, null, handler, multipleReaders));
}
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilter<T>(string typeIdentifier, string topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
public static MessageRouter CreateWithTopicFilter<T>(string typeIdentifier, string topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, handler));
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, handler, multipleReaders));
}
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilter<T>(IEnumerable<string> typeIdentifiers, string topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
public static MessageRouter CreateWithTopicFilter<T>(IEnumerable<string> typeIdentifiers, string topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var type in typeIdentifiers)
routes.Add(new MessageRoute<T>(type, topicFilter, handler));
routes.Add(new MessageRoute<T>(type, topicFilter, handler, multipleReaders));
return new MessageRouter(routes.ToArray());
}
@ -79,11 +79,11 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilters<T>(string typeIdentifier, IEnumerable<string> topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
public static MessageRouter CreateWithTopicFilters<T>(string typeIdentifier, IEnumerable<string> topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler));
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler, multipleReaders));
return new MessageRouter(routes.ToArray());
}
@ -91,13 +91,13 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Create message router with topic filter
/// </summary>
public static MessageRouter CreateWithTopicFilters<T>(IEnumerable<string> typeIdentifiers, IEnumerable<string> topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
public static MessageRouter CreateWithTopicFilters<T>(IEnumerable<string> typeIdentifiers, IEnumerable<string> topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var type in typeIdentifiers)
{
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(type, filter, handler));
routes.Add(new MessageRoute<T>(type, filter, handler, multipleReaders));
}
return new MessageRouter(routes.ToArray());
@ -106,25 +106,25 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Create message router with optional topic filter
/// </summary>
public static MessageRouter CreateWithOptionalTopicFilter<T>(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
public static MessageRouter CreateWithOptionalTopicFilter<T>(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, handler));
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, handler, multipleReaders));
}
/// <summary>
/// Create message router with optional topic filter
/// </summary>
public static MessageRouter CreateWithOptionalTopicFilters<T>(string typeIdentifier, IEnumerable<string>? topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
public static MessageRouter CreateWithOptionalTopicFilters<T>(string typeIdentifier, IEnumerable<string>? topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
if (topicFilters?.Count() > 0)
{
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler));
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler, multipleReaders));
}
else
{
routes.Add(new MessageRoute<T>(typeIdentifier, null, handler));
routes.Add(new MessageRoute<T>(typeIdentifier, null, handler, multipleReaders));
}
return new MessageRouter(routes.ToArray());
@ -133,7 +133,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Create message router with optional topic filter
/// </summary>
public static MessageRouter CreateWithOptionalTopicFilters<T>(IEnumerable<string> typeIdentifiers, IEnumerable<string>? topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
public static MessageRouter CreateWithOptionalTopicFilters<T>(IEnumerable<string> typeIdentifiers, IEnumerable<string>? topicFilters, Func<SocketConnection, DateTime, string?, T, CallResult?> handler, bool multipleReaders = false)
{
var routes = new List<MessageRoute>();
foreach (var typeIdentifier in typeIdentifiers)
@ -141,11 +141,11 @@ namespace CryptoExchange.Net.Sockets
if (topicFilters?.Count() > 0)
{
foreach (var filter in topicFilters)
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler));
routes.Add(new MessageRoute<T>(typeIdentifier, filter, handler, multipleReaders));
}
else
{
routes.Add(new MessageRoute<T>(typeIdentifier, null, handler));
routes.Add(new MessageRoute<T>(typeIdentifier, null, handler, multipleReaders));
}
}
@ -179,6 +179,12 @@ namespace CryptoExchange.Net.Sockets
/// Optional topic filter
/// </summary>
public string? TopicFilter { get; set; }
/// <summary>
/// Whether responses to this route might be read by multiple listeners
/// </summary>
public bool MultipleReaders { get; set; } = false;
/// <summary>
/// Deserialization type
/// </summary>
@ -196,7 +202,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Message handler
/// </summary>
public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data);
public abstract CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data);
}
/// <summary>
@ -204,46 +210,56 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public class MessageRoute<TMessage> : MessageRoute
{
private Func<SocketConnection, DateTime, string?, TMessage, CallResult> _handler;
private Func<SocketConnection, DateTime, string?, TMessage, CallResult?> _handler;
/// <inheritdoc />
public override Type DeserializationType => typeof(TMessage);
public override Type DeserializationType { get; } = typeof(TMessage);
/// <summary>
/// ctor
/// </summary>
internal MessageRoute(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult> handler)
internal MessageRoute(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
: base(typeIdentifier, topicFilter)
{
_handler = handler;
MultipleReaders = multipleReaders;
}
/// <summary>
/// Create route without topic filter
/// </summary>
public static MessageRoute<TMessage> CreateWithoutTopicFilter(string typeIdentifier, Func<SocketConnection, DateTime, string?, TMessage, CallResult> handler)
public static MessageRoute<TMessage> CreateWithoutTopicFilter(string typeIdentifier, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRoute<TMessage>(typeIdentifier, null, handler);
return new MessageRoute<TMessage>(typeIdentifier, null, handler)
{
MultipleReaders = multipleReaders
};
}
/// <summary>
/// Create route with optional topic filter
/// </summary>
public static MessageRoute<TMessage> CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult> handler)
public static MessageRoute<TMessage> CreateWithOptionalTopicFilter(string typeIdentifier, string? topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRoute<TMessage>(typeIdentifier, topicFilter, handler);
return new MessageRoute<TMessage>(typeIdentifier, topicFilter, handler)
{
MultipleReaders = multipleReaders
};
}
/// <summary>
/// Create route with topic filter
/// </summary>
public static MessageRoute<TMessage> CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult> handler)
public static MessageRoute<TMessage> CreateWithTopicFilter(string typeIdentifier, string topicFilter, Func<SocketConnection, DateTime, string?, TMessage, CallResult?> handler, bool multipleReaders = false)
{
return new MessageRoute<TMessage>(typeIdentifier, topicFilter, handler);
return new MessageRoute<TMessage>(typeIdentifier, topicFilter, handler)
{
MultipleReaders = multipleReaders
};
}
/// <inheritdoc />
public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data)
public override CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data)
{
return _handler(connection, receiveTime, originalData, (TMessage)data);
}

View File

@ -89,11 +89,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public bool ExpectsResponse { get; set; } = true;
/// <summary>
/// Whether responses to this query might be read by multiple listeners
/// </summary>
public bool MultipleReaders { get; set; } = false;
/// <summary>
/// Wait event for response
/// </summary>
@ -205,16 +200,18 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageRoute route)
{
if (!PreCheckMessage(connection, message))
return CallResult.SuccessResult;
CurrentResponses++;
if (CurrentResponses == RequiredResponses)
Response = message;
if (Result?.Success != false)
{
// If an error result is already set don't override that
Result = route.Handle(connection, receiveTime, originalData, message);
if (Result == null)
// Null from Handle means it wasn't actually for this query
CurrentResponses -= 1;
}
if (CurrentResponses == RequiredResponses)
{
@ -223,7 +220,7 @@ namespace CryptoExchange.Net.Sockets
OnComplete?.Invoke();
}
return Result;
return Result ?? CallResult.SuccessResult;
}
/// <inheritdoc />