diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 74a8a58..2578291 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -284,7 +284,7 @@ namespace CryptoExchange.Net /// protected virtual Task> QueryAsync(Query query) { - return QueryAsync(BaseAddress, query); + return QueryAsync(BaseAddress, query); } /// @@ -403,7 +403,7 @@ namespace CryptoExchange.Net /// Should return the request which can be used to authenticate a socket connection /// /// - protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException(); + protected internal virtual BaseQuery GetAuthenticationRequest() => throw new NotImplementedException(); /// /// Add a subscription to a connection @@ -418,7 +418,7 @@ namespace CryptoExchange.Net if (!connection.AddListener(subscription)) return false; - return false; + return true; } /// @@ -511,7 +511,7 @@ namespace CryptoExchange.Net /// Process an unhandled message /// /// The message that wasn't processed - protected virtual void HandleUnhandledMessage(ParsedMessage message) + protected virtual void HandleUnhandledMessage(BaseParsedMessage message) { } diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index 408d972..b781960 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -32,18 +32,17 @@ namespace CryptoExchange.Net.Converters public abstract Type? GetDeserializationType(Dictionary idValues, List pendingRequests, List listeners); /// - public ParsedMessage? ReadJson(Stream stream, List pendingRequests, List listeners, bool outputOriginalData) + public BaseParsedMessage? ReadJson(Stream stream, List pendingRequests, List listeners, bool outputOriginalData) { // Start reading the data // Once we reach the properties that identify the message we save those in a dict // Once all id properties have been read callback to see what the deserialization type should be // Deserialize to the correct type - var result = new ParsedMessage(); using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); if (outputOriginalData) { - result.OriginalData = sr.ReadToEnd(); + //result.OriginalData = sr.ReadToEnd(); stream.Position = 0; } @@ -81,10 +80,12 @@ namespace CryptoExchange.Net.Converters idString += GetValueForKey(token, idField); } - result.Identifier = idString; var resultType = GetDeserializationType(typeIdDict, pendingRequests, listeners); - result.Data = resultType == null ? null : token.ToObject(resultType); - return result; + var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(resultType); + var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType)); + instance.Identifier = idString; + instance.Parsed = resultType != null; + return instance; } private string? GetValueForKey(JToken token, string key) diff --git a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs index 6a8a0e7..d9327a7 100644 --- a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs +++ b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs @@ -3,7 +3,7 @@ /// /// Parsed message object /// - public class ParsedMessage + public abstract class BaseParsedMessage { /// /// Identifier string @@ -13,9 +13,22 @@ /// Original data if the option is enabled /// public string? OriginalData { get; set; } + /// + /// If parsed + /// + public bool Parsed { get; set; } + } + + public class ParsedMessage : BaseParsedMessage + { /// /// Parsed data object /// - public object? Data { get; set; } + public T? Data { get; set; } + + public ParsedMessage(T? data) + { + Data = data; + } } } diff --git a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs index 4188b76..f95c014 100644 --- a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs @@ -11,7 +11,7 @@ namespace CryptoExchange.Net.Objects.Sockets public abstract class BasePendingRequest { public int Id { get; set; } - public Func MessageMatchesHandler { get; } + public Func MessageMatchesHandler { get; } public bool Completed { get; private set; } public abstract Type ResponseType { get; } @@ -21,10 +21,9 @@ namespace CryptoExchange.Net.Objects.Sockets public object Request { get; set; } private CancellationTokenSource? _cts; + public abstract CallResult Result { get; set; } - public int Priority => 100; - - protected BasePendingRequest(int id, object request, Func messageMatchesHandler, TimeSpan timeout) + protected BasePendingRequest(int id, object request, Func messageMatchesHandler, TimeSpan timeout) { Id = id; MessageMatchesHandler = messageMatchesHandler; @@ -47,12 +46,12 @@ namespace CryptoExchange.Net.Objects.Sockets Event.Set(); } - public bool MessageMatches(ParsedMessage message) + public bool MessageMatches(BaseParsedMessage message) { return MessageMatchesHandler(message); } - public virtual Task ProcessAsync(ParsedMessage message) + public virtual Task ProcessAsync(BaseParsedMessage message) { Completed = true; Event.Set(); @@ -60,59 +59,60 @@ namespace CryptoExchange.Net.Objects.Sockets } } - public class PendingRequest : BasePendingRequest - { - public CallResult Result { get; set; } - public Func Handler { get; } - public override Type? ResponseType => null; + //public class PendingRequest : BasePendingRequest + //{ + // public CallResult Result { get; set; } + // public Func Handler { get; } + // public override Type? ResponseType => null; - private PendingRequest(int id, object request, Func messageMatchesHandler, Func messageHandler, TimeSpan timeout) - : base(id, request, messageMatchesHandler, timeout) - { - Handler = messageHandler; - } + // private PendingRequest(int id, object request, Func messageMatchesHandler, Func messageHandler, TimeSpan timeout) + // : base(id, request, messageMatchesHandler, timeout) + // { + // Handler = messageHandler; + // } - public static PendingRequest CreateForQuery(Query query) - { - return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, query.HandleResult, TimeSpan.FromSeconds(5)); - } + // public static PendingRequest CreateForQuery(BaseQuery query) + // { + // return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, query.HandleResult, TimeSpan.FromSeconds(5)); + // } - public static PendingRequest CreateForSubRequest(Subscription subscription) - { - return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetSubRequest, subscription.MessageMatchesSubRequest, subscription.HandleSubResponse, TimeSpan.FromSeconds(5)); - } + // public static PendingRequest CreateForSubRequest(Subscription subscription) + // { + // return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetSubRequest(), subscription.MessageMatchesSubRequest, subscription.HandleSubResponse, TimeSpan.FromSeconds(5)); + // } - public static PendingRequest CreateForUnsubRequest(Subscription subscription) - { - return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetUnsubRequest, subscription.MessageMatchesUnsubRequest, subscription.HandleUnsubResponse, TimeSpan.FromSeconds(5)); - } + // public static PendingRequest CreateForUnsubRequest(Subscription subscription) + // { + // return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetUnsubRequest(), subscription.MessageMatchesUnsubRequest, subscription.HandleUnsubResponse, TimeSpan.FromSeconds(5)); + // } - public override void Fail(string error) - { - Result = new CallResult(new ServerError(error)); - base.Fail(error); - } + // public override void Fail(string error) + // { + // Result = new CallResult(new ServerError(error)); + // base.Fail(error); + // } - public override Task ProcessAsync(ParsedMessage message) - { - Result = Handler(message); - return base.ProcessAsync(message); - } - } + // public override Task ProcessAsync(BaseParsedMessage message) + // { + // Result = Handler(message); + // return base.ProcessAsync(message); + // } + //} public class PendingRequest : BasePendingRequest { - public CallResult Result { get; set; } - public Func> Handler { get; } + public override CallResult Result { get; set; } + public CallResult TypedResult => (CallResult)Result; + public Func, CallResult> Handler { get; } public override Type? ResponseType => typeof(T); - public PendingRequest(int id, object request, Func messageMatchesHandler, Func> messageHandler, TimeSpan timeout) - : base(id, request, messageMatchesHandler, timeout) + public PendingRequest(int id, object request, Func, bool> messageMatchesHandler, Func, CallResult> messageHandler, TimeSpan timeout) + : base(id, request, (x) => messageMatchesHandler((ParsedMessage)x), timeout) { Handler = messageHandler; } - public static PendingRequest CreateForQuery(Query query) + public static PendingRequest CreateForQuery(Query query) { return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, x => { @@ -127,9 +127,9 @@ namespace CryptoExchange.Net.Objects.Sockets base.Fail(error); } - public override Task ProcessAsync(ParsedMessage message) + public override Task ProcessAsync(BaseParsedMessage message) { - Result = Handler(message); + Result = Handler((ParsedMessage)message); return base.ProcessAsync(message); } } diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 655bddb..2940472 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -25,12 +25,9 @@ namespace CryptoExchange.Net.Sockets /// public int Weight { get; } - /// - /// Check if the message is the response to the query - /// - /// - /// - public abstract bool MessageMatchesQuery(ParsedMessage message); + public abstract bool MessageMatchesQuery(BaseParsedMessage message); + public abstract CallResult HandleResult(BaseParsedMessage message); + /// /// ctor @@ -44,20 +41,8 @@ namespace CryptoExchange.Net.Sockets Request = request; Weight = weight; } - } - public abstract class Query : BaseQuery - { - protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight) - { - } - - /// - /// Handle the query response - /// - /// - /// - public abstract CallResult HandleResult(ParsedMessage message); + public abstract BasePendingRequest CreatePendingRequest(); } public abstract class Query : BaseQuery @@ -66,11 +51,23 @@ namespace CryptoExchange.Net.Sockets { } + public override CallResult HandleResult(BaseParsedMessage message) => HandleResponse((ParsedMessage) message); + public override bool MessageMatchesQuery(BaseParsedMessage message) => MessageMatchesQuery((ParsedMessage)message); + /// /// Handle the query response /// /// /// - public abstract CallResult HandleResponse(ParsedMessage message); + public abstract CallResult HandleResponse(ParsedMessage message); + + /// + /// Check if the message is the response to the query + /// + /// + /// + public abstract bool MessageMatchesQuery(ParsedMessage message); + + public override BasePendingRequest CreatePendingRequest() => PendingRequest.CreateForQuery(this); } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 2a4fd04..46d14c8 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -49,7 +49,7 @@ namespace CryptoExchange.Net.Sockets /// /// Unhandled message event /// - public event Action? UnhandledMessage; + public event Action? UnhandledMessage; /// /// Unparsed message event @@ -341,7 +341,7 @@ namespace CryptoExchange.Net.Sockets if (result.OriginalData != null) _logger.LogDebug($"Socket {SocketId} Data received: {result.OriginalData}"); - if (result.Data == null) + if (!result.Parsed) { _logger.LogWarning("Message not matched to type"); return; @@ -352,7 +352,7 @@ namespace CryptoExchange.Net.Sockets { // Matched based on identifier var userSw = Stopwatch.StartNew(); - var dataEvent = new DataEvent(result, null, result.OriginalData, DateTime.UtcNow, null); + var dataEvent = new DataEvent(result, null, result.OriginalData, DateTime.UtcNow, null); await idListener.HandleEventAsync(dataEvent).ConfigureAwait(false); userSw.Stop(); return; @@ -547,18 +547,18 @@ namespace CryptoExchange.Net.Sockets return _messageListeners.SingleOrDefault(s => predicate(s)); } - public virtual async Task> SendAndWaitQueryAsync(Query query) + public virtual async Task SendAndWaitQueryAsync(BaseQuery query) { - var pendingRequest = PendingRequest.CreateForQuery(query); + var pendingRequest = query.CreatePendingRequest(); await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false); return pendingRequest.Result; } - public virtual async Task SendAndWaitQueryAsync(Query query) + public virtual async Task> SendAndWaitQueryAsync(Query query) { - var pendingRequest = PendingRequest.CreateForQuery(query); + var pendingRequest = PendingRequest.CreateForQuery(query); await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false); - return pendingRequest.Result; + return pendingRequest.TypedResult; } public virtual async Task SendAndWaitSubAsync(Subscription subscription) @@ -607,52 +607,6 @@ namespace CryptoExchange.Net.Sockets } } - ///// - ///// Send data and wait for an answer - ///// - ///// The data type expected in response - ///// The object to send - ///// The timeout for response - ///// Listener if this is a subscribe request - ///// The response handler - ///// The weight of the message - ///// - //public virtual async Task SendAndWaitAsync(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func handler) - //{ - // // TODO either Query or Subscription should be passed here instead of T obj - // // That would allow to track the Query/Subscription on the PendingRequest instead of the listener, which allow us to match the pending request in the Converter - - // var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener); - // lock (_messageListeners) - // { - // _pendingRequests.Add(pending); - // } - - // var sendOk = Send(pending.Id, obj, weight); - // if (!sendOk) - // { - // pending.Fail(); - // return; - // } - - // while (true) - // { - // if(!_socket.IsOpen) - // { - // pending.Fail(); - // return; - // } - - // if (pending.Completed) - // return; - - // await pending.Event.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); - - // if (pending.Completed) - // return; - // } - //} - /// /// Send data over the websocket connection /// @@ -727,16 +681,7 @@ namespace CryptoExchange.Net.Sockets // Get a list of all subscriptions on the socket List listenerList = new List(); lock (_listenerLock) - { - // ? - foreach (var listener in _messageListeners) - { - if (listener != null) - listenerList.Add(listener); - else - listener.Confirmed = true; - } - } + listenerList = _messageListeners.ToList(); foreach(var listener in listenerList) { @@ -778,10 +723,8 @@ namespace CryptoExchange.Net.Sockets var unsubscribeRequest = subscription?.GetUnsubRequest(); if (unsubscribeRequest != null) { - var pendingRequest = PendingRequest.CreateForUnsubRequest(subscription!); - await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false); - - _logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription.Id} unsubscribed"); + await SendAndWaitUnsubAsync(subscription!).ConfigureAwait(false); + _logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription!.Id} unsubscribed"); } } diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 1b8f703..0312f2b 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -57,33 +57,24 @@ namespace CryptoExchange.Net.Sockets /// /// public abstract object? GetSubRequest(); - /// - /// Check if the message is the response to the subscribe request - /// - /// - /// - public abstract bool MessageMatchesSubRequest(ParsedMessage message); - public abstract CallResult HandleSubResponse(ParsedMessage message); /// /// Get the unsubscribe object to send when unsubscribing /// /// public abstract object? GetUnsubRequest(); - /// - /// Check if the message is the response to the unsubscribe request - /// - /// - /// - public abstract bool MessageMatchesUnsubRequest(ParsedMessage message); - public abstract CallResult HandleUnsubResponse(ParsedMessage message); /// /// Handle the update message /// /// /// - public abstract Task HandleEventAsync(DataEvent message); + public abstract Task HandleEventAsync(DataEvent message); + public abstract CallResult HandleSubResponse(BaseParsedMessage message); + public abstract CallResult HandleUnsubResponse(BaseParsedMessage message); + + public abstract bool MessageMatchesUnsubRequest(BaseParsedMessage message); + public abstract bool MessageMatchesSubRequest(BaseParsedMessage message); /// /// Invoke the exception event @@ -94,4 +85,56 @@ namespace CryptoExchange.Net.Sockets Exception?.Invoke(e); } } + + public abstract class Subscription : Subscription + { + protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated) + { + } + } + + public abstract class Subscription : Subscription + { + protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated) + { + } + + public override CallResult HandleUnsubResponse(BaseParsedMessage message) + => HandleUnsubResponse((ParsedMessage)message); + + public override CallResult HandleSubResponse(BaseParsedMessage message) + => HandleSubResponse((ParsedMessage)message); + + public override Task HandleEventAsync(DataEvent message) + => HandleEventAsync(message.As((ParsedMessage)message.Data)); + + public override bool MessageMatchesSubRequest(BaseParsedMessage message) + => MessageMatchesSubRequest((ParsedMessage)message); + + public override bool MessageMatchesUnsubRequest(BaseParsedMessage message) + => MessageMatchesUnsubRequest((ParsedMessage)message); + + /// + /// Handle the update message + /// + /// + /// + public abstract Task HandleEventAsync(DataEvent> message); + + /// + /// Check if the message is the response to the subscribe request + /// + /// + /// + public abstract bool MessageMatchesSubRequest(ParsedMessage message); + public abstract CallResult HandleSubResponse(ParsedMessage message); + + /// + /// Check if the message is the response to the unsubscribe request + /// + /// + /// + public abstract bool MessageMatchesUnsubRequest(ParsedMessage message); + public abstract CallResult HandleUnsubResponse(ParsedMessage message); + } } diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index 3e162c9..8494fcf 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -24,15 +24,15 @@ namespace CryptoExchange.Net.Sockets /// public override object? GetSubRequest() => null; /// - public override bool MessageMatchesSubRequest(ParsedMessage message) => throw new NotImplementedException(); + public override bool MessageMatchesSubRequest(BaseParsedMessage message) => throw new NotImplementedException(); /// - public override CallResult HandleSubResponse(ParsedMessage message) => throw new NotImplementedException(); + public override CallResult HandleSubResponse(BaseParsedMessage message) => throw new NotImplementedException(); /// public override object? GetUnsubRequest() => null; /// - public override bool MessageMatchesUnsubRequest(ParsedMessage message) => throw new NotImplementedException(); + public override bool MessageMatchesUnsubRequest(BaseParsedMessage message) => throw new NotImplementedException(); /// - public override CallResult HandleUnsubResponse(ParsedMessage message) => throw new NotImplementedException(); + public override CallResult HandleUnsubResponse(BaseParsedMessage message) => throw new NotImplementedException(); } }