diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestChannelQuery.cs b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestChannelQuery.cs index 0af9b97..212c055 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestChannelQuery.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestChannelQuery.cs @@ -31,21 +31,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets internal class TestChannelQuery : Query { - public override HashSet ListenerIdentifiers { get; set; } - public TestChannelQuery(string channel, string request, bool authenticated, int weight = 1) : base(request, authenticated, weight) { - ListenerIdentifiers = new HashSet { request + "-" + channel }; + MessageMatcher = MessageMatcher.Create(request + "-" + channel, HandleMessage); } - public override CallResult HandleMessage(SocketConnection connection, DataEvent message) + public CallResult HandleMessage(SocketConnection connection, DataEvent message) { if (!message.Data.Status.Equals("confirmed", StringComparison.OrdinalIgnoreCase)) { return new CallResult(new ServerError(message.Data.Status)); } - return base.HandleMessage(connection, message); + return message.ToCallResult(); } } } diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestQuery.cs b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestQuery.cs index 4eafbd8..f2302b6 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestQuery.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestQuery.cs @@ -9,11 +9,9 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets { internal class TestQuery : Query { - public override HashSet ListenerIdentifiers { get; set; } - public TestQuery(string identifier, object request, bool authenticated, int weight = 1) : base(request, authenticated, weight) { - ListenerIdentifiers = new HashSet { identifier }; + MessageMatcher = MessageMatcher.Create(identifier); } } } diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs index f8f1afd..dd6007b 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs @@ -15,21 +15,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets { private readonly Action> _handler; - public override HashSet ListenerIdentifiers { get; set; } = new HashSet { "update-topic" }; - public TestSubscription(ILogger logger, Action> handler) : base(logger, false) { _handler = handler; + + MessageMatcher = MessageMatcher.Create("update-topic", DoHandleMessage); } - public override CallResult DoHandleMessage(SocketConnection connection, DataEvent message) + public CallResult DoHandleMessage(SocketConnection connection, DataEvent message) { - var data = (T)message.Data; - _handler.Invoke(message.As(data)); + _handler.Invoke(message); return new CallResult(null); } - public override Type GetMessageType(IMessageAccessor message) => typeof(T); public override Query GetSubQuery(SocketConnection connection) => new TestQuery("sub", new object(), false, 1); public override Query GetUnsubQuery() => new TestQuery("unsub", new object(), false, 1); } diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscriptionWithResponseCheck.cs b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscriptionWithResponseCheck.cs index 4a4372a..c7a975c 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscriptionWithResponseCheck.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscriptionWithResponseCheck.cs @@ -15,23 +15,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets private readonly Action> _handler; private readonly string _channel; - public override HashSet ListenerIdentifiers { get; set; } - public TestSubscriptionWithResponseCheck(string channel, Action> handler) : base(Mock.Of(), false) { - ListenerIdentifiers = new HashSet() { channel }; + MessageMatcher = MessageMatcher.Create(channel, DoHandleMessage); _handler = handler; _channel = channel; } - public override CallResult DoHandleMessage(SocketConnection connection, DataEvent message) + public CallResult DoHandleMessage(SocketConnection connection, DataEvent message) { - var data = (T)message.Data; - _handler.Invoke(message.As(data)); + _handler.Invoke(message); return new CallResult(null); } - public override Type GetMessageType(IMessageAccessor message) => typeof(T); public override Query GetSubQuery(SocketConnection connection) => new TestChannelQuery(_channel, "subscribe", false, 1); public override Query GetUnsubQuery() => new TestChannelQuery(_channel, "unsubscribe", false, 1); } diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 1ea2aa5..fcc0565 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -313,11 +313,10 @@ namespace CryptoExchange.Net.Clients /// Send a query on a socket connection to the BaseAddress and wait for the response /// /// Expected result type - /// The type returned to the caller /// The query /// Cancellation token /// - protected virtual Task> QueryAsync(Query query, CancellationToken ct = default) + protected virtual Task> QueryAsync(Query query, CancellationToken ct = default) { return QueryAsync(BaseAddress, query, ct); } @@ -326,12 +325,11 @@ namespace CryptoExchange.Net.Clients /// Send a query on a socket connection and wait for the response /// /// Expected result type - /// The type returned to the caller /// The url for the request /// The query /// Cancellation token /// - protected virtual async Task> QueryAsync(string url, Query query, CancellationToken ct = default) + protected virtual async Task> QueryAsync(string url, Query query, CancellationToken ct = default) { if (_disposing) return new CallResult(new InvalidOperationError("Client disposed, can't query")); @@ -816,7 +814,7 @@ namespace CryptoExchange.Net.Clients sb.AppendLine($"\t\t\tId: {subState.Id}"); sb.AppendLine($"\t\t\tConfirmed: {subState.Confirmed}"); sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}"); - sb.AppendLine($"\t\t\tIdentifiers: [{string.Join(",", subState.Identifiers)}]"); + sb.AppendLine($"\t\t\tIdentifiers: [{subState.ListenMatcher.ToString()}]"); }); } }); diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs index 749e738..749a04f 100644 --- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs @@ -17,22 +17,13 @@ namespace CryptoExchange.Net.Interfaces /// public int Id { get; } /// - /// The identifiers for this processor + /// The matcher for this listener /// - public HashSet ListenerIdentifiers { get; } + public MessageMatcher MessageMatcher { get; } /// /// Handle a message /// - /// - /// - /// - Task Handle(SocketConnection connection, DataEvent message); - /// - /// Get the type the message should be deserialized to - /// - /// - /// - Type? GetMessageType(IMessageAccessor messageAccessor); + Task Handle(SocketConnection connection, DataEvent message, MessageHandlerLink matchedHandler); /// /// Deserialize a message into object of type /// diff --git a/CryptoExchange.Net/Logging/Extensions/SocketConnectionLoggingExtension.cs b/CryptoExchange.Net/Logging/Extensions/SocketConnectionLoggingExtension.cs index 531d180..af6a61b 100644 --- a/CryptoExchange.Net/Logging/Extensions/SocketConnectionLoggingExtension.cs +++ b/CryptoExchange.Net/Logging/Extensions/SocketConnectionLoggingExtension.cs @@ -18,7 +18,7 @@ namespace CryptoExchange.Net.Logging.Extensions private static readonly Action _failedToParse; private static readonly Action _failedToEvaluateMessage; private static readonly Action _errorProcessingMessage; - private static readonly Action _processorMatched; + private static readonly Action _processorMatched; private static readonly Action _receivedMessageNotRecognized; private static readonly Action _failedToDeserializeMessage; private static readonly Action _userMessageProcessingFailed; @@ -92,11 +92,6 @@ namespace CryptoExchange.Net.Logging.Extensions new EventId(2009, "ErrorProcessingMessage"), "[Sckt {SocketId}] error processing message"); - _processorMatched = LoggerMessage.Define( - LogLevel.Trace, - new EventId(2010, "ProcessorMatched"), - "[Sckt {SocketId}] {Count} processor(s) matched to message with listener identifier {ListenerId}"); - _receivedMessageNotRecognized = LoggerMessage.Define( LogLevel.Warning, new EventId(2011, "ReceivedMessageNotRecognized"), @@ -190,7 +185,7 @@ namespace CryptoExchange.Net.Logging.Extensions _receivedMessageNotMatchedToAnyListener = LoggerMessage.Define( LogLevel.Warning, new EventId(2029, "ReceivedMessageNotMatchedToAnyListener"), - "[Sckt {SocketId}] received message not matched to any listener. ListenId: {ListenId}, current listeners: {ListenIds}"); + "[Sckt {SocketId}] received message not matched to any listener. ListenId: {ListenId}, current listeners: [{ListenIds}]"); _failedToParse = LoggerMessage.Define( LogLevel.Warning, @@ -201,6 +196,12 @@ namespace CryptoExchange.Net.Logging.Extensions LogLevel.Trace, new EventId(2031, "SendingByteData"), "[Sckt {SocketId}] [Req {RequestId}] sending byte message of length: {Length}"); + + _processorMatched = LoggerMessage.Define( + LogLevel.Trace, + new EventId(2032, "ProcessorMatched"), + "[Sckt {SocketId}] listener '{ListenId}' matched to message with listener identifier {ListenerId}"); + } public static void ActivityPaused(this ILogger logger, int socketId, bool paused) @@ -256,9 +257,9 @@ namespace CryptoExchange.Net.Logging.Extensions { _errorProcessingMessage(logger, socketId, e); } - public static void ProcessorMatched(this ILogger logger, int socketId, int count, string listenerId) + public static void ProcessorMatched(this ILogger logger, int socketId, string listener, string listenerId) { - _processorMatched(logger, socketId, count, listenerId, null); + _processorMatched(logger, socketId, listener, listenerId, null); } public static void ReceivedMessageNotRecognized(this ILogger logger, int socketId, int id) { diff --git a/CryptoExchange.Net/Sockets/MessageMatcher.cs b/CryptoExchange.Net/Sockets/MessageMatcher.cs new file mode 100644 index 0000000..d73a4e7 --- /dev/null +++ b/CryptoExchange.Net/Sockets/MessageMatcher.cs @@ -0,0 +1,180 @@ +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Sockets +{ + /// + /// Message link type + /// + public enum MessageLinkType + { + /// + /// Match when the listen id matches fully to the value + /// + Full, + /// + /// Match when the listen id starts with the value + /// + StartsWith + } + + /// + /// Matches a message listen id to a specific listener + /// + public class MessageMatcher + { + /// + /// Linkers in this matcher + /// + public MessageHandlerLink[] HandlerLinks { get; } + + /// + /// ctor + /// + private MessageMatcher(params MessageHandlerLink[] links) + { + HandlerLinks = links; + } + + /// + /// Create message matcher + /// + public static MessageMatcher Create(string value) + { + return new MessageMatcher(new MessageHandlerLink(MessageLinkType.Full, value, (con, msg) => CallResult.SuccessResult)); + } + + /// + /// Create message matcher + /// + public static MessageMatcher Create(string value, Func, CallResult> handler) + { + return new MessageMatcher(new MessageHandlerLink(MessageLinkType.Full, value, handler)); + } + + /// + /// Create message matcher + /// + public static MessageMatcher Create(IEnumerable values, Func, CallResult> handler) + { + return new MessageMatcher(values.Select(x => new MessageHandlerLink(MessageLinkType.Full, x, handler)).ToArray()); + } + + /// + /// Create message matcher + /// + public static MessageMatcher Create(MessageLinkType type, string value, Func, CallResult> handler) + { + return new MessageMatcher(new MessageHandlerLink(type, value, handler)); + } + + /// + /// Create message matcher + /// + public static MessageMatcher Create(params MessageHandlerLink[] linkers) + { + return new MessageMatcher(linkers); + } + + /// + /// Whether this matcher contains a specific link + /// + public bool ContainsCheck(MessageHandlerLink link) => HandlerLinks.Any(x => x.Type == link.Type && x.Value == link.Value); + + /// + /// Get any handler links matching with the listen id + /// + public List GetHandlerLinks(string listenId) => HandlerLinks.Where(x => x.Check(listenId)).ToList(); + + /// + public override string ToString() => string.Join(",", HandlerLinks.Select(x => x.ToString())); + } + + /// + /// Message handler link + /// + public abstract class MessageHandlerLink + { + /// + /// Type of check + /// + public MessageLinkType Type { get; } + /// + /// String value of the check + /// + public string Value { get; } + /// + /// Deserialization type + /// + public abstract Type GetDeserializationType(IMessageAccessor accessor); + + /// + /// ctor + /// + public MessageHandlerLink(MessageLinkType type, string value) + { + Type = type; + Value = value; + } + + /// + /// Whether this listen id matches this link + /// + public bool Check(string listenId) + { + if (Type == MessageLinkType.Full) + return Value.Equals(listenId, StringComparison.Ordinal); + + return listenId.StartsWith(Value, StringComparison.Ordinal); + } + + /// + /// Message handler + /// + public abstract CallResult Handle(SocketConnection connection, DataEvent message); + + /// + public override string ToString() => $"{Type} match for \"{Value}\""; + } + + /// + /// Message handler link + /// + public class MessageHandlerLink: MessageHandlerLink + { + private Func, CallResult> _handler; + + /// + public override Type GetDeserializationType(IMessageAccessor accessor) => typeof(TServer); + + /// + /// ctor + /// + public MessageHandlerLink(string value, Func, CallResult> handler) + : this(MessageLinkType.Full, value, handler) + { + } + + /// + /// ctor + /// + public MessageHandlerLink(MessageLinkType type, string value, Func, CallResult> handler) + : base(type, value) + { + _handler = handler; + } + + + /// + public override CallResult Handle(SocketConnection connection, DataEvent message) + { + return _handler(connection, message.As((TServer)message.Data)); + } + } +} diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 9357a33..a79afcb 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -3,6 +3,7 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -60,9 +61,9 @@ namespace CryptoExchange.Net.Sockets public AsyncResetEvent? ContinueAwaiter { get; set; } /// - /// Strings to match this query to a received message + /// Matcher for this query /// - public abstract HashSet ListenerIdentifiers { get; set; } + public MessageMatcher MessageMatcher { get; set; } = null!; /// /// The query request object @@ -84,13 +85,6 @@ namespace CryptoExchange.Net.Sockets /// public bool ExpectsResponse { get; set; } = true; - /// - /// Get the type the message should be deserialized to - /// - /// - /// - public abstract Type? GetMessageType(IMessageAccessor message); - /// /// Wait event for response /// @@ -161,23 +155,16 @@ namespace CryptoExchange.Net.Sockets /// /// Handle a response message /// - /// - /// - /// - public abstract Task Handle(SocketConnection connection, DataEvent message); + public abstract Task Handle(SocketConnection connection, DataEvent message, MessageHandlerLink check); } /// /// Query /// - /// The type returned from the server /// The type to be returned to the caller - public abstract class Query : Query + public abstract class Query : Query { - /// - public override Type? GetMessageType(IMessageAccessor message) => typeof(TServerResponse); - /// /// The typed call result /// @@ -194,10 +181,9 @@ namespace CryptoExchange.Net.Sockets } /// - public override async Task Handle(SocketConnection connection, DataEvent message) + public override async Task Handle(SocketConnection connection, DataEvent message, MessageHandlerLink check) { - var typedMessage = message.As((TServerResponse)message.Data); - if (!ValidateMessage(typedMessage)) + if (!PreCheckMessage(message)) return CallResult.SuccessResult; CurrentResponses++; @@ -209,7 +195,7 @@ namespace CryptoExchange.Net.Sockets if (Result?.Success != false) // If an error result is already set don't override that - Result = HandleMessage(connection, typedMessage); + Result = check.Handle(connection, message); if (CurrentResponses == RequiredResponses) { @@ -226,15 +212,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public virtual bool ValidateMessage(DataEvent message) => true; - - /// - /// Handle the query response - /// - /// - /// - /// - public abstract CallResult HandleMessage(SocketConnection connection, DataEvent message); + public virtual bool PreCheckMessage(DataEvent message) => true; /// public override void Timeout() @@ -257,29 +235,4 @@ namespace CryptoExchange.Net.Sockets _event.Set(); } } - - /// - /// Query - /// - /// Response object type - public abstract class Query : Query - { - /// - /// ctor - /// - /// - /// - /// - protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight) - { - } - - /// - /// Handle the query response - /// - /// - /// - /// - public override CallResult HandleMessage(SocketConnection connection, DataEvent message) => message.ToCallResult(); - } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index b970edd..7abc49d 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -227,6 +227,11 @@ namespace CryptoExchange.Net.Sockets /// private readonly IWebsocket _socket; + /// + /// Cache for deserialization, only caches for a single message + /// + private readonly Dictionary _deserializationCache = new Dictionary(); + /// /// New socket connection /// @@ -444,9 +449,6 @@ namespace CryptoExchange.Net.Sockets /// /// Handle a message /// - /// - /// - /// protected virtual async Task HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory data) { var sw = Stopwatch.StartNew(); @@ -483,7 +485,7 @@ namespace CryptoExchange.Net.Sockets var listenId = ApiClient.GetListenerIdentifier(accessor); if (listenId == null) { - originalData = outputOriginalData ? accessor.GetOriginalString() : "[OutputOriginalData is false]"; + originalData ??= "[OutputOriginalData is false]"; if (!ApiClient.UnhandledMessageExpected) _logger.FailedToEvaluateMessage(SocketId, originalData); @@ -491,18 +493,78 @@ namespace CryptoExchange.Net.Sockets return; } - // 4. Get the listeners interested in this message - List processors; - lock (_listenersLock) - processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList(); + bool processed = false; + var totalUserTime = 0; - if (processors.Count == 0) + List localListeners; + lock(_listenersLock) + localListeners = _listeners.ToList(); + + foreach(var processor in localListeners) + { + foreach(var listener in processor.MessageMatcher.GetHandlerLinks(listenId)) + { + processed = true; + _logger.ProcessorMatched(SocketId, listener.ToString(), listenId); + + // 4. Determine the type to deserialize to for this processor + var messageType = listener.GetDeserializationType(accessor); + if (messageType == null) + { + _logger.ReceivedMessageNotRecognized(SocketId, processor.Id); + continue; + } + + if (processor is Subscription subscriptionProcessor && !subscriptionProcessor.Confirmed) + { + // If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed + subscriptionProcessor.Confirmed = true; + // This doesn't trigger a waiting subscribe query, should probably also somehow set the wait event for that + } + + // 5. Deserialize the message + _deserializationCache.TryGetValue(messageType, out var deserialized); + + if (deserialized == null) + { + var desResult = processor.Deserialize(accessor, messageType); + if (!desResult) + { + _logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception); + continue; + } + + deserialized = desResult.Data; + _deserializationCache.Add(messageType, deserialized); + } + + // 6. Pass the message to the handler + try + { + var innerSw = Stopwatch.StartNew(); + await processor.Handle(this, new DataEvent(deserialized, null, null, originalData, receiveTime, null), listener).ConfigureAwait(false); + if (processor is Query query && query.RequiredResponses != 1) + _logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}"); + totalUserTime += (int)innerSw.ElapsedMilliseconds; + } + catch (Exception ex) + { + _logger.UserMessageProcessingFailed(SocketId, ex.Message, ex); + if (processor is Subscription subscription) + subscription.InvokeExceptionHandler(ex); + } + + } + } + + if (!processed) { if (!ApiClient.UnhandledMessageExpected) { List listenerIds; lock (_listenersLock) - listenerIds = _listeners.SelectMany(l => l.ListenerIdentifiers).ToList(); + listenerIds = _listeners.Select(l => l.MessageMatcher.ToString()).ToList(); + _logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds)); UnhandledMessage?.Invoke(accessor); } @@ -510,69 +572,11 @@ namespace CryptoExchange.Net.Sockets return; } - _logger.ProcessorMatched(SocketId, processors.Count, listenId); - var totalUserTime = 0; - Dictionary? desCache = null; - if (processors.Count > 1) - { - // Only instantiate a cache if there are multiple processors - desCache = new Dictionary(); - } - - foreach (var processor in processors) - { - // 5. Determine the type to deserialize to for this processor - var messageType = processor.GetMessageType(accessor); - if (messageType == null) - { - _logger.ReceivedMessageNotRecognized(SocketId, processor.Id); - continue; - } - - if (processor is Subscription subscriptionProcessor && !subscriptionProcessor.Confirmed) - { - // If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed - subscriptionProcessor.Confirmed = true; - // This doesn't trigger a waiting subscribe query, should probably also somehow set the wait event for that - } - - // 6. Deserialize the message - object? deserialized = null; - desCache?.TryGetValue(messageType, out deserialized); - - if (deserialized == null) - { - var desResult = processor.Deserialize(accessor, messageType); - if (!desResult) - { - _logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception); - continue; - } - deserialized = desResult.Data; - desCache?.Add(messageType, deserialized); - } - - // 7. Hand of the message to the subscription - try - { - var innerSw = Stopwatch.StartNew(); - await processor.Handle(this, new DataEvent(deserialized, null, null, originalData, receiveTime, null)).ConfigureAwait(false); - if (processor is Query query && query.RequiredResponses != 1) - _logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}"); - totalUserTime += (int)innerSw.ElapsedMilliseconds; - } - catch (Exception ex) - { - _logger.UserMessageProcessingFailed(SocketId, ex.Message, ex); - if (processor is Subscription subscription) - subscription.InvokeExceptionHandler(ex); - } - } - _logger.MessageProcessed(SocketId, sw.ElapsedMilliseconds, sw.ElapsedMilliseconds - totalUserTime); } finally { + _deserializationCache.Clear(); accessor.Clear(); } } @@ -652,7 +656,7 @@ namespace CryptoExchange.Net.Sockets bool anyDuplicateSubscription; lock (_listenersLock) - anyDuplicateSubscription = _listeners.OfType().Any(x => x != subscription && x.ListenerIdentifiers.All(l => subscription.ListenerIdentifiers.Contains(l))); + anyDuplicateSubscription = _listeners.OfType().Any(x => x != subscription && x.MessageMatcher.HandlerLinks.All(l => subscription.MessageMatcher.ContainsCheck(l))); bool shouldCloseConnection; lock (_listenersLock) @@ -768,12 +772,11 @@ namespace CryptoExchange.Net.Sockets /// Send a query request and wait for an answer /// /// Expected result type - /// The type returned to the caller /// Query to send /// Wait event for when the socket message handler can continue /// Cancellation token /// - public virtual async Task> SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default) + public virtual async Task> SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default) { await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false); return query.TypedResult ?? new CallResult(new ServerError("Timeout")); diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 4e35683..081977e 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -4,6 +4,7 @@ using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -60,9 +61,9 @@ namespace CryptoExchange.Net.Sockets public bool Authenticated { get; } /// - /// Strings to match this subscription to a received message + /// Matcher for this subscription /// - public abstract HashSet ListenerIdentifiers { get; set; } + public MessageMatcher MessageMatcher { get; set; } = null!; /// /// Cancellation token registration @@ -74,13 +75,6 @@ namespace CryptoExchange.Net.Sockets /// public event Action? Exception; - /// - /// Get the deserialization type for this message - /// - /// - /// - public abstract Type? GetMessageType(IMessageAccessor message); - /// /// Subscription topic /// @@ -89,9 +83,6 @@ namespace CryptoExchange.Net.Sockets /// /// ctor /// - /// - /// - /// public Subscription(ILogger logger, bool authenticated, bool userSubscription = true) { _logger = logger; @@ -130,14 +121,11 @@ namespace CryptoExchange.Net.Sockets /// /// Handle an update message /// - /// - /// - /// - public Task Handle(SocketConnection connection, DataEvent message) + public Task Handle(SocketConnection connection, DataEvent message, MessageHandlerLink matcher) { ConnectionInvocations++; TotalInvocations++; - return Task.FromResult(DoHandleMessage(connection, message)); + return Task.FromResult(matcher.Handle(connection, message)); } /// @@ -154,14 +142,6 @@ namespace CryptoExchange.Net.Sockets /// public virtual void DoHandleReset() { } - /// - /// Handle the update message - /// - /// - /// - /// - public abstract CallResult DoHandleMessage(SocketConnection connection, DataEvent message); - /// /// Invoke the exception event /// @@ -177,12 +157,12 @@ namespace CryptoExchange.Net.Sockets /// The id of the subscription /// True when the subscription query is handled (either accepted or rejected) /// Number of times this subscription got a message - /// Identifiers the subscription is listening to + /// Matcher for this subscription public record SubscriptionState( int Id, bool Confirmed, int Invocations, - HashSet Identifiers + MessageMatcher ListenMatcher ); /// @@ -191,7 +171,7 @@ namespace CryptoExchange.Net.Sockets /// public SubscriptionState GetState() { - return new SubscriptionState(Id, Confirmed, TotalInvocations, ListenerIdentifiers); + return new SubscriptionState(Id, Confirmed, TotalInvocations, MessageMatcher); } } diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index 75b9fe9..1d228ba 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -27,32 +27,4 @@ namespace CryptoExchange.Net.Sockets /// public override Query? GetUnsubQuery() => null; } - - /// - public abstract class SystemSubscription : SystemSubscription - { - /// - public override Type GetMessageType(IMessageAccessor message) => typeof(T); - - /// - public override CallResult DoHandleMessage(SocketConnection connection, DataEvent message) - => HandleMessage(connection, message.As((T)message.Data)); - - /// - /// ctor - /// - /// - /// - protected SystemSubscription(ILogger logger, bool authenticated) : base(logger, authenticated) - { - } - - /// - /// Handle an update message - /// - /// - /// - /// - public abstract CallResult HandleMessage(SocketConnection connection, DataEvent message); - } }