From 6fa66d819d1c85438e70d5c7ca12a44192ded109 Mon Sep 17 00:00:00 2001 From: JKorf <jankorf91@gmail.com> Date: Sat, 4 Nov 2023 11:16:46 +0100 Subject: [PATCH] wip --- CryptoExchange.Net/Clients/SocketApiClient.cs | 8 +- .../Converters/SocketConverter.cs | 13 +-- .../Objects/Sockets/ParsedMessage.cs | 17 +++- .../Objects/Sockets/PendingRequest.cs | 92 +++++++++---------- CryptoExchange.Net/Sockets/Query.cs | 37 ++++---- .../Sockets/SocketConnection.cs | 79 +++------------- CryptoExchange.Net/Sockets/Subscription.cs | 73 ++++++++++++--- .../Sockets/SystemSubscription.cs | 8 +- 8 files changed, 162 insertions(+), 165 deletions(-) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 74a8a58..2578291 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -284,7 +284,7 @@ namespace CryptoExchange.Net /// <returns></returns> protected virtual Task<CallResult<T>> QueryAsync<T>(Query<T> query) { - return QueryAsync<T>(BaseAddress, query); + return QueryAsync(BaseAddress, query); } /// <summary> @@ -403,7 +403,7 @@ namespace CryptoExchange.Net /// Should return the request which can be used to authenticate a socket connection /// </summary> /// <returns></returns> - protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException(); + protected internal virtual BaseQuery GetAuthenticationRequest() => throw new NotImplementedException(); /// <summary> /// Add a subscription to a connection @@ -418,7 +418,7 @@ namespace CryptoExchange.Net if (!connection.AddListener(subscription)) return false; - return false; + return true; } /// <summary> @@ -511,7 +511,7 @@ namespace CryptoExchange.Net /// Process an unhandled message /// </summary> /// <param name="message">The message that wasn't processed</param> - protected virtual void HandleUnhandledMessage(ParsedMessage message) + protected virtual void HandleUnhandledMessage(BaseParsedMessage message) { } diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index 408d972..b781960 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -32,18 +32,17 @@ namespace CryptoExchange.Net.Converters public abstract Type? GetDeserializationType(Dictionary<string, string?> idValues, List<BasePendingRequest> pendingRequests, List<Subscription> listeners); /// <inheritdoc /> - public ParsedMessage? ReadJson(Stream stream, List<BasePendingRequest> pendingRequests, List<Subscription> listeners, bool outputOriginalData) + public BaseParsedMessage? ReadJson(Stream stream, List<BasePendingRequest> pendingRequests, List<Subscription> listeners, bool outputOriginalData) { // Start reading the data // Once we reach the properties that identify the message we save those in a dict // Once all id properties have been read callback to see what the deserialization type should be // Deserialize to the correct type - var result = new ParsedMessage(); using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); if (outputOriginalData) { - result.OriginalData = sr.ReadToEnd(); + //result.OriginalData = sr.ReadToEnd(); stream.Position = 0; } @@ -81,10 +80,12 @@ namespace CryptoExchange.Net.Converters idString += GetValueForKey(token, idField); } - result.Identifier = idString; var resultType = GetDeserializationType(typeIdDict, pendingRequests, listeners); - result.Data = resultType == null ? null : token.ToObject(resultType); - return result; + var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(resultType); + var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType)); + instance.Identifier = idString; + instance.Parsed = resultType != null; + return instance; } private string? GetValueForKey(JToken token, string key) diff --git a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs index 6a8a0e7..d9327a7 100644 --- a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs +++ b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs @@ -3,7 +3,7 @@ /// <summary> /// Parsed message object /// </summary> - public class ParsedMessage + public abstract class BaseParsedMessage { /// <summary> /// Identifier string @@ -13,9 +13,22 @@ /// Original data if the option is enabled /// </summary> public string? OriginalData { get; set; } + /// <summary> + /// If parsed + /// </summary> + public bool Parsed { get; set; } + } + + public class ParsedMessage<T> : BaseParsedMessage + { /// <summary> /// Parsed data object /// </summary> - public object? Data { get; set; } + public T? Data { get; set; } + + public ParsedMessage(T? data) + { + Data = data; + } } } diff --git a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs index 4188b76..f95c014 100644 --- a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs @@ -11,7 +11,7 @@ namespace CryptoExchange.Net.Objects.Sockets public abstract class BasePendingRequest { public int Id { get; set; } - public Func<ParsedMessage, bool> MessageMatchesHandler { get; } + public Func<BaseParsedMessage, bool> MessageMatchesHandler { get; } public bool Completed { get; private set; } public abstract Type ResponseType { get; } @@ -21,10 +21,9 @@ namespace CryptoExchange.Net.Objects.Sockets public object Request { get; set; } private CancellationTokenSource? _cts; + public abstract CallResult Result { get; set; } - public int Priority => 100; - - protected BasePendingRequest(int id, object request, Func<ParsedMessage, bool> messageMatchesHandler, TimeSpan timeout) + protected BasePendingRequest(int id, object request, Func<BaseParsedMessage, bool> messageMatchesHandler, TimeSpan timeout) { Id = id; MessageMatchesHandler = messageMatchesHandler; @@ -47,12 +46,12 @@ namespace CryptoExchange.Net.Objects.Sockets Event.Set(); } - public bool MessageMatches(ParsedMessage message) + public bool MessageMatches(BaseParsedMessage message) { return MessageMatchesHandler(message); } - public virtual Task ProcessAsync(ParsedMessage message) + public virtual Task ProcessAsync(BaseParsedMessage message) { Completed = true; Event.Set(); @@ -60,59 +59,60 @@ namespace CryptoExchange.Net.Objects.Sockets } } - public class PendingRequest : BasePendingRequest - { - public CallResult Result { get; set; } - public Func<ParsedMessage, CallResult> Handler { get; } - public override Type? ResponseType => null; + //public class PendingRequest : BasePendingRequest + //{ + // public CallResult Result { get; set; } + // public Func<BaseParsedMessage, CallResult> Handler { get; } + // public override Type? ResponseType => null; - private PendingRequest(int id, object request, Func<ParsedMessage, bool> messageMatchesHandler, Func<ParsedMessage, CallResult> messageHandler, TimeSpan timeout) - : base(id, request, messageMatchesHandler, timeout) - { - Handler = messageHandler; - } + // private PendingRequest(int id, object request, Func<BaseParsedMessage, bool> messageMatchesHandler, Func<BaseParsedMessage, CallResult> messageHandler, TimeSpan timeout) + // : base(id, request, messageMatchesHandler, timeout) + // { + // Handler = messageHandler; + // } - public static PendingRequest CreateForQuery(Query query) - { - return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, query.HandleResult, TimeSpan.FromSeconds(5)); - } + // public static PendingRequest CreateForQuery(BaseQuery query) + // { + // return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, query.HandleResult, TimeSpan.FromSeconds(5)); + // } - public static PendingRequest CreateForSubRequest(Subscription subscription) - { - return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetSubRequest, subscription.MessageMatchesSubRequest, subscription.HandleSubResponse, TimeSpan.FromSeconds(5)); - } + // public static PendingRequest CreateForSubRequest(Subscription subscription) + // { + // return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetSubRequest(), subscription.MessageMatchesSubRequest, subscription.HandleSubResponse, TimeSpan.FromSeconds(5)); + // } - public static PendingRequest CreateForUnsubRequest(Subscription subscription) - { - return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetUnsubRequest, subscription.MessageMatchesUnsubRequest, subscription.HandleUnsubResponse, TimeSpan.FromSeconds(5)); - } + // public static PendingRequest CreateForUnsubRequest(Subscription subscription) + // { + // return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetUnsubRequest(), subscription.MessageMatchesUnsubRequest, subscription.HandleUnsubResponse, TimeSpan.FromSeconds(5)); + // } - public override void Fail(string error) - { - Result = new CallResult(new ServerError(error)); - base.Fail(error); - } + // public override void Fail(string error) + // { + // Result = new CallResult(new ServerError(error)); + // base.Fail(error); + // } - public override Task ProcessAsync(ParsedMessage message) - { - Result = Handler(message); - return base.ProcessAsync(message); - } - } + // public override Task ProcessAsync(BaseParsedMessage message) + // { + // Result = Handler(message); + // return base.ProcessAsync(message); + // } + //} public class PendingRequest<T> : BasePendingRequest { - public CallResult<T> Result { get; set; } - public Func<ParsedMessage, CallResult<T>> Handler { get; } + public override CallResult Result { get; set; } + public CallResult<T> TypedResult => (CallResult<T>)Result; + public Func<ParsedMessage<T>, CallResult<T>> Handler { get; } public override Type? ResponseType => typeof(T); - public PendingRequest(int id, object request, Func<ParsedMessage, bool> messageMatchesHandler, Func<ParsedMessage, CallResult<T>> messageHandler, TimeSpan timeout) - : base(id, request, messageMatchesHandler, timeout) + public PendingRequest(int id, object request, Func<ParsedMessage<T>, bool> messageMatchesHandler, Func<ParsedMessage<T>, CallResult<T>> messageHandler, TimeSpan timeout) + : base(id, request, (x) => messageMatchesHandler((ParsedMessage<T>)x), timeout) { Handler = messageHandler; } - public static PendingRequest<T> CreateForQuery<T>(Query<T> query) + public static PendingRequest<T> CreateForQuery(Query<T> query) { return new PendingRequest<T>(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, x => { @@ -127,9 +127,9 @@ namespace CryptoExchange.Net.Objects.Sockets base.Fail(error); } - public override Task ProcessAsync(ParsedMessage message) + public override Task ProcessAsync(BaseParsedMessage message) { - Result = Handler(message); + Result = Handler((ParsedMessage<T>)message); return base.ProcessAsync(message); } } diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 655bddb..2940472 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -25,12 +25,9 @@ namespace CryptoExchange.Net.Sockets /// </summary> public int Weight { get; } - /// <summary> - /// Check if the message is the response to the query - /// </summary> - /// <param name="message"></param> - /// <returns></returns> - public abstract bool MessageMatchesQuery(ParsedMessage message); + public abstract bool MessageMatchesQuery(BaseParsedMessage message); + public abstract CallResult HandleResult(BaseParsedMessage message); + /// <summary> /// ctor @@ -44,20 +41,8 @@ namespace CryptoExchange.Net.Sockets Request = request; Weight = weight; } - } - public abstract class Query : BaseQuery - { - protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight) - { - } - - /// <summary> - /// Handle the query response - /// </summary> - /// <param name="message"></param> - /// <returns></returns> - public abstract CallResult HandleResult(ParsedMessage message); + public abstract BasePendingRequest CreatePendingRequest(); } public abstract class Query<TResponse> : BaseQuery @@ -66,11 +51,23 @@ namespace CryptoExchange.Net.Sockets { } + public override CallResult HandleResult(BaseParsedMessage message) => HandleResponse((ParsedMessage<TResponse>) message); + public override bool MessageMatchesQuery(BaseParsedMessage message) => MessageMatchesQuery((ParsedMessage<TResponse>)message); + /// <summary> /// Handle the query response /// </summary> /// <param name="message"></param> /// <returns></returns> - public abstract CallResult<TResponse> HandleResponse(ParsedMessage message); + public abstract CallResult<TResponse> HandleResponse(ParsedMessage<TResponse> message); + + /// <summary> + /// Check if the message is the response to the query + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public abstract bool MessageMatchesQuery(ParsedMessage<TResponse> message); + + public override BasePendingRequest CreatePendingRequest() => PendingRequest<TResponse>.CreateForQuery(this); } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 2a4fd04..46d14c8 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -49,7 +49,7 @@ namespace CryptoExchange.Net.Sockets /// <summary> /// Unhandled message event /// </summary> - public event Action<ParsedMessage>? UnhandledMessage; + public event Action<BaseParsedMessage>? UnhandledMessage; /// <summary> /// Unparsed message event @@ -341,7 +341,7 @@ namespace CryptoExchange.Net.Sockets if (result.OriginalData != null) _logger.LogDebug($"Socket {SocketId} Data received: {result.OriginalData}"); - if (result.Data == null) + if (!result.Parsed) { _logger.LogWarning("Message not matched to type"); return; @@ -352,7 +352,7 @@ namespace CryptoExchange.Net.Sockets { // Matched based on identifier var userSw = Stopwatch.StartNew(); - var dataEvent = new DataEvent<ParsedMessage>(result, null, result.OriginalData, DateTime.UtcNow, null); + var dataEvent = new DataEvent<BaseParsedMessage>(result, null, result.OriginalData, DateTime.UtcNow, null); await idListener.HandleEventAsync(dataEvent).ConfigureAwait(false); userSw.Stop(); return; @@ -547,18 +547,18 @@ namespace CryptoExchange.Net.Sockets return _messageListeners.SingleOrDefault(s => predicate(s)); } - public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query) + public virtual async Task<CallResult> SendAndWaitQueryAsync(BaseQuery query) { - var pendingRequest = PendingRequest<T>.CreateForQuery(query); + var pendingRequest = query.CreatePendingRequest(); await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false); return pendingRequest.Result; } - public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query) + public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query) { - var pendingRequest = PendingRequest.CreateForQuery(query); + var pendingRequest = PendingRequest<T>.CreateForQuery(query); await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false); - return pendingRequest.Result; + return pendingRequest.TypedResult; } public virtual async Task<CallResult> SendAndWaitSubAsync(Subscription subscription) @@ -607,52 +607,6 @@ namespace CryptoExchange.Net.Sockets } } - ///// <summary> - ///// Send data and wait for an answer - ///// </summary> - ///// <typeparam name="T">The data type expected in response</typeparam> - ///// <param name="obj">The object to send</param> - ///// <param name="timeout">The timeout for response</param> - ///// <param name="listener">Listener if this is a subscribe request</param> - ///// <param name="handler">The response handler</param> - ///// <param name="weight">The weight of the message</param> - ///// <returns></returns> - //public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func<ParsedMessage, bool> handler) - //{ - // // TODO either Query<T> or Subscription<T> should be passed here instead of T obj - // // That would allow to track the Query/Subscription on the PendingRequest instead of the listener, which allow us to match the pending request in the Converter - - // var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener); - // lock (_messageListeners) - // { - // _pendingRequests.Add(pending); - // } - - // var sendOk = Send(pending.Id, obj, weight); - // if (!sendOk) - // { - // pending.Fail(); - // return; - // } - - // while (true) - // { - // if(!_socket.IsOpen) - // { - // pending.Fail(); - // return; - // } - - // if (pending.Completed) - // return; - - // await pending.Event.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); - - // if (pending.Completed) - // return; - // } - //} - /// <summary> /// Send data over the websocket connection /// </summary> @@ -727,16 +681,7 @@ namespace CryptoExchange.Net.Sockets // Get a list of all subscriptions on the socket List<Subscription> listenerList = new List<Subscription>(); lock (_listenerLock) - { - // ? - foreach (var listener in _messageListeners) - { - if (listener != null) - listenerList.Add(listener); - else - listener.Confirmed = true; - } - } + listenerList = _messageListeners.ToList(); foreach(var listener in listenerList) { @@ -778,10 +723,8 @@ namespace CryptoExchange.Net.Sockets var unsubscribeRequest = subscription?.GetUnsubRequest(); if (unsubscribeRequest != null) { - var pendingRequest = PendingRequest.CreateForUnsubRequest(subscription!); - await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false); - - _logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription.Id} unsubscribed"); + await SendAndWaitUnsubAsync(subscription!).ConfigureAwait(false); + _logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription!.Id} unsubscribed"); } } diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 1b8f703..0312f2b 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -57,33 +57,24 @@ namespace CryptoExchange.Net.Sockets /// </summary> /// <returns></returns> public abstract object? GetSubRequest(); - /// <summary> - /// Check if the message is the response to the subscribe request - /// </summary> - /// <param name="message"></param> - /// <returns></returns> - public abstract bool MessageMatchesSubRequest(ParsedMessage message); - public abstract CallResult HandleSubResponse(ParsedMessage message); /// <summary> /// Get the unsubscribe object to send when unsubscribing /// </summary> /// <returns></returns> public abstract object? GetUnsubRequest(); - /// <summary> - /// Check if the message is the response to the unsubscribe request - /// </summary> - /// <param name="message"></param> - /// <returns></returns> - public abstract bool MessageMatchesUnsubRequest(ParsedMessage message); - public abstract CallResult HandleUnsubResponse(ParsedMessage message); /// <summary> /// Handle the update message /// </summary> /// <param name="message"></param> /// <returns></returns> - public abstract Task HandleEventAsync(DataEvent<ParsedMessage> message); + public abstract Task HandleEventAsync(DataEvent<BaseParsedMessage> message); + public abstract CallResult HandleSubResponse(BaseParsedMessage message); + public abstract CallResult HandleUnsubResponse(BaseParsedMessage message); + + public abstract bool MessageMatchesUnsubRequest(BaseParsedMessage message); + public abstract bool MessageMatchesSubRequest(BaseParsedMessage message); /// <summary> /// Invoke the exception event @@ -94,4 +85,56 @@ namespace CryptoExchange.Net.Sockets Exception?.Invoke(e); } } + + public abstract class Subscription<TQuery, TEvent> : Subscription<TQuery, TEvent, TQuery> + { + protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated) + { + } + } + + public abstract class Subscription<TSubResponse, TEvent, TUnsubResponse> : Subscription + { + protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated) + { + } + + public override CallResult HandleUnsubResponse(BaseParsedMessage message) + => HandleUnsubResponse((ParsedMessage<TUnsubResponse>)message); + + public override CallResult HandleSubResponse(BaseParsedMessage message) + => HandleSubResponse((ParsedMessage<TSubResponse>)message); + + public override Task HandleEventAsync(DataEvent<BaseParsedMessage> message) + => HandleEventAsync(message.As((ParsedMessage<TEvent>)message.Data)); + + public override bool MessageMatchesSubRequest(BaseParsedMessage message) + => MessageMatchesSubRequest((ParsedMessage<TSubResponse>)message); + + public override bool MessageMatchesUnsubRequest(BaseParsedMessage message) + => MessageMatchesUnsubRequest((ParsedMessage<TUnsubResponse>)message); + + /// <summary> + /// Handle the update message + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public abstract Task HandleEventAsync(DataEvent<ParsedMessage<TEvent>> message); + + /// <summary> + /// Check if the message is the response to the subscribe request + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public abstract bool MessageMatchesSubRequest(ParsedMessage<TSubResponse> message); + public abstract CallResult HandleSubResponse(ParsedMessage<TSubResponse> message); + + /// <summary> + /// Check if the message is the response to the unsubscribe request + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public abstract bool MessageMatchesUnsubRequest(ParsedMessage<TUnsubResponse> message); + public abstract CallResult HandleUnsubResponse(ParsedMessage<TUnsubResponse> message); + } } diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index 3e162c9..8494fcf 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -24,15 +24,15 @@ namespace CryptoExchange.Net.Sockets /// <inheritdoc /> public override object? GetSubRequest() => null; /// <inheritdoc /> - public override bool MessageMatchesSubRequest(ParsedMessage message) => throw new NotImplementedException(); + public override bool MessageMatchesSubRequest(BaseParsedMessage message) => throw new NotImplementedException(); /// <inheritdoc /> - public override CallResult HandleSubResponse(ParsedMessage message) => throw new NotImplementedException(); + public override CallResult HandleSubResponse(BaseParsedMessage message) => throw new NotImplementedException(); /// <inheritdoc /> public override object? GetUnsubRequest() => null; /// <inheritdoc /> - public override bool MessageMatchesUnsubRequest(ParsedMessage message) => throw new NotImplementedException(); + public override bool MessageMatchesUnsubRequest(BaseParsedMessage message) => throw new NotImplementedException(); /// <inheritdoc /> - public override CallResult HandleUnsubResponse(ParsedMessage message) => throw new NotImplementedException(); + public override CallResult HandleUnsubResponse(BaseParsedMessage message) => throw new NotImplementedException(); } }