diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 2a04a05..fbac81e 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -109,6 +109,9 @@ namespace CryptoExchange.Net /// public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions; + + /// + public abstract SocketConverter StreamConverter { get; } #endregion /// @@ -457,8 +460,6 @@ namespace CryptoExchange.Net return messageListener; } - protected internal abstract SocketConverter GetConverter(); - /// /// Adds a system subscription. Used for example to reply to ping requests /// @@ -552,7 +553,7 @@ namespace CryptoExchange.Net /// Process an unhandled message /// /// The message that wasn't processed - protected virtual void HandleUnhandledMessage(StreamMessage message) + protected virtual void HandleUnhandledMessage(ParsedMessage message) { } diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index f84eda2..c6ae41f 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -3,35 +3,31 @@ using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; using System.Collections.Generic; +using System.IO; using System.Text; namespace CryptoExchange.Net.Converters { - public class SocketConverter : JsonConverter + public abstract class SocketConverter { - private readonly List _idFields; - private readonly Func, Type> _typeIdentifier; + public abstract string[] IdFields { get; } - public SocketConverter(List idFields, Func, Type> typeIdentifier) - { - _idFields = idFields; - _typeIdentifier = typeIdentifier; - } + public abstract Type? GetDeserializationType(Dictionary idValues, List listeners); + public abstract List MatchToListener(ParsedMessage message, List listeners); /// - public override bool CanConvert(Type objectType) => true; - - /// - public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer) + public object? ReadJson(Stream stream, List listeners) { // Start reading the data - // Once we reach the property that identifies the message we save those in a string array + // Once we reach the properties that identify the message we save those in a dict // Once all id properties have been read callback to see what the deserialization type should be // Deserialize to the correct type + using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); + using var jsonTextReader = new JsonTextReader(sr); - var token = JToken.Load(reader); + var token = JToken.Load(jsonTextReader); var dict = new Dictionary(); - foreach(var idField in _idFields) + foreach(var idField in IdFields) { var splitTokens = idField.Split(new char[] { ':' }); var accessToken = token; @@ -42,7 +38,7 @@ namespace CryptoExchange.Net.Converters dict[idField] = accessToken?.ToString(); } - var resultType = _typeIdentifier(dict); + var resultType = GetDeserializationType(dict, listeners); string idString = ""; foreach(var item in dict) idString += item.Value; @@ -53,20 +49,12 @@ namespace CryptoExchange.Net.Converters Data = resultType == null ? null : token.ToObject(resultType) }; } - - /// - public override bool CanWrite { get { return false; } } - - /// - public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer) - { - } } public class ParsedMessage { - public string Identifier { get; set; } + public string Identifier { get; set; } = null!; - public object Data { get; set; } + public object? Data { get; set; } } } diff --git a/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs b/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs index ad77bd7..fc3eb29 100644 --- a/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs +++ b/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs @@ -1,4 +1,5 @@ -using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Converters; +using CryptoExchange.Net.Objects.Sockets; using System.Threading.Tasks; namespace CryptoExchange.Net.Interfaces @@ -6,7 +7,7 @@ namespace CryptoExchange.Net.Interfaces internal interface IStreamMessageListener { int Priority { get; } - bool MessageMatches(StreamMessage message); - Task ProcessAsync(StreamMessage message); + bool MessageMatches(ParsedMessage message); + Task ProcessAsync(ParsedMessage message); } } diff --git a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs index 5ef161c..94e8d4b 100644 --- a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs @@ -1,4 +1,5 @@ -using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Converters; +using CryptoExchange.Net.Interfaces; using System; using System.Threading; using System.Threading.Tasks; @@ -8,7 +9,7 @@ namespace CryptoExchange.Net.Objects.Sockets internal class PendingRequest : IStreamMessageListener { public int Id { get; set; } - public Func MessageMatchesHandler { get; } + public Func MessageMatchesHandler { get; } public bool Completed { get; private set; } public AsyncResetEvent Event { get; } public DateTime RequestTimestamp { get; set; } @@ -19,7 +20,7 @@ namespace CryptoExchange.Net.Objects.Sockets public int Priority => 100; - public PendingRequest(int id, Func messageMatchesHandler, TimeSpan timeout, MessageListener? subscription) + public PendingRequest(int id, Func messageMatchesHandler, TimeSpan timeout, MessageListener? subscription) { Id = id; MessageMatchesHandler = messageMatchesHandler; @@ -42,12 +43,12 @@ namespace CryptoExchange.Net.Objects.Sockets Event.Set(); } - public bool MessageMatches(StreamMessage message) + public bool MessageMatches(ParsedMessage message) { return MessageMatchesHandler(message); } - public Task ProcessAsync(StreamMessage message) + public Task ProcessAsync(ParsedMessage message) { Completed = true; Event.Set(); diff --git a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs index fb3bd62..b72a837 100644 --- a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs @@ -1,4 +1,5 @@ -using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Converters; +using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Sockets; using System; using System.Threading; @@ -84,13 +85,18 @@ namespace CryptoExchange.Net.Objects.Sockets /// /// /// - public bool MessageMatches(StreamMessage message) => Subscription.MessageMatchesEvent(message); + public bool MessageMatches(ParsedMessage message) => Subscription.MessageMatchesEvent(message); /// /// Process the message /// /// /// - public Task ProcessAsync(StreamMessage message) => Subscription.HandleEventAsync(message); + public Task ProcessAsync(ParsedMessage message) + { + // TODO + var dataEvent = new DataEvent(message, null, null, DateTime.UtcNow, null); + return Subscription.HandleEventAsync(dataEvent); + } } } diff --git a/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs b/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs index b4d94e4..9f4826b 100644 --- a/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs +++ b/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs @@ -1,70 +1,70 @@ -using System; -using System.Collections.Generic; -using System.Data.Common; -using System.IO; -using System.Text; -using System.Threading.Tasks; -using CryptoExchange.Net.Sockets; +//using System; +//using System.Collections.Generic; +//using System.Data.Common; +//using System.IO; +//using System.Text; +//using System.Threading.Tasks; +//using CryptoExchange.Net.Sockets; -namespace CryptoExchange.Net.Objects.Sockets -{ - /// - /// A message received from a stream - /// - public class StreamMessage : IDisposable - { - /// - /// The connection it was received on - /// - public SocketConnection Connection { get; } - /// - /// The data stream - /// - public Stream Stream { get; } - /// - /// Receive timestamp - /// - public DateTime Timestamp { get; set; } +//namespace CryptoExchange.Net.Objects.Sockets +//{ +// /// +// /// A message received from a stream +// /// +// public class StreamMessage : IDisposable +// { +// /// +// /// The connection it was received on +// /// +// public SocketConnection Connection { get; } +// /// +// /// The data stream +// /// +// public Stream Stream { get; } +// /// +// /// Receive timestamp +// /// +// public DateTime Timestamp { get; set; } - private Dictionary _casted; +// private Dictionary _casted; - /// - /// Get the data from the memory in specified type using the converter. If this type has been resolved before it will use that instead - /// - /// - /// - /// - public T Get(Func converter) - { - if (_casted.TryGetValue(typeof(T), out var casted)) - return (T)casted; +// /// +// /// Get the data from the memory in specified type using the converter. If this type has been resolved before it will use that instead +// /// +// /// +// /// +// /// +// public T Get(Func converter) +// { +// if (_casted.TryGetValue(typeof(T), out var casted)) +// return (T)casted; - var result = converter(Stream); - _casted.Add(typeof(T), result!); - Stream.Position = 0; - return result; - } +// var result = converter(Stream); +// _casted.Add(typeof(T), result!); +// Stream.Position = 0; +// return result; +// } - /// - /// Dispose - /// - public void Dispose() - { - Stream.Dispose(); - } +// /// +// /// Dispose +// /// +// public void Dispose() +// { +// Stream.Dispose(); +// } - /// - /// ctor - /// - /// - /// - /// - public StreamMessage(SocketConnection connection, Stream stream, DateTime timestamp) - { - Connection = connection; - Stream = stream; - Timestamp = timestamp; - _casted = new Dictionary(); - } - } -} +// /// +// /// ctor +// /// +// /// +// /// +// /// +// public StreamMessage(SocketConnection connection, Stream stream, DateTime timestamp) +// { +// Connection = connection; +// Stream = stream; +// Timestamp = timestamp; +// _casted = new Dictionary(); +// } +// } +//} diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index f08edf2..c65f9e6 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -1,4 +1,5 @@ -using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Converters; +using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; namespace CryptoExchange.Net.Sockets @@ -28,13 +29,13 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract bool MessageMatchesQuery(StreamMessage message); + public abstract bool MessageMatchesQuery(ParsedMessage message); /// /// Handle the query response /// /// /// - public abstract CallResult HandleResponse(StreamMessage message); + public abstract CallResult HandleResponse(ParsedMessage message); /// /// ctor diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 556ae72..feacdc3 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -49,7 +49,7 @@ namespace CryptoExchange.Net.Sockets /// /// Unhandled message event /// - public event Action? UnhandledMessage; + public event Action? UnhandledMessage; /// /// The amount of listeners on this connection @@ -57,7 +57,7 @@ namespace CryptoExchange.Net.Sockets public int UserListenerCount { get { lock (_listenerLock) - return _listeners.Count(h => h.UserListener); } + return _messageIdentifierListeners.Count(h => h.UserListener); } } /// @@ -153,10 +153,11 @@ namespace CryptoExchange.Net.Sockets } private bool _pausedActivity; - private readonly List _listeners; - private readonly List _messageListeners; // ? + //private readonly List _listeners; + //private readonly List _messageListeners; // ? - private readonly Dictionary _messageIdentifierListeners; + private readonly List _pendingRequests; + private readonly Dictionary _messageIdentifierListeners; private readonly object _listenerLock = new(); private readonly ILogger _logger; @@ -167,8 +168,6 @@ namespace CryptoExchange.Net.Sockets /// The underlying websocket /// private readonly IWebsocket _socket; - private readonly JsonSerializerSettings _options; - private readonly JsonSerializer _serializer; /// /// New socket connection @@ -184,9 +183,8 @@ namespace CryptoExchange.Net.Sockets Tag = tag; Properties = new Dictionary(); - _messageListeners = new List(); + _pendingRequests = new List(); _messageIdentifierListeners = new Dictionary(); - _listeners = new List(); _socket = socket; _socket.OnStreamMessage += HandleStreamMessage; @@ -197,11 +195,6 @@ namespace CryptoExchange.Net.Sockets _socket.OnReconnected += HandleReconnected; _socket.OnError += HandleError; _socket.GetReconnectionUrl = GetReconnectionUrlAsync; - - var converter = ApiClient.GetConverter(); - _options = SerializerOptions.Default; - _options.Converters.Add(converter); - _serializer = JsonSerializer.Create(_options); } /// @@ -222,7 +215,7 @@ namespace CryptoExchange.Net.Sockets Authenticated = false; lock(_listenerLock) { - foreach (var listener in _listeners) + foreach (var listener in _messageIdentifierListeners.Values) listener.Confirmed = false; } Task.Run(() => ConnectionClosed?.Invoke()); @@ -325,81 +318,101 @@ namespace CryptoExchange.Net.Sockets protected virtual async Task HandleStreamMessage(Stream stream) { var timestamp = DateTime.UtcNow; - var streamMessage = new StreamMessage(this, stream, timestamp); - var handledResponse = false; - MessageListener? currentSubscription = null; + //var streamMessage = new StreamMessage(this, stream, timestamp); TimeSpan userCodeDuration = TimeSpan.Zero; List listeners; lock (_listenerLock) listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList(); - var converter = ApiClient.GetConverter(); - using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); - using var jsonTextReader = new JsonTextReader(sr); - var result = (ParsedMessage)converter.ReadJson(jsonTextReader, typeof(object), null, _serializer); + var result = (ParsedMessage)ApiClient.StreamConverter.ReadJson(stream, listeners.OfType().ToList()); // TODO stream.Position = 0; + if (result == null) + { + _logger.LogWarning("Message not matched to type"); + return; + } + if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener)) { - var userSw = Stopwatch.StartNew(); - await idListener.ProcessAsync(streamMessage).ConfigureAwait(false); - userSw.Stop(); - userCodeDuration = userSw.Elapsed; - handledResponse = true; + // Matched based on identifier + await idListener.ProcessAsync(result).ConfigureAwait(false); + return; } - else + + foreach (var pendingRequest in _messageListeners.OfType()) { - foreach (var listener in listeners) + if (pendingRequest.MessageMatchesHandler(result)) { - if (listener.MessageMatches(streamMessage)) - { - if (listener is PendingRequest pendingRequest) - { - lock (_messageListeners) - _messageListeners.Remove(pendingRequest); - - if (pendingRequest.Completed) - { - // Answer to a timed out request, unsub if it is a subscription request - if (pendingRequest.MessageListener != null) - { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); - _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false); - } - } - else - { - _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); - await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false); - } - - if (!ApiClient.ContinueOnQueryResponse) - return; - - handledResponse = true; - break; - } - else if (listener is MessageListener subscription) - { - currentSubscription = subscription; - handledResponse = true; - var userSw = Stopwatch.StartNew(); - await subscription.ProcessAsync(streamMessage).ConfigureAwait(false); - userSw.Stop(); - userCodeDuration = userSw.Elapsed; - break; - } - } + await pendingRequest.ProcessAsync(result).ConfigureAwait(false); + break; } } - if (!handledResponse) - { - if (!ApiClient.UnhandledMessageExpected) - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + streamMessage.Get(ParsingUtils.GetString)); - UnhandledMessage?.Invoke(streamMessage); - } + _logger.LogWarning("Message not matched"); // TODO + return; + + //if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener)) + //{ + // var userSw = Stopwatch.StartNew(); + // await idListener.ProcessAsync(streamMessage).ConfigureAwait(false); + // userSw.Stop(); + // userCodeDuration = userSw.Elapsed; + // handledResponse = true; + //} + //else + //{ + // foreach (var listener in listeners) + // { + // if (listener.MessageMatches(streamMessage)) + // { + // if (listener is PendingRequest pendingRequest) + // { + // lock (_messageListeners) + // _messageListeners.Remove(pendingRequest); + + // if (pendingRequest.Completed) + // { + // // Answer to a timed out request, unsub if it is a subscription request + // if (pendingRequest.MessageListener != null) + // { + // _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); + // _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false); + // } + // } + // else + // { + // _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); + // await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false); + // } + + // if (!ApiClient.ContinueOnQueryResponse) + // return; + + // handledResponse = true; + // break; + // } + // else if (listener is MessageListener subscription) + // { + // currentSubscription = subscription; + // handledResponse = true; + // var userSw = Stopwatch.StartNew(); + // await subscription.ProcessAsync(streamMessage).ConfigureAwait(false); + // userSw.Stop(); + // userCodeDuration = userSw.Elapsed; + // break; + // } + // } + // } + //} + + //if (!handledResponse) + //{ + // if (!ApiClient.UnhandledMessageExpected) + // _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + streamMessage.Get(ParsingUtils.GetString)); + // UnhandledMessage?.Invoke(streamMessage); + //} } /// @@ -434,7 +447,7 @@ namespace CryptoExchange.Net.Sockets lock (_listenerLock) { - foreach (var listener in _listeners) + foreach (var listener in _messageIdentifierListeners.Values) { if (listener.CancellationTokenRegistration.HasValue) listener.CancellationTokenRegistration.Value.Dispose(); @@ -562,7 +575,7 @@ namespace CryptoExchange.Net.Sockets /// The response handler /// The weight of the message /// - public virtual async Task SendAndWaitAsync(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func handler) + public virtual async Task SendAndWaitAsync(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func handler) { var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener); lock (_messageListeners) diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 6510c72..bcac606 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -1,4 +1,5 @@ -using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Converters; +using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; @@ -52,7 +53,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract (bool, CallResult?) MessageMatchesSubRequest(StreamMessage message); + public abstract (bool, CallResult?) MessageMatchesSubRequest(ParsedMessage message); /// /// Get the unsubscribe object to send when unsubscribing @@ -64,54 +65,54 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract (bool, CallResult?) MessageMatchesUnsubRequest(StreamMessage message); + public abstract (bool, CallResult?) MessageMatchesUnsubRequest(ParsedMessage message); /// /// Check if the message is an update for this subscription /// /// /// - public abstract bool MessageMatchesEvent(StreamMessage message); + public abstract bool MessageMatchesEvent(ParsedMessage message); /// /// Handle the update message /// /// /// - public abstract Task HandleEventAsync(StreamMessage message); + public abstract Task HandleEventAsync(DataEvent message); - /// - /// Create a data event - /// - /// - /// - /// - /// - /// - /// - protected virtual DataEvent CreateDataEvent(T obj, StreamMessage message, string? topic = null, SocketUpdateType? type = null) - { - string? originalData = null; - if (_outputOriginalData) - originalData = message.Get(ParsingUtils.GetString); + ///// + ///// Create a data event + ///// + ///// + ///// + ///// + ///// + ///// + ///// + //protected virtual DataEvent CreateDataEvent(T obj, ParsedMessage message, string? topic = null, SocketUpdateType? type = null) + //{ + // string? originalData = null; + // if (_outputOriginalData) + // originalData = message.Get(ParsingUtils.GetString); - return new DataEvent(obj, topic, originalData, message.Timestamp, type); - } + // return new DataEvent(obj, topic, originalData, message.Timestamp, type); + //} - /// - /// Deserialize the message to an object using Json.Net - /// - /// - /// - /// - /// - protected virtual Task> DeserializeAsync(StreamMessage message, JsonSerializerSettings settings) - { - var serializer = JsonSerializer.Create(settings); - using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true); - using var jsonTextReader = new JsonTextReader(sr); - var result = serializer.Deserialize(jsonTextReader); - message.Stream.Position = 0; - return Task.FromResult(new CallResult(result!)); - } + ///// + ///// Deserialize the message to an object using Json.Net + ///// + ///// + ///// + ///// + ///// + //protected virtual Task> DeserializeAsync(StreamMessage message, JsonSerializerSettings settings) + //{ + // var serializer = JsonSerializer.Create(settings); + // using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true); + // using var jsonTextReader = new JsonTextReader(sr); + // var result = serializer.Deserialize(jsonTextReader); + // message.Stream.Position = 0; + // return Task.FromResult(new CallResult(result!)); + //} } } diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index 9473e29..b3af09e 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -1,4 +1,5 @@ -using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Converters; +using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; @@ -24,11 +25,11 @@ namespace CryptoExchange.Net.Sockets /// public override object? GetSubRequest() => null; /// - public override (bool, CallResult?) MessageMatchesSubRequest(StreamMessage message) => throw new NotImplementedException(); + public override (bool, CallResult?) MessageMatchesSubRequest(ParsedMessage message) => throw new NotImplementedException(); /// public override object? GetUnsubRequest() => null; /// - public override (bool, CallResult?) MessageMatchesUnsubRequest(StreamMessage message) => throw new NotImplementedException(); + public override (bool, CallResult?) MessageMatchesUnsubRequest(ParsedMessage message) => throw new NotImplementedException(); } }