From 081c2d42682d561a6cb65eb3169265260a5d70c9 Mon Sep 17 00:00:00 2001 From: JKorf Date: Sun, 5 Nov 2023 15:22:53 +0100 Subject: [PATCH] wip --- CryptoExchange.Net/Clients/SocketApiClient.cs | 97 +++----- .../Converters/SocketConverter.cs | 22 +- .../Objects/Sockets/DataEvent.cs | 3 +- .../Objects/Sockets/ParsedMessage.cs | 8 + .../Objects/Sockets/PendingRequest.cs | 189 ++++++++------- .../Objects/Sockets/SocketSubscription.cs | 95 -------- .../Objects/Sockets/StreamMessage.cs | 70 ------ CryptoExchange.Net/Sockets/Query.cs | 27 ++- .../Sockets/SocketConnection.cs | 215 +++++++++--------- CryptoExchange.Net/Sockets/Subscription.cs | 69 +++--- .../Sockets/SystemSubscription.cs | 19 +- 11 files changed, 324 insertions(+), 490 deletions(-) delete mode 100644 CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs delete mode 100644 CryptoExchange.Net/Objects/Sockets/StreamMessage.cs diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 2578291..1b1b2d6 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -99,7 +99,7 @@ namespace CryptoExchange.Net if (!socketConnections.Any()) return 0; - return socketConnections.Sum(s => s.Value.UserListenerCount); + return socketConnections.Sum(s => s.Value.UserSubscriptionCount); } } @@ -194,7 +194,7 @@ namespace CryptoExchange.Net socketConnection = socketResult.Data; // Add a subscription on the socket connection - var success = AddSubscription(subscription, true, socketConnection); + var success = socketConnection.AddSubscription(subscription); if (!success) { _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); @@ -229,15 +229,18 @@ namespace CryptoExchange.Net return new CallResult(new ServerError("Socket is paused")); } - var request = subscription.GetSubRequest(); - if (request != null) + var subQuery = subscription.GetSubQuery(); + if (subQuery != null) { // Send the request and wait for answer - var subResult = await SubscribeAndWaitAsync(socketConnection, subscription).ConfigureAwait(false); + var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); if (!subResult) { _logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}"); - await socketConnection.CloseAsync(subscription).ConfigureAwait(false); + // If this was a timeout we still need to send an unsubscribe to prevent messages coming in later + var unsubscribe = subResult.Error is CancellationRequestedError; + await socketConnection.CloseAsync(subscription, unsubscribe).ConfigureAwait(false); + return new CallResult(subResult.Error!); } } @@ -260,22 +263,6 @@ namespace CryptoExchange.Net return new CallResult(new UpdateSubscription(socketConnection, subscription)); } - /// - /// Sends the subscribe request and waits for a response to that request - /// - /// The connection to send the request on - /// The request to send, will be serialized to json - /// The message listener for the subscription - /// - protected internal virtual async Task SubscribeAndWaitAsync(SocketConnection socketConnection, Subscription subscription) - { - var callResult = await socketConnection.SendAndWaitSubAsync(subscription).ConfigureAwait(false); - if (callResult) - subscription.Confirmed = true; - - return callResult; - } - /// /// Send a query on a socket connection to the BaseAddress and wait for the response /// @@ -333,20 +320,7 @@ namespace CryptoExchange.Net return new CallResult(new ServerError("Socket is paused")); } - return await QueryAndWaitAsync(socketConnection, query).ConfigureAwait(false); - } - - /// - /// Sends the query request and waits for the result - /// - /// The expected result type - /// The connection to send and wait on - /// The query - /// - protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, Query query) - { - var dataResult = new CallResult(new ServerError("No response on query received")); - return await socket.SendAndWaitQueryAsync(query).ConfigureAwait(false); + return await socketConnection.SendAndWaitQueryAsync(query).ConfigureAwait(false); } /// @@ -405,22 +379,6 @@ namespace CryptoExchange.Net /// protected internal virtual BaseQuery GetAuthenticationRequest() => throw new NotImplementedException(); - /// - /// Add a subscription to a connection - /// - /// The type of data the subscription expects - /// The subscription - /// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket) - /// The socket connection the handler is on - /// - protected virtual bool AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection) - { - if (!connection.AddListener(subscription)) - return false; - - return true; - } - /// /// Adds a system subscription. Used for example to reply to ping requests /// @@ -429,7 +387,7 @@ namespace CryptoExchange.Net { systemSubscriptions.Add(systemSubscription); foreach (var connection in socketConnections.Values) - connection.AddListener(systemSubscription); + connection.AddSubscription(systemSubscription); } /// @@ -474,11 +432,11 @@ namespace CryptoExchange.Net var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected) && s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') && (s.Value.ApiClient.GetType() == GetType()) - && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.UserListenerCount).FirstOrDefault(); + && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.UserSubscriptionCount).FirstOrDefault(); var result = socketResult.Equals(default(KeyValuePair)) ? null : socketResult.Value; if (result != null) { - if (result.UserListenerCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserListenerCount >= ClientOptions.SocketSubscriptionsCombineTarget))) + if (result.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) { // Use existing socket if it has less than target connections OR it has the least connections and we can't make new return new CallResult(result); @@ -502,7 +460,7 @@ namespace CryptoExchange.Net socketConnection.UnparsedMessage += HandleUnparsedMessage; foreach (var systemSubscription in systemSubscriptions) - socketConnection.AddListener(systemSubscription); + socketConnection.AddSubscription(systemSubscription); return new CallResult(socketConnection); } @@ -573,11 +531,14 @@ namespace CryptoExchange.Net /// /// Identifier for the periodic send /// How often - /// Method returning the object to send - protected virtual void SendPeriodic(string identifier, TimeSpan interval, Func objGetter) + /// Method returning the query to send + protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func queryDelegate) { - if (objGetter == null) - throw new ArgumentNullException(nameof(objGetter)); + if (queryDelegate == null) + throw new ArgumentNullException(nameof(queryDelegate)); + + // TODO instead of having this on ApiClient level, this should be registered on the socket connection + // This would prevent this looping without any connections periodicEvent = new AsyncResetEvent(); periodicTask = Task.Run(async () => @@ -596,15 +557,15 @@ namespace CryptoExchange.Net if (!socketConnection.Connected) continue; - var obj = objGetter(socketConnection); - if (obj == null) + var query = queryDelegate(socketConnection); + if (query == null) continue; _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}"); try { - socketConnection.Send(ExchangeHelpers.NextId(), obj, 1); + await socketConnection.SendAndWaitQueryAsync(query).ConfigureAwait(false); } catch (Exception ex) { @@ -626,7 +587,7 @@ namespace CryptoExchange.Net SocketConnection? connection = null; foreach (var socket in socketConnections.Values.ToList()) { - subscription = socket.GetListener(subscriptionId); + subscription = socket.GetSubscription(subscriptionId); if (subscription != null) { connection = socket; @@ -662,11 +623,11 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAllAsync() { - var sum = socketConnections.Sum(s => s.Value.UserListenerCount); + var sum = socketConnections.Sum(s => s.Value.UserSubscriptionCount); if (sum == 0) return; - _logger.Log(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.UserListenerCount)} subscriptions"); + _logger.Log(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.UserSubscriptionCount)} subscriptions"); var tasks = new List(); { var socketList = socketConnections.Values; @@ -703,7 +664,7 @@ namespace CryptoExchange.Net sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}"); foreach (var connection in socketConnections) { - sb.AppendLine($" Connection {connection.Key}: {connection.Value.UserListenerCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); + sb.AppendLine($" Connection {connection.Key}: {connection.Value.UserSubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); foreach (var subscription in connection.Value.Subscriptions) sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}"); } @@ -718,7 +679,7 @@ namespace CryptoExchange.Net _disposing = true; periodicEvent?.Set(); periodicEvent?.Dispose(); - if (socketConnections.Sum(s => s.Value.UserListenerCount) > 0) + if (socketConnections.Sum(s => s.Value.UserSubscriptionCount) > 0) { _logger.Log(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); _ = UnsubscribeAllAsync(); diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index b781960..f389d50 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -14,6 +14,8 @@ namespace CryptoExchange.Net.Converters /// public abstract class SocketConverter { + private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters); + /// /// Fields to use for the message subscription identifier /// @@ -40,12 +42,6 @@ namespace CryptoExchange.Net.Converters // Deserialize to the correct type using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); - if (outputOriginalData) - { - //result.OriginalData = sr.ReadToEnd(); - stream.Position = 0; - } - using var jsonTextReader = new JsonTextReader(sr); JToken token; try @@ -81,8 +77,20 @@ namespace CryptoExchange.Net.Converters } var resultType = GetDeserializationType(typeIdDict, pendingRequests, listeners); + if (resultType == null) + { + // ? + return null; + } + var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(resultType); - var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType)); + var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType, _serializer)); + if (outputOriginalData) + { + stream.Position = 0; + instance.OriginalData = sr.ReadToEnd(); + } + instance.Identifier = idString; instance.Parsed = resultType != null; return instance; diff --git a/CryptoExchange.Net/Objects/Sockets/DataEvent.cs b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs index b1b4058..bfd9db3 100644 --- a/CryptoExchange.Net/Objects/Sockets/DataEvent.cs +++ b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs @@ -1,5 +1,4 @@ -using CryptoExchange.Net.Objects; -using System; +using System; namespace CryptoExchange.Net.Objects.Sockets { diff --git a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs index d9327a7..9968f48 100644 --- a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs +++ b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs @@ -19,6 +19,10 @@ public bool Parsed { get; set; } } + /// + /// Parsed message object + /// + /// Data type public class ParsedMessage : BaseParsedMessage { /// @@ -26,6 +30,10 @@ /// public T? Data { get; set; } + /// + /// ctor + /// + /// public ParsedMessage(T? data) { Data = data; diff --git a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs index f95c014..b90d3a5 100644 --- a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs @@ -1,136 +1,171 @@ -using CryptoExchange.Net.Converters; -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Sockets; using System; -using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Objects.Sockets { + /// + /// Pending socket request + /// public abstract class BasePendingRequest { + /// + /// Request id + /// public int Id { get; set; } + /// + /// Callback for checking if a message is a response to this request + /// public Func MessageMatchesHandler { get; } - public bool Completed { get; private set; } - public abstract Type ResponseType { get; } - - public AsyncResetEvent Event { get; } + /// + /// If the request is completed + /// + public bool Completed { get; protected set; } + /// + /// The response object type + /// + public abstract Type? ResponseType { get; } + /// + /// Timer event + /// public DateTime RequestTimestamp { get; set; } - public TimeSpan Timeout { get; } + /// + /// The request object + /// public object Request { get; set; } + /// + /// The result + /// + public abstract CallResult? Result { get; set; } - private CancellationTokenSource? _cts; - public abstract CallResult Result { get; set; } + protected AsyncResetEvent _event; + protected TimeSpan _timeout; + protected CancellationTokenSource? _cts; + /// + /// ctor + /// + /// + /// + /// + /// protected BasePendingRequest(int id, object request, Func messageMatchesHandler, TimeSpan timeout) { Id = id; MessageMatchesHandler = messageMatchesHandler; - Event = new AsyncResetEvent(false, false); - Timeout = timeout; + _event = new AsyncResetEvent(false, false); + _timeout = timeout; Request = request; RequestTimestamp = DateTime.UtcNow; } + /// + /// Signal that the request has been send and the timeout timer should start + /// public void IsSend() { // Start timeout countdown - _cts = new CancellationTokenSource(Timeout); - _cts.Token.Register(() => Fail("No response"), false); + _cts = new CancellationTokenSource(_timeout); + _cts.Token.Register(Timeout, false); } - public virtual void Fail(string error) - { - Completed = true; - Event.Set(); - } + /// + /// Wait untill timeout or the request is competed + /// + /// + /// + public async Task WaitAsync(TimeSpan timeout) => await _event.WaitAsync(timeout).ConfigureAwait(false); - public bool MessageMatches(BaseParsedMessage message) - { - return MessageMatchesHandler(message); - } + /// + /// Mark request as timeout + /// + public abstract void Timeout(); - public virtual Task ProcessAsync(BaseParsedMessage message) - { - Completed = true; - Event.Set(); - return Task.CompletedTask; - } + /// + /// Mark request as failed + /// + /// + public abstract void Fail(string error); + + /// + /// Process a response + /// + /// + /// + public abstract void ProcessAsync(BaseParsedMessage message); } - //public class PendingRequest : BasePendingRequest - //{ - // public CallResult Result { get; set; } - // public Func Handler { get; } - // public override Type? ResponseType => null; - - // private PendingRequest(int id, object request, Func messageMatchesHandler, Func messageHandler, TimeSpan timeout) - // : base(id, request, messageMatchesHandler, timeout) - // { - // Handler = messageHandler; - // } - - // public static PendingRequest CreateForQuery(BaseQuery query) - // { - // return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, query.HandleResult, TimeSpan.FromSeconds(5)); - // } - - // public static PendingRequest CreateForSubRequest(Subscription subscription) - // { - // return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetSubRequest(), subscription.MessageMatchesSubRequest, subscription.HandleSubResponse, TimeSpan.FromSeconds(5)); - // } - - // public static PendingRequest CreateForUnsubRequest(Subscription subscription) - // { - // return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetUnsubRequest(), subscription.MessageMatchesUnsubRequest, subscription.HandleUnsubResponse, TimeSpan.FromSeconds(5)); - // } - - // public override void Fail(string error) - // { - // Result = new CallResult(new ServerError(error)); - // base.Fail(error); - // } - - // public override Task ProcessAsync(BaseParsedMessage message) - // { - // Result = Handler(message); - // return base.ProcessAsync(message); - // } - //} - + /// + /// Pending socket request + /// + /// The response data type public class PendingRequest : BasePendingRequest { - public override CallResult Result { get; set; } - public CallResult TypedResult => (CallResult)Result; + /// + public override CallResult? Result { get; set; } + /// + /// The typed call result + /// + public CallResult? TypedResult => (CallResult?)Result; + /// + /// Data handler + /// public Func, CallResult> Handler { get; } + /// + /// The response object type + /// public override Type? ResponseType => typeof(T); - public PendingRequest(int id, object request, Func, bool> messageMatchesHandler, Func, CallResult> messageHandler, TimeSpan timeout) + /// + /// ctor + /// + /// + /// + /// + /// + /// + private PendingRequest(int id, object request, Func, bool> messageMatchesHandler, Func, CallResult> messageHandler, TimeSpan timeout) : base(id, request, (x) => messageMatchesHandler((ParsedMessage)x), timeout) { Handler = messageHandler; } + /// + /// Create a new pending request for provided query + /// + /// + /// public static PendingRequest CreateForQuery(Query query) { return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, x => { var response = query.HandleResponse(x); - return response.As((T)response.Data); + return response.As(response.Data); }, TimeSpan.FromSeconds(5)); } + /// + public override void Timeout() + { + Completed = true; + Result = new CallResult(new CancellationRequestedError()); + } + + /// public override void Fail(string error) { Result = new CallResult(new ServerError(error)); - base.Fail(error); + Completed = true; + _event.Set(); } - public override Task ProcessAsync(BaseParsedMessage message) + /// + public override void ProcessAsync(BaseParsedMessage message) { + Completed = true; Result = Handler((ParsedMessage)message); - return base.ProcessAsync(message); + _event.Set(); } } } diff --git a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs deleted file mode 100644 index 44a1c44..0000000 --- a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs +++ /dev/null @@ -1,95 +0,0 @@ -//using CryptoExchange.Net.Converters; -//using CryptoExchange.Net.Interfaces; -//using CryptoExchange.Net.Sockets; -//using System; -//using System.Threading; -//using System.Threading.Tasks; - -//namespace CryptoExchange.Net.Objects.Sockets -//{ -// /// -// /// Socket listener -// /// -// public class MessageListener -// { -// /// -// /// Unique listener id -// /// -// public int Id { get; } - -// /// -// /// Exception event -// /// -// public event Action? Exception; - -// /// -// /// The request object send when subscribing on the server. Either this or the `Identifier` property should be set -// /// -// public Subscription Subscription { get; set; } - -// /// -// /// Whether this is a user subscription or an internal listener -// /// -// public bool UserListener { get; set; } - -// /// -// /// If the subscription has been confirmed to be subscribed by the server -// /// -// public bool Confirmed { get; set; } - -// /// -// /// Whether authentication is needed for this subscription -// /// -// public bool Authenticated => Subscription.Authenticated; - -// /// -// /// Whether we're closing this subscription and a socket connection shouldn't be kept open for it -// /// -// public bool Closed { get; set; } - -// /// -// /// Cancellation token registration, should be disposed when subscription is closed. Used for closing the subscription with -// /// a provided cancelation token -// /// -// public CancellationTokenRegistration? CancellationTokenRegistration { get; set; } - -// /// -// /// ctor -// /// -// /// -// /// -// /// -// public MessageListener(int id, Subscription request, bool userSubscription) -// { -// Id = id; -// UserListener = userSubscription; -// Subscription = request; -// } - -// /// -// /// Invoke the exception event -// /// -// /// -// public void InvokeExceptionHandler(Exception e) -// { -// Exception?.Invoke(e); -// } - -// /// -// /// The priority of this subscription -// /// -// public int Priority => Subscription is SystemSubscription ? 50 : 1; - -// /// -// /// Process the message -// /// -// /// -// /// -// public Task ProcessAsync(ParsedMessage message) -// { -// // TODO -// var dataEvent = new DataEvent(message, null, message.OriginalData, DateTime.UtcNow, null); -// return Subscription.HandleEventAsync(dataEvent); -// } -// } -//} diff --git a/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs b/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs deleted file mode 100644 index 9f4826b..0000000 --- a/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs +++ /dev/null @@ -1,70 +0,0 @@ -//using System; -//using System.Collections.Generic; -//using System.Data.Common; -//using System.IO; -//using System.Text; -//using System.Threading.Tasks; -//using CryptoExchange.Net.Sockets; - -//namespace CryptoExchange.Net.Objects.Sockets -//{ -// /// -// /// A message received from a stream -// /// -// public class StreamMessage : IDisposable -// { -// /// -// /// The connection it was received on -// /// -// public SocketConnection Connection { get; } -// /// -// /// The data stream -// /// -// public Stream Stream { get; } -// /// -// /// Receive timestamp -// /// -// public DateTime Timestamp { get; set; } - -// private Dictionary _casted; - -// /// -// /// Get the data from the memory in specified type using the converter. If this type has been resolved before it will use that instead -// /// -// /// -// /// -// /// -// public T Get(Func converter) -// { -// if (_casted.TryGetValue(typeof(T), out var casted)) -// return (T)casted; - -// var result = converter(Stream); -// _casted.Add(typeof(T), result!); -// Stream.Position = 0; -// return result; -// } - -// /// -// /// Dispose -// /// -// public void Dispose() -// { -// Stream.Dispose(); -// } - -// /// -// /// ctor -// /// -// /// -// /// -// /// -// public StreamMessage(SocketConnection connection, Stream stream, DateTime timestamp) -// { -// Connection = connection; -// Stream = stream; -// Timestamp = timestamp; -// _casted = new Dictionary(); -// } -// } -//} diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 2940472..9310a5d 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -1,7 +1,5 @@ -using CryptoExchange.Net.Converters; -using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; -using System; namespace CryptoExchange.Net.Sockets { @@ -11,7 +9,7 @@ namespace CryptoExchange.Net.Sockets public abstract class BaseQuery { /// - /// The query request + /// The query request object /// public object Request { get; set; } @@ -25,10 +23,6 @@ namespace CryptoExchange.Net.Sockets /// public int Weight { get; } - public abstract bool MessageMatchesQuery(BaseParsedMessage message); - public abstract CallResult HandleResult(BaseParsedMessage message); - - /// /// ctor /// @@ -42,18 +36,28 @@ namespace CryptoExchange.Net.Sockets Weight = weight; } + /// + /// Create a pending request for this query + /// public abstract BasePendingRequest CreatePendingRequest(); } + /// + /// Query + /// + /// Response object type public abstract class Query : BaseQuery { + /// + /// ctor + /// + /// + /// + /// protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight) { } - public override CallResult HandleResult(BaseParsedMessage message) => HandleResponse((ParsedMessage) message); - public override bool MessageMatchesQuery(BaseParsedMessage message) => MessageMatchesQuery((ParsedMessage)message); - /// /// Handle the query response /// @@ -68,6 +72,7 @@ namespace CryptoExchange.Net.Sockets /// public abstract bool MessageMatchesQuery(ParsedMessage message); + /// public override BasePendingRequest CreatePendingRequest() => PendingRequest.CreateForQuery(this); } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 46d14c8..e02d6b7 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -10,9 +10,7 @@ using CryptoExchange.Net.Objects; using System.Net.WebSockets; using System.IO; using CryptoExchange.Net.Objects.Sockets; -using CryptoExchange.Net.Converters; using System.Text; -using System.Runtime; namespace CryptoExchange.Net.Sockets { @@ -57,23 +55,23 @@ namespace CryptoExchange.Net.Sockets public event Action? UnparsedMessage; /// - /// The amount of listeners on this connection + /// The amount of subscriptions on this connection /// - public int UserListenerCount + public int UserSubscriptionCount { - get { lock (_listenerLock) - return _messageIdentifierListeners.Values.Count(h => h.UserSubscription); } + get { lock (_subscriptionLock) + return _messageIdentifierSubscriptions.Values.Count(h => h.UserSubscription); } } /// - /// Get a copy of the current message listeners + /// Get a copy of the current message subscriptions /// public Subscription[] Subscriptions { get { - lock (_listenerLock) - return _messageIdentifierListeners.Values.Where(h => h.UserSubscription).ToArray(); + lock (_subscriptionLock) + return _messageIdentifierSubscriptions.Values.Where(h => h.UserSubscription).ToArray(); } } @@ -159,10 +157,10 @@ namespace CryptoExchange.Net.Sockets private bool _pausedActivity; private readonly List _pendingRequests; - private readonly List _messageListeners; - private readonly Dictionary _messageIdentifierListeners; + private readonly List _subscriptions; + private readonly Dictionary _messageIdentifierSubscriptions; - private readonly object _listenerLock = new(); + private readonly object _subscriptionLock = new(); private readonly ILogger _logger; private SocketStatus _status; @@ -187,8 +185,8 @@ namespace CryptoExchange.Net.Sockets Properties = new Dictionary(); _pendingRequests = new List(); - _messageListeners = new List(); - _messageIdentifierListeners = new Dictionary(); + _subscriptions = new List(); + _messageIdentifierSubscriptions = new Dictionary(); _socket = socket; _socket.OnStreamMessage += HandleStreamMessage; @@ -217,10 +215,10 @@ namespace CryptoExchange.Net.Sockets { Status = SocketStatus.Closed; Authenticated = false; - lock(_listenerLock) + lock(_subscriptionLock) { - foreach (var listener in _messageListeners) - listener.Confirmed = false; + foreach (var subscription in _subscriptions) + subscription.Confirmed = false; } Task.Run(() => ConnectionClosed?.Invoke()); } @@ -233,10 +231,10 @@ namespace CryptoExchange.Net.Sockets Status = SocketStatus.Reconnecting; DisconnectTime = DateTime.UtcNow; Authenticated = false; - lock (_listenerLock) + lock (_subscriptionLock) { - foreach (var listener in _messageListeners) - listener.Confirmed = false; + foreach (var subscription in _subscriptions) + subscription.Confirmed = false; } _ = Task.Run(() => ConnectionLost?.Invoke()); @@ -257,7 +255,7 @@ namespace CryptoExchange.Net.Sockets protected virtual async void HandleReconnected() { Status = SocketStatus.Resubscribing; - lock (_messageListeners) + lock (_subscriptions) { foreach (var pendingRequest in _pendingRequests.ToList()) { @@ -324,11 +322,11 @@ namespace CryptoExchange.Net.Sockets var timestamp = DateTime.UtcNow; TimeSpan userCodeDuration = TimeSpan.Zero; - List listeners; - lock (_listenerLock) - listeners = _messageListeners.OrderByDescending(x => !x.UserSubscription).ToList(); + List subscriptions; + lock (_subscriptionLock) + subscriptions = _subscriptions.OrderByDescending(x => !x.UserSubscription).ToList(); - var result = ApiClient.StreamConverter.ReadJson(stream, _pendingRequests, listeners, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); + var result = ApiClient.StreamConverter.ReadJson(stream, _pendingRequests, subscriptions, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); if(result == null) { stream.Position = 0; @@ -348,12 +346,12 @@ namespace CryptoExchange.Net.Sockets } // TODO lock - if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener)) + if (_messageIdentifierSubscriptions.TryGetValue(result.Identifier.ToLowerInvariant(), out var idSubscription)) { // Matched based on identifier var userSw = Stopwatch.StartNew(); var dataEvent = new DataEvent(result, null, result.OriginalData, DateTime.UtcNow, null); - await idListener.HandleEventAsync(dataEvent).ConfigureAwait(false); + await idSubscription.HandleEventAsync(dataEvent).ConfigureAwait(false); userSw.Stop(); return; } @@ -371,18 +369,13 @@ namespace CryptoExchange.Net.Sockets if (pendingRequest.Completed) { - // Answer to a timed out request, unsub if it is a subscription request - // TODO - //if (pendingRequest.MessageListener != null) - //{ - // _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); - // _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false); - //} + // Answer to a timed out request + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received after request timeout. Consider increasing the RequestTimeout"); } else { _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); - await pendingRequest.ProcessAsync(result).ConfigureAwait(false); + pendingRequest.ProcessAsync(result); } return; @@ -427,12 +420,12 @@ namespace CryptoExchange.Net.Sockets if (ApiClient.socketConnections.ContainsKey(SocketId)) ApiClient.socketConnections.TryRemove(SocketId, out _); - lock (_listenerLock) + lock (_subscriptionLock) { - foreach (var listener in _messageListeners) + foreach (var subscription in _subscriptions) { - if (listener.CancellationTokenRegistration.HasValue) - listener.CancellationTokenRegistration.Value.Dispose(); + if (subscription.CancellationTokenRegistration.HasValue) + subscription.CancellationTokenRegistration.Value.Dispose(); } } @@ -441,15 +434,16 @@ namespace CryptoExchange.Net.Sockets } /// - /// Close a listener on this connection. If all listener on this connection are closed the connection gets closed as well + /// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well /// - /// Listener to close + /// Subscription to close + /// Whether to send an unsub request even if the subscription wasn't confirmed /// - public async Task CloseAsync(Subscription subscription) + public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false) { - lock (_listenerLock) + lock (_subscriptionLock) { - if (!_messageListeners.Contains(subscription)) + if (!_subscriptions.Contains(subscription)) return; subscription.Closed = true; @@ -458,15 +452,15 @@ namespace CryptoExchange.Net.Sockets if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) return; - _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing listener {subscription.Id}"); + _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing subscription {subscription.Id}"); if (subscription.CancellationTokenRegistration.HasValue) subscription.CancellationTokenRegistration.Value.Dispose(); - if (subscription.Confirmed && _socket.IsOpen) + if ((unsubEvenIfNotConfirmed || subscription.Confirmed) && _socket.IsOpen) await UnsubscribeAsync(subscription).ConfigureAwait(false); bool shouldCloseConnection; - lock (_listenerLock) + lock (_subscriptionLock) { if (Status == SocketStatus.Closing) { @@ -474,22 +468,22 @@ namespace CryptoExchange.Net.Sockets return; } - shouldCloseConnection = _messageIdentifierListeners.All(r => !r.Value.UserSubscription || r.Value.Closed); + shouldCloseConnection = _messageIdentifierSubscriptions.All(r => !r.Value.UserSubscription || r.Value.Closed); if (shouldCloseConnection) Status = SocketStatus.Closing; } if (shouldCloseConnection) { - _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more listeners"); + _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more subscriptions"); await CloseAsync().ConfigureAwait(false); } - lock (_listenerLock) + lock (_subscriptionLock) { - _messageListeners.Remove(subscription); + _subscriptions.Remove(subscription); foreach (var id in subscription.Identifiers) - _messageIdentifierListeners.Remove(id); + _messageIdentifierSubscriptions.Remove(id); } } @@ -503,84 +497,79 @@ namespace CryptoExchange.Net.Sockets } /// - /// Add a listener to this connection + /// Add a subscription to this connection /// - /// - public bool AddListener(Subscription subscription) + /// + public bool AddSubscription(Subscription subscription) { - lock (_listenerLock) + lock (_subscriptionLock) { if (Status != SocketStatus.None && Status != SocketStatus.Connected) return false; - _messageListeners.Add(subscription); + _subscriptions.Add(subscription); if (subscription.Identifiers != null) { foreach (var id in subscription.Identifiers) - _messageIdentifierListeners.Add(id.ToLowerInvariant(), subscription); + _messageIdentifierSubscriptions.Add(id.ToLowerInvariant(), subscription); } if (subscription.UserSubscription) - _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {subscription.Id}, total listeners on connection: {_messageListeners.Count(s => s.UserSubscription)}"); + _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}"); return true; } } /// - /// Get a listener on this connection by id + /// Get a subscription on this connection by id /// /// - public Subscription? GetListener(int id) + public Subscription? GetSubscription(int id) { - lock (_listenerLock) - return _messageListeners.SingleOrDefault(s => s.Id == id); + lock (_subscriptionLock) + return _subscriptions.SingleOrDefault(s => s.Id == id); } /// - /// Get a listener on this connection by its subscribe request + /// Get a subscription on this connection by its subscribe request /// /// Filter for a request /// - public Subscription? GetListenerByRequest(Func predicate) + public Subscription? GetSubscriptionByRequest(Func predicate) { - lock(_listenerLock) - return _messageListeners.SingleOrDefault(s => predicate(s)); + lock(_subscriptionLock) + return _subscriptions.SingleOrDefault(s => predicate(s)); } + /// + /// Send a query request and wait for an answer + /// + /// Query to send + /// public virtual async Task SendAndWaitQueryAsync(BaseQuery query) { var pendingRequest = query.CreatePendingRequest(); await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false); - return pendingRequest.Result; + return pendingRequest.Result!; } + /// + /// Send a query request and wait for an answer + /// + /// Query response type + /// Query to send + /// public virtual async Task> SendAndWaitQueryAsync(Query query) { var pendingRequest = PendingRequest.CreateForQuery(query); await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false); - return pendingRequest.TypedResult; - } - - public virtual async Task SendAndWaitSubAsync(Subscription subscription) - { - var pendingRequest = PendingRequest.CreateForSubRequest(subscription); - await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false); - return pendingRequest.Result; - } - - public virtual async Task SendAndWaitUnsubAsync(Subscription subscription) - { - var pendingRequest = PendingRequest.CreateForUnsubRequest(subscription); - await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false); - return pendingRequest.Result; + return pendingRequest.TypedResult!; } private async Task SendAndWaitAsync(BasePendingRequest pending, int weight) { - lock (_messageListeners) - { + lock (_subscriptions) _pendingRequests.Add(pending); - } var sendOk = Send(pending.Id, pending.Request, weight); if (!sendOk) @@ -600,7 +589,7 @@ namespace CryptoExchange.Net.Sockets if (pending.Completed) return; - await pending.Event.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); + await pending.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); if (pending.Completed) return; @@ -649,8 +638,8 @@ namespace CryptoExchange.Net.Sockets return new CallResult(new WebError("Socket not connected")); bool anySubscriptions = false; - lock (_listenerLock) - anySubscriptions = _messageListeners.Any(s => s.UserSubscription); + lock (_subscriptionLock) + anySubscriptions = _subscriptions.Any(s => s.UserSubscription); if (!anySubscriptions) { @@ -661,8 +650,8 @@ namespace CryptoExchange.Net.Sockets } bool anyAuthenticated = false; - lock (_listenerLock) - anyAuthenticated = _messageListeners.Any(s => s.Authenticated); + lock (_subscriptionLock) + anyAuthenticated = _subscriptions.Any(s => s.Authenticated); if (anyAuthenticated) { @@ -679,13 +668,13 @@ namespace CryptoExchange.Net.Sockets } // Get a list of all subscriptions on the socket - List listenerList = new List(); - lock (_listenerLock) - listenerList = _messageListeners.ToList(); + List subList = new List(); + lock (_subscriptionLock) + subList = _subscriptions.ToList(); - foreach(var listener in listenerList) + foreach(var subscription in subList) { - var result = await ApiClient.RevitalizeRequestAsync(listener).ConfigureAwait(false); + var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false); if (!result) { _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error); @@ -694,22 +683,28 @@ namespace CryptoExchange.Net.Sockets } // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe - for (var i = 0; i < listenerList.Count; i += ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket) + for (var i = 0; i < subList.Count; i += ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket) { if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); var taskList = new List>(); - foreach (var listener in listenerList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) - taskList.Add(ApiClient.SubscribeAndWaitAsync(this, listener)); + foreach (var subscription in subList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) + { + var subQuery = subscription.GetSubQuery(); + if (subQuery == null) + continue; + + taskList.Add(SendAndWaitQueryAsync(subQuery)); + } await Task.WhenAll(taskList).ConfigureAwait(false); if (taskList.Any(t => !t.Result.Success)) return taskList.First(t => !t.Result.Success).Result; } - foreach (var listener in listenerList) - listener.Confirmed = true; + foreach (var subscription in subList) + subscription.Confirmed = true; if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); @@ -720,12 +715,12 @@ namespace CryptoExchange.Net.Sockets internal async Task UnsubscribeAsync(Subscription subscription) { - var unsubscribeRequest = subscription?.GetUnsubRequest(); - if (unsubscribeRequest != null) - { - await SendAndWaitUnsubAsync(subscription!).ConfigureAwait(false); - _logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription!.Id} unsubscribed"); - } + var unsubscribeRequest = subscription.GetUnsubQuery(); + if (unsubscribeRequest == null) + return; + + await SendAndWaitQueryAsync(unsubscribeRequest).ConfigureAwait(false); + _logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription!.Id} unsubscribed"); } internal async Task ResubscribeAsync(Subscription subscription) @@ -733,7 +728,11 @@ namespace CryptoExchange.Net.Sockets if (!_socket.IsOpen) return new CallResult(new UnknownError("Socket is not connected")); - return await ApiClient.SubscribeAndWaitAsync(this, subscription).ConfigureAwait(false); + var subQuery = subscription.GetSubQuery(); + if (subQuery == null) + return new CallResult(null); + + return await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); } /// diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 0312f2b..5f73def 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -9,14 +9,28 @@ using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets { /// - /// Subscription base + /// Socket subscription /// public abstract class Subscription { + /// + /// Subscription id + /// public int Id { get; set; } + /// + /// Is it a user subscription + /// public bool UserSubscription { get; set; } + + /// + /// Has the subscription been confirmed + /// public bool Confirmed { get; set; } + + /// + /// Is the subscription closed + /// public bool Closed { get; set; } /// @@ -34,6 +48,9 @@ namespace CryptoExchange.Net.Sockets /// public abstract List Identifiers { get; } + /// + /// Cancellation token registration + /// public CancellationTokenRegistration? CancellationTokenRegistration { get; set; } /// @@ -56,13 +73,13 @@ namespace CryptoExchange.Net.Sockets /// Get the subscribe object to send when subscribing /// /// - public abstract object? GetSubRequest(); + public abstract BaseQuery? GetSubQuery(); /// /// Get the unsubscribe object to send when unsubscribing /// /// - public abstract object? GetUnsubRequest(); + public abstract BaseQuery? GetUnsubQuery(); /// /// Handle the update message @@ -70,11 +87,6 @@ namespace CryptoExchange.Net.Sockets /// /// public abstract Task HandleEventAsync(DataEvent message); - public abstract CallResult HandleSubResponse(BaseParsedMessage message); - public abstract CallResult HandleUnsubResponse(BaseParsedMessage message); - - public abstract bool MessageMatchesUnsubRequest(BaseParsedMessage message); - public abstract bool MessageMatchesSubRequest(BaseParsedMessage message); /// /// Invoke the exception event @@ -86,55 +98,40 @@ namespace CryptoExchange.Net.Sockets } } + /// public abstract class Subscription : Subscription { + /// + /// ctor + /// + /// + /// protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated) { } } + /// public abstract class Subscription : Subscription { + /// + /// ctor + /// + /// + /// protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated) { } - public override CallResult HandleUnsubResponse(BaseParsedMessage message) - => HandleUnsubResponse((ParsedMessage)message); - - public override CallResult HandleSubResponse(BaseParsedMessage message) - => HandleSubResponse((ParsedMessage)message); - + /// public override Task HandleEventAsync(DataEvent message) => HandleEventAsync(message.As((ParsedMessage)message.Data)); - public override bool MessageMatchesSubRequest(BaseParsedMessage message) - => MessageMatchesSubRequest((ParsedMessage)message); - - public override bool MessageMatchesUnsubRequest(BaseParsedMessage message) - => MessageMatchesUnsubRequest((ParsedMessage)message); - /// /// Handle the update message /// /// /// public abstract Task HandleEventAsync(DataEvent> message); - - /// - /// Check if the message is the response to the subscribe request - /// - /// - /// - public abstract bool MessageMatchesSubRequest(ParsedMessage message); - public abstract CallResult HandleSubResponse(ParsedMessage message); - - /// - /// Check if the message is the response to the unsubscribe request - /// - /// - /// - public abstract bool MessageMatchesUnsubRequest(ParsedMessage message); - public abstract CallResult HandleUnsubResponse(ParsedMessage message); } } diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index 8494fcf..c2c82e5 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -1,9 +1,4 @@ -using CryptoExchange.Net.Converters; -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Objects; -using CryptoExchange.Net.Objects.Sockets; -using Microsoft.Extensions.Logging; -using System; +using Microsoft.Extensions.Logging; namespace CryptoExchange.Net.Sockets { @@ -22,17 +17,9 @@ namespace CryptoExchange.Net.Sockets } /// - public override object? GetSubRequest() => null; - /// - public override bool MessageMatchesSubRequest(BaseParsedMessage message) => throw new NotImplementedException(); - /// - public override CallResult HandleSubResponse(BaseParsedMessage message) => throw new NotImplementedException(); + public override BaseQuery? GetSubQuery() => null; /// - public override object? GetUnsubRequest() => null; - /// - public override bool MessageMatchesUnsubRequest(BaseParsedMessage message) => throw new NotImplementedException(); - /// - public override CallResult HandleUnsubResponse(BaseParsedMessage message) => throw new NotImplementedException(); + public override BaseQuery? GetUnsubQuery() => null; } }