From bf854c92af230083f1f4d1362f92e45e82ca6844 Mon Sep 17 00:00:00 2001 From: JKorf Date: Sat, 28 Oct 2023 10:08:04 +0200 Subject: [PATCH] wip --- CryptoExchange.Net/Clients/SocketApiClient.cs | 9 +- .../Converters/SocketConverter.cs | 72 ++++++++++++++ .../Sockets/SocketConnection.cs | 97 +++++++++++++------ CryptoExchange.Net/Sockets/Subscription.cs | 3 + 4 files changed, 147 insertions(+), 34 deletions(-) create mode 100644 CryptoExchange.Net/Converters/SocketConverter.cs diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 950c513..2a04a05 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -1,3 +1,4 @@ +using CryptoExchange.Net.Converters; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; @@ -450,12 +451,14 @@ namespace CryptoExchange.Net protected virtual MessageListener? AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection) { var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription); - if (!connection.AddListener(messageListener)) + if (!connection.AddListener(messageListener, subscription.Identifiers)) return null; return messageListener; } + protected internal abstract SocketConverter GetConverter(); + /// /// Adds a system subscription. Used for example to reply to ping requests /// @@ -465,7 +468,7 @@ namespace CryptoExchange.Net systemSubscriptions.Add(systemSubscription); var subscription = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false); foreach (var connection in socketConnections.Values) - connection.AddListener(subscription); + connection.AddListener(subscription, null); } /// @@ -539,7 +542,7 @@ namespace CryptoExchange.Net foreach (var systemSubscription in systemSubscriptions) { var handler = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false); - socketConnection.AddListener(handler); + socketConnection.AddListener(handler, null); } return new CallResult(socketConnection); diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs new file mode 100644 index 0000000..f84eda2 --- /dev/null +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -0,0 +1,72 @@ +using CryptoExchange.Net.Objects.Sockets; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Converters +{ + public class SocketConverter : JsonConverter + { + private readonly List _idFields; + private readonly Func, Type> _typeIdentifier; + + public SocketConverter(List idFields, Func, Type> typeIdentifier) + { + _idFields = idFields; + _typeIdentifier = typeIdentifier; + } + + /// + public override bool CanConvert(Type objectType) => true; + + /// + public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer) + { + // Start reading the data + // Once we reach the property that identifies the message we save those in a string array + // Once all id properties have been read callback to see what the deserialization type should be + // Deserialize to the correct type + + var token = JToken.Load(reader); + var dict = new Dictionary(); + foreach(var idField in _idFields) + { + var splitTokens = idField.Split(new char[] { ':' }); + var accessToken = token; + foreach (var splitToken in splitTokens) + { + accessToken = accessToken[splitToken]; + } + dict[idField] = accessToken?.ToString(); + } + + var resultType = _typeIdentifier(dict); + string idString = ""; + foreach(var item in dict) + idString += item.Value; + + return new ParsedMessage + { + Identifier = idString, + Data = resultType == null ? null : token.ToObject(resultType) + }; + } + + /// + public override bool CanWrite { get { return false; } } + + /// + public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer) + { + } + } + + public class ParsedMessage + { + public string Identifier { get; set; } + + public object Data { get; set; } + } +} diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 90bda3c..556ae72 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -10,6 +10,9 @@ using CryptoExchange.Net.Objects; using System.Net.WebSockets; using System.IO; using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Converters; +using System.Text; +using System.Runtime; namespace CryptoExchange.Net.Sockets { @@ -153,6 +156,8 @@ namespace CryptoExchange.Net.Sockets private readonly List _listeners; private readonly List _messageListeners; // ? + private readonly Dictionary _messageIdentifierListeners; + private readonly object _listenerLock = new(); private readonly ILogger _logger; @@ -162,6 +167,8 @@ namespace CryptoExchange.Net.Sockets /// The underlying websocket /// private readonly IWebsocket _socket; + private readonly JsonSerializerSettings _options; + private readonly JsonSerializer _serializer; /// /// New socket connection @@ -178,6 +185,7 @@ namespace CryptoExchange.Net.Sockets Properties = new Dictionary(); _messageListeners = new List(); + _messageIdentifierListeners = new Dictionary(); _listeners = new List(); _socket = socket; @@ -189,6 +197,11 @@ namespace CryptoExchange.Net.Sockets _socket.OnReconnected += HandleReconnected; _socket.OnError += HandleError; _socket.GetReconnectionUrl = GetReconnectionUrlAsync; + + var converter = ApiClient.GetConverter(); + _options = SerializerOptions.Default; + _options.Converters.Add(converter); + _serializer = JsonSerializer.Create(_options); } /// @@ -321,45 +334,62 @@ namespace CryptoExchange.Net.Sockets lock (_listenerLock) listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList(); - foreach (var listener in listeners) + var converter = ApiClient.GetConverter(); + using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); + using var jsonTextReader = new JsonTextReader(sr); + var result = (ParsedMessage)converter.ReadJson(jsonTextReader, typeof(object), null, _serializer); + stream.Position = 0; + + if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener)) { - if (listener.MessageMatches(streamMessage)) + var userSw = Stopwatch.StartNew(); + await idListener.ProcessAsync(streamMessage).ConfigureAwait(false); + userSw.Stop(); + userCodeDuration = userSw.Elapsed; + handledResponse = true; + } + else + { + foreach (var listener in listeners) { - if (listener is PendingRequest pendingRequest) + if (listener.MessageMatches(streamMessage)) { - lock (_messageListeners) - _messageListeners.Remove(pendingRequest); - - if (pendingRequest.Completed) + if (listener is PendingRequest pendingRequest) { - // Answer to a timed out request, unsub if it is a subscription request - if (pendingRequest.MessageListener != null) + lock (_messageListeners) + _messageListeners.Remove(pendingRequest); + + if (pendingRequest.Completed) { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); - _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false); + // Answer to a timed out request, unsub if it is a subscription request + if (pendingRequest.MessageListener != null) + { + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); + _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false); + } } + else + { + _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); + await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false); + } + + if (!ApiClient.ContinueOnQueryResponse) + return; + + handledResponse = true; + break; } - else + else if (listener is MessageListener subscription) { - _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); - await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false); + currentSubscription = subscription; + handledResponse = true; + var userSw = Stopwatch.StartNew(); + await subscription.ProcessAsync(streamMessage).ConfigureAwait(false); + userSw.Stop(); + userCodeDuration = userSw.Elapsed; + break; } - - if (!ApiClient.ContinueOnQueryResponse) - return; - - handledResponse = true; - break; - } - else if (listener is MessageListener subscription) - { - currentSubscription = subscription; - handledResponse = true; - var userSw = Stopwatch.StartNew(); - await subscription.ProcessAsync(streamMessage).ConfigureAwait(false); - userSw.Stop(); - userCodeDuration = userSw.Elapsed; - break; } } } @@ -480,7 +510,7 @@ namespace CryptoExchange.Net.Sockets /// Add a listener to this connection /// /// - public bool AddListener(MessageListener listener) + public bool AddListener(MessageListener listener, List? listenerIdentifiers) { lock (_listenerLock) { @@ -489,6 +519,11 @@ namespace CryptoExchange.Net.Sockets _listeners.Add(listener); _messageListeners.Add(listener); + if (listenerIdentifiers != null) + { + foreach (var id in listenerIdentifiers) + _messageIdentifierListeners.Add(id.ToLowerInvariant(), listener); + } if (listener.UserListener) _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {listener.Id}, total listeners on connection: {_listeners.Count(s => s.UserListener)}"); diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 324b5fb..6510c72 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -3,6 +3,7 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; using Newtonsoft.Json; +using System.Collections.Generic; using System.IO; using System.Text; using System.Threading.Tasks; @@ -26,6 +27,8 @@ namespace CryptoExchange.Net.Sockets /// public bool Authenticated { get; } + public abstract List Identifiers { get; } + /// /// ctor ///