From 312d54cf04e7b90470409f5785d2e0d63edffb5a Mon Sep 17 00:00:00 2001 From: JKorf Date: Thu, 2 Nov 2023 22:19:42 +0100 Subject: [PATCH] wip --- CryptoExchange.Net/Clients/SocketApiClient.cs | 105 +++------ .../Converters/SocketConverter.cs | 7 +- .../Objects/Sockets/PendingRequest.cs | 92 +++++++- .../Objects/Sockets/SocketSubscription.cs | 166 ++++++------- .../Objects/Sockets/UpdateSubscription.cs | 6 +- CryptoExchange.Net/Sockets/Query.cs | 39 +++- .../Sockets/SocketConnection.cs | 218 +++++++++++------- CryptoExchange.Net/Sockets/Subscription.cs | 30 ++- .../Sockets/SystemSubscription.cs | 8 +- 9 files changed, 411 insertions(+), 260 deletions(-) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 9921c33..74a8a58 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -170,7 +170,6 @@ namespace CryptoExchange.Net return new CallResult(new InvalidOperationError("Client disposed, can't subscribe")); SocketConnection socketConnection; - MessageListener? messageListener; var released = false; // Wait for a semaphore here, so we only connect 1 socket at a time. // This is necessary for being able to see if connections can be combined @@ -195,8 +194,8 @@ namespace CryptoExchange.Net socketConnection = socketResult.Data; // Add a subscription on the socket connection - messageListener = AddSubscription(subscription, true, socketConnection); - if (messageListener == null) + var success = AddSubscription(subscription, true, socketConnection); + if (!success) { _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); continue; @@ -234,31 +233,31 @@ namespace CryptoExchange.Net if (request != null) { // Send the request and wait for answer - var subResult = await SubscribeAndWaitAsync(socketConnection, request, messageListener).ConfigureAwait(false); + var subResult = await SubscribeAndWaitAsync(socketConnection, subscription).ConfigureAwait(false); if (!subResult) { _logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}"); - await socketConnection.CloseAsync(messageListener).ConfigureAwait(false); + await socketConnection.CloseAsync(subscription).ConfigureAwait(false); return new CallResult(subResult.Error!); } } else { // No request to be sent, so just mark the subscription as comfirmed - messageListener.Confirmed = true; + subscription.Confirmed = true; } if (ct != default) { - messageListener.CancellationTokenRegistration = ct.Register(async () => + subscription.CancellationTokenRegistration = ct.Register(async () => { - _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription {messageListener.Id}"); - await socketConnection.CloseAsync(messageListener).ConfigureAwait(false); + _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription {subscription.Id}"); + await socketConnection.CloseAsync(subscription).ConfigureAwait(false); }, false); } - _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {messageListener.Id} completed successfully"); - return new CallResult(new UpdateSubscription(socketConnection, messageListener)); + _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully"); + return new CallResult(new UpdateSubscription(socketConnection, subscription)); } /// @@ -268,27 +267,13 @@ namespace CryptoExchange.Net /// The request to send, will be serialized to json /// The message listener for the subscription /// - protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, MessageListener listener) + protected internal virtual async Task SubscribeAndWaitAsync(SocketConnection socketConnection, Subscription subscription) { - CallResult? callResult = null; - await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, listener, 1, x => - { - var (matches, result) = listener.Subscription!.MessageMatchesSubRequest(x); - if (matches) - callResult = result; - return matches; - }).ConfigureAwait(false); + var callResult = await socketConnection.SendAndWaitSubAsync(subscription).ConfigureAwait(false); + if (callResult) + subscription.Confirmed = true; - if (callResult?.Success == true) - { - listener.Confirmed = true; - return new CallResult(true); - } - - if (callResult == null) - return new CallResult(new ServerError("No response on subscription request received")); - - return new CallResult(callResult.Error!); + return callResult; } /// @@ -297,7 +282,7 @@ namespace CryptoExchange.Net /// Expected result type /// The query /// - protected virtual Task> QueryAsync(Query query) + protected virtual Task> QueryAsync(Query query) { return QueryAsync(BaseAddress, query); } @@ -309,7 +294,7 @@ namespace CryptoExchange.Net /// The url for the request /// The query /// - protected virtual async Task> QueryAsync(string url, Query query) + protected virtual async Task> QueryAsync(string url, Query query) { if (_disposing) return new CallResult(new InvalidOperationError("Client disposed, can't query")); @@ -358,22 +343,10 @@ namespace CryptoExchange.Net /// The connection to send and wait on /// The query /// - protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, Query query) + protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, Query query) { var dataResult = new CallResult(new ServerError("No response on query received")); - await socket.SendAndWaitAsync(query.Request, ClientOptions.RequestTimeout, null, query.Weight, x => - { - var matches = query.MessageMatchesQuery(x); - if (matches) - { - query.HandleResponse(x); - return true; - } - - return false; - }).ConfigureAwait(false); - - return dataResult; + return await socket.SendAndWaitQueryAsync(query).ConfigureAwait(false); } /// @@ -409,27 +382,16 @@ namespace CryptoExchange.Net { _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate"); var authRequest = GetAuthenticationRequest(); - var authResult = new CallResult(new ServerError("No response from server")); - await socket.SendAndWaitAsync(authRequest.Request, ClientOptions.RequestTimeout, null, 1, x => - { - var matches = authRequest.MessageMatchesQuery(x); - if (matches) - { - authResult = authRequest.HandleResponse(x); - return true; - } + var result = await socket.SendAndWaitQueryAsync(authRequest).ConfigureAwait(false); - return false; - }).ConfigureAwait(false); - - if (!authResult) + if (!result) { _logger.Log(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed"); if (socket.Connected) await socket.CloseAsync().ConfigureAwait(false); - authResult.Error!.Message = "Authentication failed: " + authResult.Error.Message; - return new CallResult(authResult.Error); + result.Error!.Message = "Authentication failed: " + result.Error.Message; + return new CallResult(result.Error)!; } _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} authenticated"); @@ -451,13 +413,12 @@ namespace CryptoExchange.Net /// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket) /// The socket connection the handler is on /// - protected virtual MessageListener? AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection) + protected virtual bool AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection) { - var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription); - if (!connection.AddListener(messageListener)) - return null; + if (!connection.AddListener(subscription)) + return false; - return messageListener; + return false; } /// @@ -467,9 +428,8 @@ namespace CryptoExchange.Net protected void AddSystemSubscription(SystemSubscription systemSubscription) { systemSubscriptions.Add(systemSubscription); - var subscription = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false); foreach (var connection in socketConnections.Values) - connection.AddListener(subscription); + connection.AddListener(systemSubscription); } /// @@ -542,10 +502,7 @@ namespace CryptoExchange.Net socketConnection.UnparsedMessage += HandleUnparsedMessage; foreach (var systemSubscription in systemSubscriptions) - { - var handler = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false); - socketConnection.AddListener(handler); - } + socketConnection.AddListener(systemSubscription); return new CallResult(socketConnection); } @@ -665,7 +622,7 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAsync(int subscriptionId) { - MessageListener? subscription = null; + Subscription? subscription = null; SocketConnection? connection = null; foreach (var socket in socketConnections.Values.ToList()) { @@ -747,7 +704,7 @@ namespace CryptoExchange.Net foreach (var connection in socketConnections) { sb.AppendLine($" Connection {connection.Key}: {connection.Value.UserListenerCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); - foreach (var subscription in connection.Value.MessageListeners) + foreach (var subscription in connection.Value.Subscriptions) sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}"); } return sb.ToString(); diff --git a/CryptoExchange.Net/Converters/SocketConverter.cs b/CryptoExchange.Net/Converters/SocketConverter.cs index e9cfc8c..408d972 100644 --- a/CryptoExchange.Net/Converters/SocketConverter.cs +++ b/CryptoExchange.Net/Converters/SocketConverter.cs @@ -1,4 +1,5 @@ using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Sockets; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; @@ -28,10 +29,10 @@ namespace CryptoExchange.Net.Converters /// /// /// - public abstract Type? GetDeserializationType(Dictionary idValues, List listeners); + public abstract Type? GetDeserializationType(Dictionary idValues, List pendingRequests, List listeners); /// - public ParsedMessage? ReadJson(Stream stream, List listeners, bool outputOriginalData) + public ParsedMessage? ReadJson(Stream stream, List pendingRequests, List listeners, bool outputOriginalData) { // Start reading the data // Once we reach the properties that identify the message we save those in a dict @@ -81,7 +82,7 @@ namespace CryptoExchange.Net.Converters } result.Identifier = idString; - var resultType = GetDeserializationType(typeIdDict, listeners); + var resultType = GetDeserializationType(typeIdDict, pendingRequests, listeners); result.Data = resultType == null ? null : token.ToObject(resultType); return result; } diff --git a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs index 80c50ed..4188b76 100644 --- a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs @@ -1,43 +1,47 @@ using CryptoExchange.Net.Converters; using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Sockets; using System; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Objects.Sockets { - internal class PendingRequest + public abstract class BasePendingRequest { public int Id { get; set; } public Func MessageMatchesHandler { get; } public bool Completed { get; private set; } + public abstract Type ResponseType { get; } + public AsyncResetEvent Event { get; } public DateTime RequestTimestamp { get; set; } public TimeSpan Timeout { get; } - public MessageListener? MessageListener { get; } + public object Request { get; set; } private CancellationTokenSource? _cts; public int Priority => 100; - public PendingRequest(int id, Func messageMatchesHandler, TimeSpan timeout, MessageListener? subscription) + protected BasePendingRequest(int id, object request, Func messageMatchesHandler, TimeSpan timeout) { Id = id; MessageMatchesHandler = messageMatchesHandler; Event = new AsyncResetEvent(false, false); Timeout = timeout; + Request = request; RequestTimestamp = DateTime.UtcNow; - MessageListener = subscription; } public void IsSend() { // Start timeout countdown _cts = new CancellationTokenSource(Timeout); - _cts.Token.Register(Fail, false); + _cts.Token.Register(() => Fail("No response"), false); } - public void Fail() + public virtual void Fail(string error) { Completed = true; Event.Set(); @@ -48,11 +52,85 @@ namespace CryptoExchange.Net.Objects.Sockets return MessageMatchesHandler(message); } - public Task ProcessAsync(ParsedMessage message) + public virtual Task ProcessAsync(ParsedMessage message) { Completed = true; Event.Set(); return Task.CompletedTask; } } + + public class PendingRequest : BasePendingRequest + { + public CallResult Result { get; set; } + public Func Handler { get; } + public override Type? ResponseType => null; + + private PendingRequest(int id, object request, Func messageMatchesHandler, Func messageHandler, TimeSpan timeout) + : base(id, request, messageMatchesHandler, timeout) + { + Handler = messageHandler; + } + + public static PendingRequest CreateForQuery(Query query) + { + return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, query.HandleResult, TimeSpan.FromSeconds(5)); + } + + public static PendingRequest CreateForSubRequest(Subscription subscription) + { + return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetSubRequest, subscription.MessageMatchesSubRequest, subscription.HandleSubResponse, TimeSpan.FromSeconds(5)); + } + + public static PendingRequest CreateForUnsubRequest(Subscription subscription) + { + return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetUnsubRequest, subscription.MessageMatchesUnsubRequest, subscription.HandleUnsubResponse, TimeSpan.FromSeconds(5)); + } + + public override void Fail(string error) + { + Result = new CallResult(new ServerError(error)); + base.Fail(error); + } + + public override Task ProcessAsync(ParsedMessage message) + { + Result = Handler(message); + return base.ProcessAsync(message); + } + } + + public class PendingRequest : BasePendingRequest + { + public CallResult Result { get; set; } + public Func> Handler { get; } + public override Type? ResponseType => typeof(T); + + public PendingRequest(int id, object request, Func messageMatchesHandler, Func> messageHandler, TimeSpan timeout) + : base(id, request, messageMatchesHandler, timeout) + { + Handler = messageHandler; + } + + public static PendingRequest CreateForQuery(Query query) + { + return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, x => + { + var response = query.HandleResponse(x); + return response.As((T)response.Data); + }, TimeSpan.FromSeconds(5)); + } + + public override void Fail(string error) + { + Result = new CallResult(new ServerError(error)); + base.Fail(error); + } + + public override Task ProcessAsync(ParsedMessage message) + { + Result = Handler(message); + return base.ProcessAsync(message); + } + } } diff --git a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs index 441aca4..44a1c44 100644 --- a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs @@ -1,95 +1,95 @@ -using CryptoExchange.Net.Converters; -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Sockets; -using System; -using System.Threading; -using System.Threading.Tasks; +//using CryptoExchange.Net.Converters; +//using CryptoExchange.Net.Interfaces; +//using CryptoExchange.Net.Sockets; +//using System; +//using System.Threading; +//using System.Threading.Tasks; -namespace CryptoExchange.Net.Objects.Sockets -{ - /// - /// Socket listener - /// - public class MessageListener - { - /// - /// Unique listener id - /// - public int Id { get; } +//namespace CryptoExchange.Net.Objects.Sockets +//{ +// /// +// /// Socket listener +// /// +// public class MessageListener +// { +// /// +// /// Unique listener id +// /// +// public int Id { get; } - /// - /// Exception event - /// - public event Action? Exception; +// /// +// /// Exception event +// /// +// public event Action? Exception; - /// - /// The request object send when subscribing on the server. Either this or the `Identifier` property should be set - /// - public Subscription Subscription { get; set; } +// /// +// /// The request object send when subscribing on the server. Either this or the `Identifier` property should be set +// /// +// public Subscription Subscription { get; set; } - /// - /// Whether this is a user subscription or an internal listener - /// - public bool UserListener { get; set; } +// /// +// /// Whether this is a user subscription or an internal listener +// /// +// public bool UserListener { get; set; } - /// - /// If the subscription has been confirmed to be subscribed by the server - /// - public bool Confirmed { get; set; } +// /// +// /// If the subscription has been confirmed to be subscribed by the server +// /// +// public bool Confirmed { get; set; } - /// - /// Whether authentication is needed for this subscription - /// - public bool Authenticated => Subscription.Authenticated; +// /// +// /// Whether authentication is needed for this subscription +// /// +// public bool Authenticated => Subscription.Authenticated; - /// - /// Whether we're closing this subscription and a socket connection shouldn't be kept open for it - /// - public bool Closed { get; set; } +// /// +// /// Whether we're closing this subscription and a socket connection shouldn't be kept open for it +// /// +// public bool Closed { get; set; } - /// - /// Cancellation token registration, should be disposed when subscription is closed. Used for closing the subscription with - /// a provided cancelation token - /// - public CancellationTokenRegistration? CancellationTokenRegistration { get; set; } +// /// +// /// Cancellation token registration, should be disposed when subscription is closed. Used for closing the subscription with +// /// a provided cancelation token +// /// +// public CancellationTokenRegistration? CancellationTokenRegistration { get; set; } - /// - /// ctor - /// - /// - /// - /// - public MessageListener(int id, Subscription request, bool userSubscription) - { - Id = id; - UserListener = userSubscription; - Subscription = request; - } +// /// +// /// ctor +// /// +// /// +// /// +// /// +// public MessageListener(int id, Subscription request, bool userSubscription) +// { +// Id = id; +// UserListener = userSubscription; +// Subscription = request; +// } - /// - /// Invoke the exception event - /// - /// - public void InvokeExceptionHandler(Exception e) - { - Exception?.Invoke(e); - } +// /// +// /// Invoke the exception event +// /// +// /// +// public void InvokeExceptionHandler(Exception e) +// { +// Exception?.Invoke(e); +// } - /// - /// The priority of this subscription - /// - public int Priority => Subscription is SystemSubscription ? 50 : 1; +// /// +// /// The priority of this subscription +// /// +// public int Priority => Subscription is SystemSubscription ? 50 : 1; - /// - /// Process the message - /// - /// - /// - public Task ProcessAsync(ParsedMessage message) - { - // TODO - var dataEvent = new DataEvent(message, null, message.OriginalData, DateTime.UtcNow, null); - return Subscription.HandleEventAsync(dataEvent); - } - } -} +// /// +// /// Process the message +// /// +// /// +// /// +// public Task ProcessAsync(ParsedMessage message) +// { +// // TODO +// var dataEvent = new DataEvent(message, null, message.OriginalData, DateTime.UtcNow, null); +// return Subscription.HandleEventAsync(dataEvent); +// } +// } +//} diff --git a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs index e4fdede..31673a6 100644 --- a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs @@ -11,7 +11,7 @@ namespace CryptoExchange.Net.Objects.Sockets public class UpdateSubscription { private readonly SocketConnection _connection; - private readonly MessageListener _listener; + private readonly Subscription _listener; /// /// Event when the connection is lost. The socket will automatically reconnect when possible. @@ -84,7 +84,7 @@ namespace CryptoExchange.Net.Objects.Sockets /// /// The socket connection the subscription is on /// The subscription - public UpdateSubscription(SocketConnection connection, MessageListener subscription) + public UpdateSubscription(SocketConnection connection, Subscription subscription) { _connection = connection; _listener = subscription; @@ -121,7 +121,7 @@ namespace CryptoExchange.Net.Objects.Sockets /// Resubscribe this subscription /// /// - internal async Task> ResubscribeAsync() + internal async Task ResubscribeAsync() { return await _connection.ResubscribeAsync(_listener).ConfigureAwait(false); } diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index c65f9e6..655bddb 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -1,13 +1,14 @@ using CryptoExchange.Net.Converters; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; +using System; namespace CryptoExchange.Net.Sockets { /// /// Query /// - public abstract class Query + public abstract class BaseQuery { /// /// The query request @@ -30,12 +31,6 @@ namespace CryptoExchange.Net.Sockets /// /// public abstract bool MessageMatchesQuery(ParsedMessage message); - /// - /// Handle the query response - /// - /// - /// - public abstract CallResult HandleResponse(ParsedMessage message); /// /// ctor @@ -43,11 +38,39 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public Query(object request, bool authenticated, int weight = 1) + public BaseQuery(object request, bool authenticated, int weight = 1) { Authenticated = authenticated; Request = request; Weight = weight; } } + + public abstract class Query : BaseQuery + { + protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight) + { + } + + /// + /// Handle the query response + /// + /// + /// + public abstract CallResult HandleResult(ParsedMessage message); + } + + public abstract class Query : BaseQuery + { + protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight) + { + } + + /// + /// Handle the query response + /// + /// + /// + public abstract CallResult HandleResponse(ParsedMessage message); + } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 814967c..2a4fd04 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -62,18 +62,18 @@ namespace CryptoExchange.Net.Sockets public int UserListenerCount { get { lock (_listenerLock) - return _messageIdentifierListeners.Values.Count(h => h.UserListener); } + return _messageIdentifierListeners.Values.Count(h => h.UserSubscription); } } /// /// Get a copy of the current message listeners /// - public MessageListener[] MessageListeners + public Subscription[] Subscriptions { get { lock (_listenerLock) - return _messageIdentifierListeners.Values.Where(h => h.UserListener).ToArray(); + return _messageIdentifierListeners.Values.Where(h => h.UserSubscription).ToArray(); } } @@ -158,9 +158,9 @@ namespace CryptoExchange.Net.Sockets } private bool _pausedActivity; - private readonly List _pendingRequests; - private readonly List _messageListeners; - private readonly Dictionary _messageIdentifierListeners; + private readonly List _pendingRequests; + private readonly List _messageListeners; + private readonly Dictionary _messageIdentifierListeners; private readonly object _listenerLock = new(); private readonly ILogger _logger; @@ -186,9 +186,9 @@ namespace CryptoExchange.Net.Sockets Tag = tag; Properties = new Dictionary(); - _pendingRequests = new List(); - _messageListeners = new List(); - _messageIdentifierListeners = new Dictionary(); + _pendingRequests = new List(); + _messageListeners = new List(); + _messageIdentifierListeners = new Dictionary(); _socket = socket; _socket.OnStreamMessage += HandleStreamMessage; @@ -261,7 +261,7 @@ namespace CryptoExchange.Net.Sockets { foreach (var pendingRequest in _pendingRequests.ToList()) { - pendingRequest.Fail(); + pendingRequest.Fail("Connection interupted"); // Remove? } } @@ -301,7 +301,7 @@ namespace CryptoExchange.Net.Sockets /// Id of the request sent protected virtual void HandleRequestSent(int requestId) { - PendingRequest pendingRequest; + BasePendingRequest pendingRequest; lock (_pendingRequests) pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId); @@ -324,11 +324,11 @@ namespace CryptoExchange.Net.Sockets var timestamp = DateTime.UtcNow; TimeSpan userCodeDuration = TimeSpan.Zero; - List listeners; + List listeners; lock (_listenerLock) - listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList(); + listeners = _messageListeners.OrderByDescending(x => !x.UserSubscription).ToList(); - var result = ApiClient.StreamConverter.ReadJson(stream, listeners, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); // TODO + var result = ApiClient.StreamConverter.ReadJson(stream, _pendingRequests, listeners, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); if(result == null) { stream.Position = 0; @@ -352,12 +352,13 @@ namespace CryptoExchange.Net.Sockets { // Matched based on identifier var userSw = Stopwatch.StartNew(); - await idListener.ProcessAsync(result).ConfigureAwait(false); + var dataEvent = new DataEvent(result, null, result.OriginalData, DateTime.UtcNow, null); + await idListener.HandleEventAsync(dataEvent).ConfigureAwait(false); userSw.Stop(); return; } - List pendingRequests; + List pendingRequests; lock (_pendingRequests) pendingRequests = _pendingRequests.ToList(); @@ -371,11 +372,12 @@ namespace CryptoExchange.Net.Sockets if (pendingRequest.Completed) { // Answer to a timed out request, unsub if it is a subscription request - if (pendingRequest.MessageListener != null) - { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); - _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false); - } + // TODO + //if (pendingRequest.MessageListener != null) + //{ + // _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); + // _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false); + //} } else { @@ -443,25 +445,25 @@ namespace CryptoExchange.Net.Sockets /// /// Listener to close /// - public async Task CloseAsync(MessageListener listener) + public async Task CloseAsync(Subscription subscription) { lock (_listenerLock) { - if (!_messageListeners.Contains(listener)) + if (!_messageListeners.Contains(subscription)) return; - listener.Closed = true; + subscription.Closed = true; } if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) return; - _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing listener {listener.Id}"); - if (listener.CancellationTokenRegistration.HasValue) - listener.CancellationTokenRegistration.Value.Dispose(); + _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing listener {subscription.Id}"); + if (subscription.CancellationTokenRegistration.HasValue) + subscription.CancellationTokenRegistration.Value.Dispose(); - if (listener.Confirmed && _socket.IsOpen) - await UnsubscribeAsync(listener).ConfigureAwait(false); + if (subscription.Confirmed && _socket.IsOpen) + await UnsubscribeAsync(subscription).ConfigureAwait(false); bool shouldCloseConnection; lock (_listenerLock) @@ -472,7 +474,7 @@ namespace CryptoExchange.Net.Sockets return; } - shouldCloseConnection = _messageIdentifierListeners.All(r => !r.Value.UserListener || r.Value.Closed); + shouldCloseConnection = _messageIdentifierListeners.All(r => !r.Value.UserSubscription || r.Value.Closed); if (shouldCloseConnection) Status = SocketStatus.Closing; } @@ -485,8 +487,8 @@ namespace CryptoExchange.Net.Sockets lock (_listenerLock) { - _messageListeners.Remove(listener); - foreach (var id in listener.Subscription.Identifiers) + _messageListeners.Remove(subscription); + foreach (var id in subscription.Identifiers) _messageIdentifierListeners.Remove(id); } } @@ -504,22 +506,22 @@ namespace CryptoExchange.Net.Sockets /// Add a listener to this connection /// /// - public bool AddListener(MessageListener listener) + public bool AddListener(Subscription subscription) { lock (_listenerLock) { if (Status != SocketStatus.None && Status != SocketStatus.Connected) return false; - _messageListeners.Add(listener); - if (listener.Subscription.Identifiers != null) + _messageListeners.Add(subscription); + if (subscription.Identifiers != null) { - foreach (var id in listener.Subscription.Identifiers) - _messageIdentifierListeners.Add(id.ToLowerInvariant(), listener); + foreach (var id in subscription.Identifiers) + _messageIdentifierListeners.Add(id.ToLowerInvariant(), subscription); } - if (listener.UserListener) - _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {listener.Id}, total listeners on connection: {_messageListeners.Count(s => s.UserListener)}"); + if (subscription.UserSubscription) + _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {subscription.Id}, total listeners on connection: {_messageListeners.Count(s => s.UserSubscription)}"); return true; } } @@ -528,7 +530,7 @@ namespace CryptoExchange.Net.Sockets /// Get a listener on this connection by id /// /// - public MessageListener? GetListener(int id) + public Subscription? GetListener(int id) { lock (_listenerLock) return _messageListeners.SingleOrDefault(s => s.Id == id); @@ -539,42 +541,59 @@ namespace CryptoExchange.Net.Sockets /// /// Filter for a request /// - public MessageListener? GetListenerByRequest(Func predicate) + public Subscription? GetListenerByRequest(Func predicate) { lock(_listenerLock) - return _messageListeners.SingleOrDefault(s => predicate(s.Subscription)); + return _messageListeners.SingleOrDefault(s => predicate(s)); } - /// - /// Send data and wait for an answer - /// - /// The data type expected in response - /// The object to send - /// The timeout for response - /// Listener if this is a subscribe request - /// The response handler - /// The weight of the message - /// - public virtual async Task SendAndWaitAsync(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func handler) + public virtual async Task> SendAndWaitQueryAsync(Query query) + { + var pendingRequest = PendingRequest.CreateForQuery(query); + await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false); + return pendingRequest.Result; + } + + public virtual async Task SendAndWaitQueryAsync(Query query) + { + var pendingRequest = PendingRequest.CreateForQuery(query); + await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false); + return pendingRequest.Result; + } + + public virtual async Task SendAndWaitSubAsync(Subscription subscription) + { + var pendingRequest = PendingRequest.CreateForSubRequest(subscription); + await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false); + return pendingRequest.Result; + } + + public virtual async Task SendAndWaitUnsubAsync(Subscription subscription) + { + var pendingRequest = PendingRequest.CreateForUnsubRequest(subscription); + await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false); + return pendingRequest.Result; + } + + private async Task SendAndWaitAsync(BasePendingRequest pending, int weight) { - var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener); lock (_messageListeners) { _pendingRequests.Add(pending); } - var sendOk = Send(pending.Id, obj, weight); + var sendOk = Send(pending.Id, pending.Request, weight); if (!sendOk) { - pending.Fail(); + pending.Fail("Failed to send"); return; } while (true) { - if(!_socket.IsOpen) + if (!_socket.IsOpen) { - pending.Fail(); + pending.Fail("Socket not open"); return; } @@ -588,6 +607,52 @@ namespace CryptoExchange.Net.Sockets } } + ///// + ///// Send data and wait for an answer + ///// + ///// The data type expected in response + ///// The object to send + ///// The timeout for response + ///// Listener if this is a subscribe request + ///// The response handler + ///// The weight of the message + ///// + //public virtual async Task SendAndWaitAsync(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func handler) + //{ + // // TODO either Query or Subscription should be passed here instead of T obj + // // That would allow to track the Query/Subscription on the PendingRequest instead of the listener, which allow us to match the pending request in the Converter + + // var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener); + // lock (_messageListeners) + // { + // _pendingRequests.Add(pending); + // } + + // var sendOk = Send(pending.Id, obj, weight); + // if (!sendOk) + // { + // pending.Fail(); + // return; + // } + + // while (true) + // { + // if(!_socket.IsOpen) + // { + // pending.Fail(); + // return; + // } + + // if (pending.Completed) + // return; + + // await pending.Event.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); + + // if (pending.Completed) + // return; + // } + //} + /// /// Send data over the websocket connection /// @@ -624,14 +689,14 @@ namespace CryptoExchange.Net.Sockets } } - private async Task> ProcessReconnectAsync() + private async Task ProcessReconnectAsync() { if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); bool anySubscriptions = false; lock (_listenerLock) - anySubscriptions = _messageListeners.Any(s => s.UserListener); + anySubscriptions = _messageListeners.Any(s => s.UserSubscription); if (!anySubscriptions) { @@ -660,21 +725,22 @@ namespace CryptoExchange.Net.Sockets } // Get a list of all subscriptions on the socket - List listenerList = new List(); + List listenerList = new List(); lock (_listenerLock) { + // ? foreach (var listener in _messageListeners) { - if (listener.Subscription != null) + if (listener != null) listenerList.Add(listener); else listener.Confirmed = true; } } - foreach(var listener in listenerList.Where(s => s.Subscription != null)) + foreach(var listener in listenerList) { - var result = await ApiClient.RevitalizeRequestAsync(listener.Subscription!).ConfigureAwait(false); + var result = await ApiClient.RevitalizeRequestAsync(listener).ConfigureAwait(false); if (!result) { _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error); @@ -688,9 +754,9 @@ namespace CryptoExchange.Net.Sockets if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); - var taskList = new List>>(); + var taskList = new List>(); foreach (var listener in listenerList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) - taskList.Add(ApiClient.SubscribeAndWaitAsync(this, listener.Subscription!, listener)); + taskList.Add(ApiClient.SubscribeAndWaitAsync(this, listener)); await Task.WhenAll(taskList).ConfigureAwait(false); if (taskList.Any(t => !t.Result.Success)) @@ -707,28 +773,24 @@ namespace CryptoExchange.Net.Sockets return new CallResult(true); } - internal async Task UnsubscribeAsync(MessageListener listener) + internal async Task UnsubscribeAsync(Subscription subscription) { - var unsubscribeRequest = listener.Subscription?.GetUnsubRequest(); + var unsubscribeRequest = subscription?.GetUnsubRequest(); if (unsubscribeRequest != null) { - await SendAndWaitAsync(unsubscribeRequest, TimeSpan.FromSeconds(10), listener, 0, x => - { - var (matches, result) = listener.Subscription!.MessageMatchesUnsubRequest(x); - // TODO check result? - return matches; - }).ConfigureAwait(false); + var pendingRequest = PendingRequest.CreateForUnsubRequest(subscription!); + await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false); - _logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {listener.Id} unsubscribed"); + _logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription.Id} unsubscribed"); } } - internal async Task> ResubscribeAsync(MessageListener listener) + internal async Task ResubscribeAsync(Subscription subscription) { if (!_socket.IsOpen) - return new CallResult(new UnknownError("Socket is not connected")); + return new CallResult(new UnknownError("Socket is not connected")); - return await ApiClient.SubscribeAndWaitAsync(this, listener.Subscription!, listener).ConfigureAwait(false); + return await ApiClient.SubscribeAndWaitAsync(this, subscription).ConfigureAwait(false); } /// diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index ed98d79..1b8f703 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -1,7 +1,9 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; +using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets @@ -11,6 +13,12 @@ namespace CryptoExchange.Net.Sockets /// public abstract class Subscription { + public int Id { get; set; } + + public bool UserSubscription { get; set; } + public bool Confirmed { get; set; } + public bool Closed { get; set; } + /// /// Logger /// @@ -26,6 +34,13 @@ namespace CryptoExchange.Net.Sockets /// public abstract List Identifiers { get; } + public CancellationTokenRegistration? CancellationTokenRegistration { get; set; } + + /// + /// Exception event + /// + public event Action? Exception; + /// /// ctor /// @@ -47,7 +62,8 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract (bool, CallResult?) MessageMatchesSubRequest(ParsedMessage message); + public abstract bool MessageMatchesSubRequest(ParsedMessage message); + public abstract CallResult HandleSubResponse(ParsedMessage message); /// /// Get the unsubscribe object to send when unsubscribing @@ -59,7 +75,8 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract (bool, CallResult?) MessageMatchesUnsubRequest(ParsedMessage message); + public abstract bool MessageMatchesUnsubRequest(ParsedMessage message); + public abstract CallResult HandleUnsubResponse(ParsedMessage message); /// /// Handle the update message @@ -67,5 +84,14 @@ namespace CryptoExchange.Net.Sockets /// /// public abstract Task HandleEventAsync(DataEvent message); + + /// + /// Invoke the exception event + /// + /// + public void InvokeExceptionHandler(Exception e) + { + Exception?.Invoke(e); + } } } diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index 4781832..3e162c9 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -24,11 +24,15 @@ namespace CryptoExchange.Net.Sockets /// public override object? GetSubRequest() => null; /// - public override (bool, CallResult?) MessageMatchesSubRequest(ParsedMessage message) => throw new NotImplementedException(); + public override bool MessageMatchesSubRequest(ParsedMessage message) => throw new NotImplementedException(); + /// + public override CallResult HandleSubResponse(ParsedMessage message) => throw new NotImplementedException(); /// public override object? GetUnsubRequest() => null; /// - public override (bool, CallResult?) MessageMatchesUnsubRequest(ParsedMessage message) => throw new NotImplementedException(); + public override bool MessageMatchesUnsubRequest(ParsedMessage message) => throw new NotImplementedException(); + /// + public override CallResult HandleUnsubResponse(ParsedMessage message) => throw new NotImplementedException(); } }