1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-07-23 18:05:43 +00:00

Feature/websocket listener update (#244)

Updated websocket message to listener matching logic to be more flexible
This commit is contained in:
Jan Korf 2025-07-23 10:31:03 +02:00 committed by GitHub
parent 3d942bd503
commit 30475dae67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 299 additions and 231 deletions

View File

@ -31,21 +31,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
internal class TestChannelQuery : Query<SubResponse>
{
public override HashSet<string> ListenerIdentifiers { get; set; }
public TestChannelQuery(string channel, string request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
{
ListenerIdentifiers = new HashSet<string> { request + "-" + channel };
MessageMatcher = MessageMatcher.Create<SubResponse>(request + "-" + channel, HandleMessage);
}
public override CallResult<SubResponse> HandleMessage(SocketConnection connection, DataEvent<SubResponse> message)
public CallResult<SubResponse> HandleMessage(SocketConnection connection, DataEvent<SubResponse> message)
{
if (!message.Data.Status.Equals("confirmed", StringComparison.OrdinalIgnoreCase))
{
return new CallResult<SubResponse>(new ServerError(message.Data.Status));
}
return base.HandleMessage(connection, message);
return message.ToCallResult();
}
}
}

View File

@ -9,11 +9,9 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
{
internal class TestQuery : Query<object>
{
public override HashSet<string> ListenerIdentifiers { get; set; }
public TestQuery(string identifier, object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
{
ListenerIdentifiers = new HashSet<string> { identifier };
MessageMatcher = MessageMatcher.Create<object>(identifier);
}
}
}

View File

@ -15,21 +15,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
{
private readonly Action<DataEvent<T>> _handler;
public override HashSet<string> ListenerIdentifiers { get; set; } = new HashSet<string> { "update-topic" };
public TestSubscription(ILogger logger, Action<DataEvent<T>> handler) : base(logger, false)
{
_handler = handler;
MessageMatcher = MessageMatcher.Create<T>("update-topic", DoHandleMessage);
}
public override CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message)
public CallResult DoHandleMessage(SocketConnection connection, DataEvent<T> message)
{
var data = (T)message.Data;
_handler.Invoke(message.As(data));
_handler.Invoke(message);
return new CallResult(null);
}
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
public override Query GetSubQuery(SocketConnection connection) => new TestQuery("sub", new object(), false, 1);
public override Query GetUnsubQuery() => new TestQuery("unsub", new object(), false, 1);
}

View File

@ -15,23 +15,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
private readonly Action<DataEvent<T>> _handler;
private readonly string _channel;
public override HashSet<string> ListenerIdentifiers { get; set; }
public TestSubscriptionWithResponseCheck(string channel, Action<DataEvent<T>> handler) : base(Mock.Of<ILogger>(), false)
{
ListenerIdentifiers = new HashSet<string>() { channel };
MessageMatcher = MessageMatcher.Create<T>(channel, DoHandleMessage);
_handler = handler;
_channel = channel;
}
public override CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message)
public CallResult DoHandleMessage(SocketConnection connection, DataEvent<T> message)
{
var data = (T)message.Data;
_handler.Invoke(message.As(data));
_handler.Invoke(message);
return new CallResult(null);
}
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
public override Query GetSubQuery(SocketConnection connection) => new TestChannelQuery(_channel, "subscribe", false, 1);
public override Query GetUnsubQuery() => new TestChannelQuery(_channel, "unsubscribe", false, 1);
}

View File

@ -313,11 +313,10 @@ namespace CryptoExchange.Net.Clients
/// Send a query on a socket connection to the BaseAddress and wait for the response
/// </summary>
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
/// <param name="query">The query</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected virtual Task<CallResult<THandlerResponse>> QueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, CancellationToken ct = default)
protected virtual Task<CallResult<THandlerResponse>> QueryAsync<THandlerResponse>(Query<THandlerResponse> query, CancellationToken ct = default)
{
return QueryAsync(BaseAddress, query, ct);
}
@ -326,12 +325,11 @@ namespace CryptoExchange.Net.Clients
/// Send a query on a socket connection and wait for the response
/// </summary>
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
/// <param name="url">The url for the request</param>
/// <param name="query">The query</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected virtual async Task<CallResult<THandlerResponse>> QueryAsync<TServerResponse, THandlerResponse>(string url, Query<TServerResponse, THandlerResponse> query, CancellationToken ct = default)
protected virtual async Task<CallResult<THandlerResponse>> QueryAsync<THandlerResponse>(string url, Query<THandlerResponse> query, CancellationToken ct = default)
{
if (_disposing)
return new CallResult<THandlerResponse>(new InvalidOperationError("Client disposed, can't query"));
@ -816,7 +814,7 @@ namespace CryptoExchange.Net.Clients
sb.AppendLine($"\t\t\tId: {subState.Id}");
sb.AppendLine($"\t\t\tConfirmed: {subState.Confirmed}");
sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}");
sb.AppendLine($"\t\t\tIdentifiers: [{string.Join(",", subState.Identifiers)}]");
sb.AppendLine($"\t\t\tIdentifiers: [{subState.ListenMatcher.ToString()}]");
});
}
});

View File

@ -17,22 +17,13 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
public int Id { get; }
/// <summary>
/// The identifiers for this processor
/// The matcher for this listener
/// </summary>
public HashSet<string> ListenerIdentifiers { get; }
public MessageMatcher MessageMatcher { get; }
/// <summary>
/// Handle a message
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message);
/// <summary>
/// Get the type the message should be deserialized to
/// </summary>
/// <param name="messageAccessor"></param>
/// <returns></returns>
Type? GetMessageType(IMessageAccessor messageAccessor);
Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink matchedHandler);
/// <summary>
/// Deserialize a message into object of type
/// </summary>

View File

@ -18,7 +18,7 @@ namespace CryptoExchange.Net.Logging.Extensions
private static readonly Action<ILogger, int, string, Exception?> _failedToParse;
private static readonly Action<ILogger, int, string, Exception?> _failedToEvaluateMessage;
private static readonly Action<ILogger, int, Exception?> _errorProcessingMessage;
private static readonly Action<ILogger, int, int, string, Exception?> _processorMatched;
private static readonly Action<ILogger, int, string, string, Exception?> _processorMatched;
private static readonly Action<ILogger, int, int, Exception?> _receivedMessageNotRecognized;
private static readonly Action<ILogger, int, string?, Exception?> _failedToDeserializeMessage;
private static readonly Action<ILogger, int, string, Exception?> _userMessageProcessingFailed;
@ -92,11 +92,6 @@ namespace CryptoExchange.Net.Logging.Extensions
new EventId(2009, "ErrorProcessingMessage"),
"[Sckt {SocketId}] error processing message");
_processorMatched = LoggerMessage.Define<int, int, string>(
LogLevel.Trace,
new EventId(2010, "ProcessorMatched"),
"[Sckt {SocketId}] {Count} processor(s) matched to message with listener identifier {ListenerId}");
_receivedMessageNotRecognized = LoggerMessage.Define<int, int>(
LogLevel.Warning,
new EventId(2011, "ReceivedMessageNotRecognized"),
@ -190,7 +185,7 @@ namespace CryptoExchange.Net.Logging.Extensions
_receivedMessageNotMatchedToAnyListener = LoggerMessage.Define<int, string, string>(
LogLevel.Warning,
new EventId(2029, "ReceivedMessageNotMatchedToAnyListener"),
"[Sckt {SocketId}] received message not matched to any listener. ListenId: {ListenId}, current listeners: {ListenIds}");
"[Sckt {SocketId}] received message not matched to any listener. ListenId: {ListenId}, current listeners: [{ListenIds}]");
_failedToParse = LoggerMessage.Define<int, string>(
LogLevel.Warning,
@ -201,6 +196,12 @@ namespace CryptoExchange.Net.Logging.Extensions
LogLevel.Trace,
new EventId(2031, "SendingByteData"),
"[Sckt {SocketId}] [Req {RequestId}] sending byte message of length: {Length}");
_processorMatched = LoggerMessage.Define<int, string, string>(
LogLevel.Trace,
new EventId(2032, "ProcessorMatched"),
"[Sckt {SocketId}] listener '{ListenId}' matched to message with listener identifier {ListenerId}");
}
public static void ActivityPaused(this ILogger logger, int socketId, bool paused)
@ -256,9 +257,9 @@ namespace CryptoExchange.Net.Logging.Extensions
{
_errorProcessingMessage(logger, socketId, e);
}
public static void ProcessorMatched(this ILogger logger, int socketId, int count, string listenerId)
public static void ProcessorMatched(this ILogger logger, int socketId, string listener, string listenerId)
{
_processorMatched(logger, socketId, count, listenerId, null);
_processorMatched(logger, socketId, listener, listenerId, null);
}
public static void ReceivedMessageNotRecognized(this ILogger logger, int socketId, int id)
{

View File

@ -0,0 +1,180 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Message link type
/// </summary>
public enum MessageLinkType
{
/// <summary>
/// Match when the listen id matches fully to the value
/// </summary>
Full,
/// <summary>
/// Match when the listen id starts with the value
/// </summary>
StartsWith
}
/// <summary>
/// Matches a message listen id to a specific listener
/// </summary>
public class MessageMatcher
{
/// <summary>
/// Linkers in this matcher
/// </summary>
public MessageHandlerLink[] HandlerLinks { get; }
/// <summary>
/// ctor
/// </summary>
private MessageMatcher(params MessageHandlerLink[] links)
{
HandlerLinks = links;
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(string value)
{
return new MessageMatcher(new MessageHandlerLink<T>(MessageLinkType.Full, value, (con, msg) => CallResult.SuccessResult));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(string value, Func<SocketConnection, DataEvent<T>, CallResult> handler)
{
return new MessageMatcher(new MessageHandlerLink<T>(MessageLinkType.Full, value, handler));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(IEnumerable<string> values, Func<SocketConnection, DataEvent<T>, CallResult> handler)
{
return new MessageMatcher(values.Select(x => new MessageHandlerLink<T>(MessageLinkType.Full, x, handler)).ToArray());
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(MessageLinkType type, string value, Func<SocketConnection, DataEvent<T>, CallResult> handler)
{
return new MessageMatcher(new MessageHandlerLink<T>(type, value, handler));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create(params MessageHandlerLink[] linkers)
{
return new MessageMatcher(linkers);
}
/// <summary>
/// Whether this matcher contains a specific link
/// </summary>
public bool ContainsCheck(MessageHandlerLink link) => HandlerLinks.Any(x => x.Type == link.Type && x.Value == link.Value);
/// <summary>
/// Get any handler links matching with the listen id
/// </summary>
public List<MessageHandlerLink> GetHandlerLinks(string listenId) => HandlerLinks.Where(x => x.Check(listenId)).ToList();
/// <inheritdoc />
public override string ToString() => string.Join(",", HandlerLinks.Select(x => x.ToString()));
}
/// <summary>
/// Message handler link
/// </summary>
public abstract class MessageHandlerLink
{
/// <summary>
/// Type of check
/// </summary>
public MessageLinkType Type { get; }
/// <summary>
/// String value of the check
/// </summary>
public string Value { get; }
/// <summary>
/// Deserialization type
/// </summary>
public abstract Type GetDeserializationType(IMessageAccessor accessor);
/// <summary>
/// ctor
/// </summary>
public MessageHandlerLink(MessageLinkType type, string value)
{
Type = type;
Value = value;
}
/// <summary>
/// Whether this listen id matches this link
/// </summary>
public bool Check(string listenId)
{
if (Type == MessageLinkType.Full)
return Value.Equals(listenId, StringComparison.Ordinal);
return listenId.StartsWith(Value, StringComparison.Ordinal);
}
/// <summary>
/// Message handler
/// </summary>
public abstract CallResult Handle(SocketConnection connection, DataEvent<object> message);
/// <inheritdoc />
public override string ToString() => $"{Type} match for \"{Value}\"";
}
/// <summary>
/// Message handler link
/// </summary>
public class MessageHandlerLink<TServer>: MessageHandlerLink
{
private Func<SocketConnection, DataEvent<TServer>, CallResult> _handler;
/// <inheritdoc />
public override Type GetDeserializationType(IMessageAccessor accessor) => typeof(TServer);
/// <summary>
/// ctor
/// </summary>
public MessageHandlerLink(string value, Func<SocketConnection, DataEvent<TServer>, CallResult> handler)
: this(MessageLinkType.Full, value, handler)
{
}
/// <summary>
/// ctor
/// </summary>
public MessageHandlerLink(MessageLinkType type, string value, Func<SocketConnection, DataEvent<TServer>, CallResult> handler)
: base(type, value)
{
_handler = handler;
}
/// <inheritdoc />
public override CallResult Handle(SocketConnection connection, DataEvent<object> message)
{
return _handler(connection, message.As((TServer)message.Data));
}
}
}

View File

@ -3,6 +3,7 @@ using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -60,9 +61,9 @@ namespace CryptoExchange.Net.Sockets
public AsyncResetEvent? ContinueAwaiter { get; set; }
/// <summary>
/// Strings to match this query to a received message
/// Matcher for this query
/// </summary>
public abstract HashSet<string> ListenerIdentifiers { get; set; }
public MessageMatcher MessageMatcher { get; set; } = null!;
/// <summary>
/// The query request object
@ -84,13 +85,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public bool ExpectsResponse { get; set; } = true;
/// <summary>
/// Get the type the message should be deserialized to
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Type? GetMessageType(IMessageAccessor message);
/// <summary>
/// Wait event for response
/// </summary>
@ -161,23 +155,16 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handle a response message
/// </summary>
/// <param name="message"></param>
/// <param name="connection"></param>
/// <returns></returns>
public abstract Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message);
public abstract Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check);
}
/// <summary>
/// Query
/// </summary>
/// <typeparam name="TServerResponse">The type returned from the server</typeparam>
/// <typeparam name="THandlerResponse">The type to be returned to the caller</typeparam>
public abstract class Query<TServerResponse, THandlerResponse> : Query
public abstract class Query<THandlerResponse> : Query
{
/// <inheritdoc />
public override Type? GetMessageType(IMessageAccessor message) => typeof(TServerResponse);
/// <summary>
/// The typed call result
/// </summary>
@ -194,10 +181,9 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message)
public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check)
{
var typedMessage = message.As((TServerResponse)message.Data);
if (!ValidateMessage(typedMessage))
if (!PreCheckMessage(message))
return CallResult.SuccessResult;
CurrentResponses++;
@ -209,7 +195,7 @@ namespace CryptoExchange.Net.Sockets
if (Result?.Success != false)
// If an error result is already set don't override that
Result = HandleMessage(connection, typedMessage);
Result = check.Handle(connection, message);
if (CurrentResponses == RequiredResponses)
{
@ -226,15 +212,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public virtual bool ValidateMessage(DataEvent<TServerResponse> message) => true;
/// <summary>
/// Handle the query response
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult<THandlerResponse> HandleMessage(SocketConnection connection, DataEvent<TServerResponse> message);
public virtual bool PreCheckMessage(DataEvent<object> message) => true;
/// <inheritdoc />
public override void Timeout()
@ -257,29 +235,4 @@ namespace CryptoExchange.Net.Sockets
_event.Set();
}
}
/// <summary>
/// Query
/// </summary>
/// <typeparam name="TResponse">Response object type</typeparam>
public abstract class Query<TResponse> : Query<TResponse, TResponse>
{
/// <summary>
/// ctor
/// </summary>
/// <param name="request"></param>
/// <param name="authenticated"></param>
/// <param name="weight"></param>
protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
{
}
/// <summary>
/// Handle the query response
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
public override CallResult<TResponse> HandleMessage(SocketConnection connection, DataEvent<TResponse> message) => message.ToCallResult();
}
}

View File

@ -227,6 +227,11 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
private readonly IWebsocket _socket;
/// <summary>
/// Cache for deserialization, only caches for a single message
/// </summary>
private readonly Dictionary<Type, object> _deserializationCache = new Dictionary<Type, object>();
/// <summary>
/// New socket connection
/// </summary>
@ -444,9 +449,6 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handle a message
/// </summary>
/// <param name="data"></param>
/// <param name="type"></param>
/// <returns></returns>
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory<byte> data)
{
var sw = Stopwatch.StartNew();
@ -483,7 +485,7 @@ namespace CryptoExchange.Net.Sockets
var listenId = ApiClient.GetListenerIdentifier(accessor);
if (listenId == null)
{
originalData = outputOriginalData ? accessor.GetOriginalString() : "[OutputOriginalData is false]";
originalData ??= "[OutputOriginalData is false]";
if (!ApiClient.UnhandledMessageExpected)
_logger.FailedToEvaluateMessage(SocketId, originalData);
@ -491,18 +493,78 @@ namespace CryptoExchange.Net.Sockets
return;
}
// 4. Get the listeners interested in this message
List<IMessageProcessor> processors;
lock (_listenersLock)
processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList();
bool processed = false;
var totalUserTime = 0;
if (processors.Count == 0)
List<IMessageProcessor> localListeners;
lock(_listenersLock)
localListeners = _listeners.ToList();
foreach(var processor in localListeners)
{
foreach(var listener in processor.MessageMatcher.GetHandlerLinks(listenId))
{
processed = true;
_logger.ProcessorMatched(SocketId, listener.ToString(), listenId);
// 4. Determine the type to deserialize to for this processor
var messageType = listener.GetDeserializationType(accessor);
if (messageType == null)
{
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
continue;
}
if (processor is Subscription subscriptionProcessor && !subscriptionProcessor.Confirmed)
{
// If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed
subscriptionProcessor.Confirmed = true;
// This doesn't trigger a waiting subscribe query, should probably also somehow set the wait event for that
}
// 5. Deserialize the message
_deserializationCache.TryGetValue(messageType, out var deserialized);
if (deserialized == null)
{
var desResult = processor.Deserialize(accessor, messageType);
if (!desResult)
{
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
continue;
}
deserialized = desResult.Data;
_deserializationCache.Add(messageType, deserialized);
}
// 6. Pass the message to the handler
try
{
var innerSw = Stopwatch.StartNew();
await processor.Handle(this, new DataEvent<object>(deserialized, null, null, originalData, receiveTime, null), listener).ConfigureAwait(false);
if (processor is Query query && query.RequiredResponses != 1)
_logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}");
totalUserTime += (int)innerSw.ElapsedMilliseconds;
}
catch (Exception ex)
{
_logger.UserMessageProcessingFailed(SocketId, ex.Message, ex);
if (processor is Subscription subscription)
subscription.InvokeExceptionHandler(ex);
}
}
}
if (!processed)
{
if (!ApiClient.UnhandledMessageExpected)
{
List<string> listenerIds;
lock (_listenersLock)
listenerIds = _listeners.SelectMany(l => l.ListenerIdentifiers).ToList();
listenerIds = _listeners.Select(l => l.MessageMatcher.ToString()).ToList();
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
UnhandledMessage?.Invoke(accessor);
}
@ -510,69 +572,11 @@ namespace CryptoExchange.Net.Sockets
return;
}
_logger.ProcessorMatched(SocketId, processors.Count, listenId);
var totalUserTime = 0;
Dictionary<Type, object>? desCache = null;
if (processors.Count > 1)
{
// Only instantiate a cache if there are multiple processors
desCache = new Dictionary<Type, object>();
}
foreach (var processor in processors)
{
// 5. Determine the type to deserialize to for this processor
var messageType = processor.GetMessageType(accessor);
if (messageType == null)
{
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
continue;
}
if (processor is Subscription subscriptionProcessor && !subscriptionProcessor.Confirmed)
{
// If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed
subscriptionProcessor.Confirmed = true;
// This doesn't trigger a waiting subscribe query, should probably also somehow set the wait event for that
}
// 6. Deserialize the message
object? deserialized = null;
desCache?.TryGetValue(messageType, out deserialized);
if (deserialized == null)
{
var desResult = processor.Deserialize(accessor, messageType);
if (!desResult)
{
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
continue;
}
deserialized = desResult.Data;
desCache?.Add(messageType, deserialized);
}
// 7. Hand of the message to the subscription
try
{
var innerSw = Stopwatch.StartNew();
await processor.Handle(this, new DataEvent<object>(deserialized, null, null, originalData, receiveTime, null)).ConfigureAwait(false);
if (processor is Query query && query.RequiredResponses != 1)
_logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}");
totalUserTime += (int)innerSw.ElapsedMilliseconds;
}
catch (Exception ex)
{
_logger.UserMessageProcessingFailed(SocketId, ex.Message, ex);
if (processor is Subscription subscription)
subscription.InvokeExceptionHandler(ex);
}
}
_logger.MessageProcessed(SocketId, sw.ElapsedMilliseconds, sw.ElapsedMilliseconds - totalUserTime);
}
finally
{
_deserializationCache.Clear();
accessor.Clear();
}
}
@ -652,7 +656,7 @@ namespace CryptoExchange.Net.Sockets
bool anyDuplicateSubscription;
lock (_listenersLock)
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.ListenerIdentifiers.All(l => subscription.ListenerIdentifiers.Contains(l)));
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageMatcher.HandlerLinks.All(l => subscription.MessageMatcher.ContainsCheck(l)));
bool shouldCloseConnection;
lock (_listenersLock)
@ -768,12 +772,11 @@ namespace CryptoExchange.Net.Sockets
/// Send a query request and wait for an answer
/// </summary>
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
/// <param name="query">Query to send</param>
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default)
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<THandlerResponse>(Query<THandlerResponse> query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default)
{
await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false);
return query.TypedResult ?? new CallResult<THandlerResponse>(new ServerError("Timeout"));

View File

@ -4,6 +4,7 @@ using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -60,9 +61,9 @@ namespace CryptoExchange.Net.Sockets
public bool Authenticated { get; }
/// <summary>
/// Strings to match this subscription to a received message
/// Matcher for this subscription
/// </summary>
public abstract HashSet<string> ListenerIdentifiers { get; set; }
public MessageMatcher MessageMatcher { get; set; } = null!;
/// <summary>
/// Cancellation token registration
@ -74,13 +75,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public event Action<Exception>? Exception;
/// <summary>
/// Get the deserialization type for this message
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Type? GetMessageType(IMessageAccessor message);
/// <summary>
/// Subscription topic
/// </summary>
@ -89,9 +83,6 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// ctor
/// </summary>
/// <param name="logger"></param>
/// <param name="authenticated"></param>
/// <param name="userSubscription"></param>
public Subscription(ILogger logger, bool authenticated, bool userSubscription = true)
{
_logger = logger;
@ -130,14 +121,11 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handle an update message
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
public Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message)
public Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink matcher)
{
ConnectionInvocations++;
TotalInvocations++;
return Task.FromResult(DoHandleMessage(connection, message));
return Task.FromResult(matcher.Handle(connection, message));
}
/// <summary>
@ -154,14 +142,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public virtual void DoHandleReset() { }
/// <summary>
/// Handle the update message
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message);
/// <summary>
/// Invoke the exception event
/// </summary>
@ -177,12 +157,12 @@ namespace CryptoExchange.Net.Sockets
/// <param name="Id">The id of the subscription</param>
/// <param name="Confirmed">True when the subscription query is handled (either accepted or rejected)</param>
/// <param name="Invocations">Number of times this subscription got a message</param>
/// <param name="Identifiers">Identifiers the subscription is listening to</param>
/// <param name="ListenMatcher">Matcher for this subscription</param>
public record SubscriptionState(
int Id,
bool Confirmed,
int Invocations,
HashSet<string> Identifiers
MessageMatcher ListenMatcher
);
/// <summary>
@ -191,7 +171,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
public SubscriptionState GetState()
{
return new SubscriptionState(Id, Confirmed, TotalInvocations, ListenerIdentifiers);
return new SubscriptionState(Id, Confirmed, TotalInvocations, MessageMatcher);
}
}

View File

@ -27,32 +27,4 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public override Query? GetUnsubQuery() => null;
}
/// <inheritdoc />
public abstract class SystemSubscription<T> : SystemSubscription
{
/// <inheritdoc />
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
/// <inheritdoc />
public override CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message)
=> HandleMessage(connection, message.As((T)message.Data));
/// <summary>
/// ctor
/// </summary>
/// <param name="logger"></param>
/// <param name="authenticated"></param>
protected SystemSubscription(ILogger logger, bool authenticated) : base(logger, authenticated)
{
}
/// <summary>
/// Handle an update message
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult HandleMessage(SocketConnection connection, DataEvent<T> message);
}
}