diff --git a/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs b/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs index 825e435..ad6da83 100644 --- a/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs +++ b/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs @@ -5,8 +5,8 @@ using System.Threading.Tasks; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; +using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.OrderBook; -using CryptoExchange.Net.Sockets; using NUnit.Framework; namespace CryptoExchange.Net.UnitTests diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs index 54b551c..796213e 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs @@ -5,6 +5,7 @@ using CryptoExchange.Net.Authentication; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; +using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; using Microsoft.Extensions.Logging; using Moq; diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs index a9c82bf..f74aa35 100644 --- a/CryptoExchange.Net/Clients/BaseSocketClient.cs +++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; namespace CryptoExchange.Net diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index db94163..699a3dc 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; +using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; @@ -9,6 +10,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; +using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -53,7 +55,7 @@ namespace CryptoExchange.Net /// /// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example. /// - protected Dictionary> genericHandlers = new(); + protected List genericHandlers = new(); /// /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry. @@ -144,6 +146,7 @@ namespace CryptoExchange.Net /// Handler for string data protected void SetDataInterpreter(Func? byteHandler, Func? stringHandler) { + // TODO dataInterpreterBytes = byteHandler; dataInterpreterString = stringHandler; } @@ -152,15 +155,12 @@ namespace CryptoExchange.Net /// Connect to an url and listen for data on the BaseAddress /// /// The type of the expected data - /// The optional request object to send, will be serialized to json - /// The identifier to use, necessary if no request object is sent - /// If the subscription is to an authenticated endpoint - /// The handler of update data + /// The subscription /// Cancellation token for closing this subscription /// - protected virtual Task> SubscribeAsync(object? request, string? identifier, bool authenticated, Action> dataHandler, CancellationToken ct) + protected virtual Task> SubscribeAsync(SubscriptionActor subscriptionObject, CancellationToken ct) { - return SubscribeAsync(BaseAddress, request, identifier, authenticated, dataHandler, ct); + return SubscribeAsync(BaseAddress, subscriptionObject, ct); } /// @@ -168,19 +168,16 @@ namespace CryptoExchange.Net /// /// The type of the expected data /// The URL to connect to - /// The optional request object to send, will be serialized to json - /// The identifier to use, necessary if no request object is sent - /// If the subscription is to an authenticated endpoint - /// The handler of update data + /// The subscription /// Cancellation token for closing this subscription /// - protected virtual async Task> SubscribeAsync(string url, object? request, string? identifier, bool authenticated, Action> dataHandler, CancellationToken ct) + protected virtual async Task> SubscribeAsync(string url, SubscriptionActor subscriptionObject, CancellationToken ct) { if (_disposing) return new CallResult(new InvalidOperationError("Client disposed, can't subscribe")); SocketConnection socketConnection; - SocketSubscription? subscription; + SocketSubscriptionListener? subscription; var released = false; // Wait for a semaphore here, so we only connect 1 socket at a time. // This is necessary for being able to see if connections can be combined @@ -198,14 +195,14 @@ namespace CryptoExchange.Net while (true) { // Get a new or existing socket connection - var socketResult = await GetSocketConnection(url, authenticated).ConfigureAwait(false); + var socketResult = await GetSocketConnection(url, subscriptionObject.Authenticated).ConfigureAwait(false); if (!socketResult) return socketResult.As(null); socketConnection = socketResult.Data; // Add a subscription on the socket connection - subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated); + subscription = AddSubscription(subscriptionObject, true, socketConnection); if (subscription == null) { _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); @@ -221,7 +218,7 @@ namespace CryptoExchange.Net var needsConnecting = !socketConnection.Connected; - var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); + var connectResult = await ConnectIfNeededAsync(socketConnection, subscriptionObject.Authenticated).ConfigureAwait(false); if (!connectResult) return new CallResult(connectResult.Error!); @@ -240,6 +237,7 @@ namespace CryptoExchange.Net return new CallResult(new ServerError("Socket is paused")); } + var request = subscriptionObject.GetSubscribeRequest(); if (request != null) { // Send the request and wait for answer @@ -277,10 +275,16 @@ namespace CryptoExchange.Net /// The request to send, will be serialized to json /// The subscription the request is for /// - protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription) + protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscriptionListener subscription) { - CallResult? callResult = null; - await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, 1, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false); + CallResult? callResult = null; + await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, 1, x => + { + var (matches, result) = subscription.Subscription!.MessageMatchesSubscribeRequest(x); + if (matches) + callResult = result; + return matches; + }).ConfigureAwait(false); if (callResult?.Success == true) { @@ -298,13 +302,12 @@ namespace CryptoExchange.Net /// Send a query on a socket connection to the BaseAddress and wait for the response /// /// Expected result type - /// The request to send, will be serialized to json - /// If the query is to an authenticated endpoint + /// The query /// Weight of the request /// - protected virtual Task> QueryAsync(object request, bool authenticated, int weight = 1) + protected virtual Task> QueryAsync(QueryActor query, int weight = 1) { - return QueryAsync(BaseAddress, request, authenticated, weight); + return QueryAsync(BaseAddress, query, weight); } /// @@ -313,10 +316,9 @@ namespace CryptoExchange.Net /// The expected result type /// The url for the request /// The request to send - /// Whether the socket should be authenticated /// Weight of the request /// - protected virtual async Task> QueryAsync(string url, object request, bool authenticated, int weight = 1) + protected virtual async Task> QueryAsync(string url, QueryActor request, int weight = 1) { if (_disposing) return new CallResult(new InvalidOperationError("Client disposed, can't query")); @@ -326,7 +328,7 @@ namespace CryptoExchange.Net await semaphoreSlim.WaitAsync().ConfigureAwait(false); try { - var socketResult = await GetSocketConnection(url, authenticated).ConfigureAwait(false); + var socketResult = await GetSocketConnection(url, request.Authenticated).ConfigureAwait(false); if (!socketResult) return socketResult.As(default); @@ -339,7 +341,7 @@ namespace CryptoExchange.Net released = true; } - var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); + var connectResult = await ConnectIfNeededAsync(socketConnection, request.Authenticated).ConfigureAwait(false); if (!connectResult) return new CallResult(connectResult.Error!); } @@ -366,16 +368,19 @@ namespace CryptoExchange.Net /// The request to send /// The weight of the query /// - protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, object request, int weight) + protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, QueryActor request, int weight) { var dataResult = new CallResult(new ServerError("No response on query received")); - await socket.SendAndWaitAsync(request, ClientOptions.RequestTimeout, null, weight, data => + await socket.SendAndWaitAsync(request.Query, ClientOptions.RequestTimeout, null, weight, x => { - if (!HandleQueryResponse(socket, request, data, out var callResult)) - return false; + var matches = request.MessageMatchesQuery(x); + if (matches) + { + request.HandleResponse(x); + return true; + } - dataResult = callResult; - return true; + return false; }).ConfigureAwait(false); return dataResult; @@ -402,16 +407,39 @@ namespace CryptoExchange.Net if (!authenticated || socket.Authenticated) return new CallResult(true); + return await AuthenticateSocketAsync(socket).ConfigureAwait(false); + } + + /// + /// Authenticate a socket connection + /// + /// Socket to authenticate + /// + public virtual async Task> AuthenticateSocketAsync(SocketConnection socket) + { _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate"); - var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false); - if (!result) + var authRequest = GetAuthenticationRequest(); + var authResult = new CallResult(new ServerError("No response from server")); + await socket.SendAndWaitAsync(authRequest.Query, ClientOptions.RequestTimeout, null, 1, x => + { + var matches = authRequest.MessageMatchesQuery(x); + if (matches) + { + authResult = authRequest.HandleResponse(x); + return true; + } + + return false; + }).ConfigureAwait(false); + + if (!authResult) { _logger.Log(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed"); if (socket.Connected) await socket.CloseAsync().ConfigureAwait(false); - result.Error!.Message = "Authentication failed: " + result.Error.Message; - return new CallResult(result.Error); + authResult.Error!.Message = "Authentication failed: " + authResult.Error.Message; + return new CallResult(authResult.Error); } _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} authenticated"); @@ -420,129 +448,46 @@ namespace CryptoExchange.Net } /// - /// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the query that was send (the request parameter). - /// For example; A query is sent in a request message with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an - /// anwser to any query that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages, - /// if not some other method has be implemented to match the messages). - /// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true. + /// Should return the request which can be used to authenticate a socket connection /// - /// The type of response that is expected on the query - /// The socket connection - /// The request that a response is awaited for - /// The message received from the server - /// The interpretation (null if message wasn't a response to the request) - /// True if the message was a response to the query - protected internal abstract bool HandleQueryResponse(SocketConnection socketConnection, object request, JToken data, [NotNullWhen(true)] out CallResult? callResult); - - /// - /// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the subscription request that was send (the request parameter). - /// For example; A subscribe request message is send with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an - /// anwser to any subscription request that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages, - /// if not some other method has be implemented to match the messages). - /// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true. - /// - /// The socket connection - /// A subscription that waiting for a subscription response - /// The request that the subscription sent - /// The message received from the server - /// The interpretation (null if message wasn't a response to the request) - /// True if the message was a response to the subscription request - protected internal abstract bool HandleSubscriptionResponse(SocketConnection socketConnection, SocketSubscription subscription, object request, JToken data, out CallResult? callResult); - - /// - /// Needs to check if a received message matches a handler by request. After subscribing data message will come in. These data messages need to be matched to a specific connection - /// to pass the correct data to the correct handler. The implementation of this method should check if the message received matches the subscribe request that was sent. - /// - /// The socket connection the message was recieved on - /// The received data - /// The subscription request - /// True if the message is for the subscription which sent the request - protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, object request); - - /// - /// Needs to check if a received message matches a handler by identifier. Generally used by GenericHandlers. For example; a generic handler is registered which handles ping messages - /// from the server. This method should check if the message received is a ping message and the identifer is the identifier of the GenericHandler - /// - /// The socket connection the message was recieved on - /// The received data - /// The string identifier of the handler - /// True if the message is for the handler which has the identifier - protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, string identifier); - - /// - /// Needs to authenticate the socket so authenticated queries/subscriptions can be made on this socket connection - /// - /// The socket connection that should be authenticated /// - protected internal abstract Task> AuthenticateSocketAsync(SocketConnection socketConnection); - - /// - /// Needs to unsubscribe a subscription, typically by sending an unsubscribe request. If multiple subscriptions per socket is not allowed this can just return since the socket will be closed anyway - /// - /// The connection on which to unsubscribe - /// The subscription to unsubscribe - /// - protected internal abstract Task UnsubscribeAsync(SocketConnection connection, SocketSubscription subscriptionToUnsub); + protected internal abstract QueryActor GetAuthenticationRequest(); /// /// Optional handler to interpolate data before sending it to the handlers /// /// /// - protected internal virtual JToken ProcessTokenData(JToken message) - { - return message; - } + //protected internal virtual JToken ProcessTokenData(JToken message) + //{ + // return message; + //} /// /// Add a subscription to a connection /// /// The type of data the subscription expects - /// The request of the subscription - /// The identifier of the subscription (can be null if request param is used) + /// The subscription /// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket) /// The socket connection the handler is on - /// The handler of the data received - /// Whether the subscription needs authentication /// - protected virtual SocketSubscription? AddSubscription(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action> dataHandler, bool authenticated) + protected virtual SocketSubscriptionListener? AddSubscription(SubscriptionActor subscriptionObject, bool userSubscription, SocketConnection connection) { - void InternalHandler(MessageEvent messageEvent) - { - if (typeof(T) == typeof(string)) - { - var stringData = (T)Convert.ChangeType(messageEvent.JsonData.ToString(), typeof(T)); - dataHandler(new DataEvent(stringData, null, OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp, null)); - return; - } - - var desResult = Deserialize(messageEvent.JsonData); - if (!desResult) - { - _logger.Log(LogLevel.Warning, $"Socket {connection.SocketId} Failed to deserialize data into type {typeof(T)}: {desResult.Error}"); - return; - } - - dataHandler(new DataEvent(desResult.Data, null, OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp, null)); - } - - var subscription = request == null - ? SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), identifier!, userSubscription, authenticated, InternalHandler) - : SocketSubscription.CreateForRequest(ExchangeHelpers.NextId(), request, userSubscription, authenticated, InternalHandler); + var subscription = new SocketSubscriptionListener(ExchangeHelpers.NextId(), subscriptionObject, userSubscription); if (!connection.AddSubscription(subscription)) return null; + return subscription; } /// - /// Adds a generic message handler. Used for example to reply to ping requests + /// Adds a system subscription. Used for example to reply to ping requests /// - /// The name of the request handler. Needs to be unique - /// The action to execute when receiving a message for this handler (checked by ) - protected void AddGenericHandler(string identifier, Action action) + /// The subscription + protected void AddSystemSubscription(SystemSubscription systemSubscription) { - genericHandlers.Add(identifier, action); - var subscription = SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), identifier, false, false, action); + genericHandlers.Add(systemSubscription); + var subscription = new SocketSubscriptionListener(ExchangeHelpers.NextId(), systemSubscription, false); foreach (var connection in socketConnections.Values) connection.AddSubscription(subscription); } @@ -614,9 +559,10 @@ namespace CryptoExchange.Net var socket = CreateSocket(connectionAddress.Data!); var socketConnection = new SocketConnection(_logger, this, socket, address); socketConnection.UnhandledMessage += HandleUnhandledMessage; - foreach (var kvp in genericHandlers) + + foreach (var systemHandler in genericHandlers) { - var handler = SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), kvp.Key, false, false, kvp.Value); + var handler = new SocketSubscriptionListener(ExchangeHelpers.NextId(), systemHandler, false); socketConnection.AddSubscription(handler); } @@ -626,8 +572,8 @@ namespace CryptoExchange.Net /// /// Process an unhandled message /// - /// The token that wasn't processed - protected virtual void HandleUnhandledMessage(JToken token) + /// The message that wasn't processed + protected virtual void HandleUnhandledMessage(StreamMessage message) { } @@ -731,7 +677,7 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAsync(int subscriptionId) { - SocketSubscription? subscription = null; + SocketSubscriptionListener? subscription = null; SocketConnection? connection = null; foreach (var socket in socketConnections.Values.ToList()) { diff --git a/CryptoExchange.Net/Interfaces/ISocketApiClient.cs b/CryptoExchange.Net/Interfaces/ISocketApiClient.cs index 7d1c3c6..8623514 100644 --- a/CryptoExchange.Net/Interfaces/ISocketApiClient.cs +++ b/CryptoExchange.Net/Interfaces/ISocketApiClient.cs @@ -1,6 +1,6 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Objects.Sockets; using System; using System.Threading.Tasks; @@ -26,7 +26,15 @@ namespace CryptoExchange.Net.Interfaces /// /// The factory for creating sockets. Used for unit testing /// - IWebsocketFactory SocketFactory { get; set; } + IWebsocketFactory SocketFactory { get; } + /// + /// Current client options + /// + SocketExchangeOptions ClientOptions { get; } + /// + /// Current API options + /// + SocketApiOptions ApiOptions { get; } /// /// Log the current state of connections and subscriptions /// diff --git a/CryptoExchange.Net/Interfaces/ISocketClient.cs b/CryptoExchange.Net/Interfaces/ISocketClient.cs index e2e7f97..108f7ed 100644 --- a/CryptoExchange.Net/Interfaces/ISocketClient.cs +++ b/CryptoExchange.Net/Interfaces/ISocketClient.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using CryptoExchange.Net.Authentication; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Objects.Sockets; namespace CryptoExchange.Net.Interfaces { diff --git a/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs b/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs new file mode 100644 index 0000000..ad77bd7 --- /dev/null +++ b/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs @@ -0,0 +1,12 @@ +using CryptoExchange.Net.Objects.Sockets; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Interfaces +{ + internal interface IStreamMessageListener + { + int Priority { get; } + bool MessageMatches(StreamMessage message); + Task ProcessAsync(StreamMessage message); + } +} diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 1c6bfc6..2d7145a 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Sockets; using System; +using System.IO; using System.Security.Authentication; using System.Text; using System.Threading.Tasks; @@ -19,7 +20,7 @@ namespace CryptoExchange.Net.Interfaces /// /// Websocket message received event /// - event Action OnMessage; + event Func OnStreamMessage; /// /// Websocket sent event, RequestId as parameter /// diff --git a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs index a8304da..3fe70c1 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs @@ -1,4 +1,4 @@ -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; namespace CryptoExchange.Net.Interfaces diff --git a/CryptoExchange.Net/Sockets/DataEvent.cs b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs similarity index 98% rename from CryptoExchange.Net/Sockets/DataEvent.cs rename to CryptoExchange.Net/Objects/Sockets/DataEvent.cs index d6e3151..b1b4058 100644 --- a/CryptoExchange.Net/Sockets/DataEvent.cs +++ b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs @@ -1,7 +1,7 @@ using CryptoExchange.Net.Objects; using System; -namespace CryptoExchange.Net.Sockets +namespace CryptoExchange.Net.Objects.Sockets { /// /// An update received from a socket update subscription diff --git a/CryptoExchange.Net/Sockets/PendingRequest.cs b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs similarity index 54% rename from CryptoExchange.Net/Sockets/PendingRequest.cs rename to CryptoExchange.Net/Objects/Sockets/PendingRequest.cs index 6043d22..9e26257 100644 --- a/CryptoExchange.Net/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs @@ -1,27 +1,29 @@ -using CryptoExchange.Net.Objects; -using Newtonsoft.Json.Linq; +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects; using System; using System.Threading; +using System.Threading.Tasks; -namespace CryptoExchange.Net.Sockets +namespace CryptoExchange.Net.Objects.Sockets { - internal class PendingRequest + internal class PendingRequest : IStreamMessageListener { public int Id { get; set; } - public Func Handler { get; } - public JToken? Result { get; private set; } + public Func MessageMatchesHandler { get; } public bool Completed { get; private set; } public AsyncResetEvent Event { get; } public DateTime RequestTimestamp { get; set; } public TimeSpan Timeout { get; } - public SocketSubscription? Subscription { get; } + public SocketSubscriptionListener? Subscription { get; } private CancellationTokenSource? _cts; - public PendingRequest(int id, Func handler, TimeSpan timeout, SocketSubscription? subscription) + public int Priority => 100; + + public PendingRequest(int id, Func messageMatchesHandler, TimeSpan timeout, SocketSubscriptionListener? subscription) { Id = id; - Handler = handler; + MessageMatchesHandler = messageMatchesHandler; Event = new AsyncResetEvent(false, false); Timeout = timeout; RequestTimestamp = DateTime.UtcNow; @@ -35,23 +37,22 @@ namespace CryptoExchange.Net.Sockets _cts.Token.Register(Fail, false); } - public bool CheckData(JToken data) - { - return Handler(data); - } - - public bool Succeed(JToken data) - { - Result = data; - Completed = true; - Event.Set(); - return true; - } - public void Fail() { Completed = true; Event.Set(); } + + public bool MessageMatches(StreamMessage message) + { + return MessageMatchesHandler(message); + } + + public Task ProcessAsync(StreamMessage message) + { + Completed = true; + Event.Set(); + return Task.CompletedTask; + } } } diff --git a/CryptoExchange.Net/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs similarity index 53% rename from CryptoExchange.Net/Sockets/SocketSubscription.cs rename to CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs index 62e7b87..d662738 100644 --- a/CryptoExchange.Net/Sockets/SocketSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs @@ -1,12 +1,15 @@ -using System; +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Sockets; +using System; using System.Threading; +using System.Threading.Tasks; -namespace CryptoExchange.Net.Sockets +namespace CryptoExchange.Net.Objects.Sockets { /// /// Socket subscription /// - public class SocketSubscription + public class SocketSubscriptionListener : IStreamMessageListener { /// /// Unique subscription id @@ -18,26 +21,16 @@ namespace CryptoExchange.Net.Sockets /// public event Action? Exception; - /// - /// Message handlers for this subscription. Should return true if the message is handled and should not be distributed to the other handlers - /// - public Action MessageHandler { get; set; } - /// /// The request object send when subscribing on the server. Either this or the `Identifier` property should be set /// - public object? Request { get; set; } - - /// - /// The subscription identifier, used instead of a `Request` object to identify the subscription - /// - public string? Identifier { get; set; } + public SubscriptionActor Subscription { get; set; } /// /// Whether this is a user subscription or an internal listener /// public bool UserSubscription { get; set; } - + /// /// If the subscription has been confirmed to be subscribed by the server /// @@ -46,7 +39,7 @@ namespace CryptoExchange.Net.Sockets /// /// Whether authentication is needed for this subscription /// - public bool Authenticated { get; set; } + public bool Authenticated => Subscription.Authenticated; /// /// Whether we're closing this subscription and a socket connection shouldn't be kept open for it @@ -59,44 +52,17 @@ namespace CryptoExchange.Net.Sockets /// public CancellationTokenRegistration? CancellationTokenRegistration { get; set; } - private SocketSubscription(int id, object? request, string? identifier, bool userSubscription, bool authenticated, Action dataHandler) - { - Id = id; - UserSubscription = userSubscription; - MessageHandler = dataHandler; - Request = request; - Identifier = identifier; - Authenticated = authenticated; - } - /// - /// Create SocketSubscription for a subscribe request + /// ctor /// /// /// /// - /// - /// - /// - public static SocketSubscription CreateForRequest(int id, object request, bool userSubscription, - bool authenticated, Action dataHandler) + public SocketSubscriptionListener(int id, SubscriptionActor request, bool userSubscription) { - return new SocketSubscription(id, request, null, userSubscription, authenticated, dataHandler); - } - - /// - /// Create SocketSubscription for an identifier - /// - /// - /// - /// - /// - /// - /// - public static SocketSubscription CreateForIdentifier(int id, string identifier, bool userSubscription, - bool authenticated, Action dataHandler) - { - return new SocketSubscription(id, null, identifier, userSubscription, authenticated, dataHandler); + Id = id; + UserSubscription = userSubscription; + Subscription = request; } /// @@ -107,5 +73,24 @@ namespace CryptoExchange.Net.Sockets { Exception?.Invoke(e); } + + /// + /// The priority of this subscription + /// + public int Priority => Subscription is SystemSubscription ? 50 : 1; + + /// + /// Check if message matches the subscription + /// + /// + /// + public bool MessageMatches(StreamMessage message) => Subscription.MessageMatchesSubscription(message); + + /// + /// Process the message + /// + /// + /// + public Task ProcessAsync(StreamMessage message) => Subscription.HandleEventAsync(message); } } diff --git a/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs b/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs new file mode 100644 index 0000000..3930918 --- /dev/null +++ b/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs @@ -0,0 +1,67 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using CryptoExchange.Net.Sockets; + +namespace CryptoExchange.Net.Objects.Sockets +{ + /// + /// A message received from a stream + /// + public class StreamMessage : IDisposable + { + /// + /// The connection it was received on + /// + public SocketConnection Connection { get; } + /// + /// The data stream + /// + public MemoryStream Stream { get; } + /// + /// Receive timestamp + /// + public DateTime Timestamp { get; set; } + + private Dictionary _casted; + + /// + /// Get the data from the memory in specified type using the converter. If this type has been resolved before it will use that instead + /// + /// + /// + /// + public T Get(Func converter) + { + if (_casted.TryGetValue(typeof(T), out var casted)) + return (T)casted; + + var result = converter(Stream); + _casted.Add(typeof(T), result!); + Stream.Position = 0; + return result; + } + + public void Dispose() + { + Stream.Dispose(); + } + + /// + /// ctor + /// + /// + /// + /// + public StreamMessage(SocketConnection connection, MemoryStream stream, DateTime timestamp) + { + Connection = connection; + Stream = stream; + Timestamp = timestamp; + _casted = new Dictionary(); + } + } +} diff --git a/CryptoExchange.Net/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs similarity index 93% rename from CryptoExchange.Net/Sockets/UpdateSubscription.cs rename to CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs index 64cf082..ea23216 100644 --- a/CryptoExchange.Net/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs @@ -1,8 +1,9 @@ using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Sockets; using System; using System.Threading.Tasks; -namespace CryptoExchange.Net.Sockets +namespace CryptoExchange.Net.Objects.Sockets { /// /// Subscription to a data stream @@ -10,7 +11,7 @@ namespace CryptoExchange.Net.Sockets public class UpdateSubscription { private readonly SocketConnection _connection; - private readonly SocketSubscription _subscription; + private readonly SocketSubscriptionListener _subscription; /// /// Event when the connection is lost. The socket will automatically reconnect when possible. @@ -83,12 +84,12 @@ namespace CryptoExchange.Net.Sockets /// /// The socket connection the subscription is on /// The subscription - public UpdateSubscription(SocketConnection connection, SocketSubscription subscription) + public UpdateSubscription(SocketConnection connection, SocketSubscriptionListener subscription) { - this._connection = connection; - this._subscription = subscription; + _connection = connection; + _subscription = subscription; } - + /// /// Close the subscription /// diff --git a/CryptoExchange.Net/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs similarity index 98% rename from CryptoExchange.Net/Sockets/WebSocketParameters.cs rename to CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs index a20b296..e65481f 100644 --- a/CryptoExchange.Net/Sockets/WebSocketParameters.cs +++ b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Text; using System.Threading.Tasks; -namespace CryptoExchange.Net.Sockets +namespace CryptoExchange.Net.Objects.Sockets { /// /// Parameters for a websocket diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index d6404f4..5d019d2 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -9,7 +9,7 @@ using System.Threading.Tasks; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; namespace CryptoExchange.Net.OrderBook diff --git a/CryptoExchange.Net/ParsingUtils.cs b/CryptoExchange.Net/ParsingUtils.cs new file mode 100644 index 0000000..e4c8953 --- /dev/null +++ b/CryptoExchange.Net/ParsingUtils.cs @@ -0,0 +1,36 @@ +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System.IO; +using System.Text; + +namespace CryptoExchange.Net +{ + /// + /// Parsing utility methods + /// + public static class ParsingUtils + { + /// + /// Read the stream as string + /// + /// + /// + public static string GetString(Stream stream) + { + using var reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); + return reader.ReadToEnd(); + } + + /// + /// Read the stream and parse to JToken + /// + /// + /// + public static JToken GetJToken(Stream x) + { + using var sr = new StreamReader(x, Encoding.UTF8, false, (int)x.Length, true); + using var jsonTextReader = new JsonTextReader(sr); + return JToken.Load(jsonTextReader); + } + } +} diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 3c9ab9b..0c73aaf 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -1,5 +1,6 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; @@ -44,7 +45,6 @@ namespace CryptoExchange.Net.Sockets private ProcessState _processState; private DateTime _lastReconnectTime; - /// /// Received messages, the size and the timstamp /// @@ -101,7 +101,7 @@ namespace CryptoExchange.Net.Sockets public event Action? OnClose; /// - public event Action? OnMessage; + public event Func? OnStreamMessage; /// public event Action? OnRequestSent; @@ -521,7 +521,7 @@ namespace CryptoExchange.Net.Sockets { // Received a complete message and it's not multi part _logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); - HandleMessage(buffer.Array!, buffer.Offset, receiveResult.Count, receiveResult.MessageType); + await ProcessByteData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count), receiveResult.MessageType).ConfigureAwait(false); } else { @@ -555,11 +555,13 @@ namespace CryptoExchange.Net.Sockets { // Reassemble complete message from memory stream _logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); - HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType); + await ProcessByteData(memoryStream, receiveResult.MessageType).ConfigureAwait(false); memoryStream.Dispose(); } else + { _logger.Log(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes"); + } } } } @@ -578,58 +580,10 @@ namespace CryptoExchange.Net.Sockets } } - /// - /// Handles the message - /// - /// - /// - /// - /// - private void HandleMessage(byte[] data, int offset, int count, WebSocketMessageType messageType) + private async Task ProcessByteData(MemoryStream memoryStream, WebSocketMessageType messageType) { - string strData; - if (messageType == WebSocketMessageType.Binary) - { - if (Parameters.DataInterpreterBytes == null) - throw new Exception("Byte interpreter not set while receiving byte data"); - - try - { - var relevantData = new byte[count]; - Array.Copy(data, offset, relevantData, 0, count); - strData = Parameters.DataInterpreterBytes(relevantData); - } - catch(Exception e) - { - _logger.Log(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString()); - return; - } - } - else - strData = Parameters.Encoding.GetString(data, offset, count); - - if (Parameters.DataInterpreterString != null) - { - try - { - strData = Parameters.DataInterpreterString(strData); - } - catch(Exception e) - { - _logger.Log(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString()); - return; - } - } - - try - { - LastActionTime = DateTime.UtcNow; - OnMessage?.Invoke(strData); - } - catch(Exception e) - { - _logger.Log(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString()); - } + if (OnStreamMessage != null) + await OnStreamMessage.Invoke(memoryStream).ConfigureAwait(false); } /// diff --git a/CryptoExchange.Net/Sockets/MessageEvent.cs b/CryptoExchange.Net/Sockets/MessageEvent.cs deleted file mode 100644 index ae12e0f..0000000 --- a/CryptoExchange.Net/Sockets/MessageEvent.cs +++ /dev/null @@ -1,46 +0,0 @@ -using Newtonsoft.Json.Linq; -using System; - -namespace CryptoExchange.Net.Sockets -{ - /// - /// Message received event - /// - public class MessageEvent - { - /// - /// The connection the message was received on - /// - public SocketConnection Connection { get; set; } - - /// - /// The json object of the data - /// - public JToken JsonData { get; set; } - - /// - /// The originally received string data - /// - public string? OriginalData { get; set; } - - /// - /// The timestamp of when the data was received - /// - public DateTime ReceivedTimestamp { get; set; } - - /// - /// ctor - /// - /// - /// - /// - /// - public MessageEvent(SocketConnection connection, JToken jsonData, string? originalData, DateTime timestamp) - { - Connection = connection; - JsonData = jsonData; - OriginalData = originalData; - ReceivedTimestamp = timestamp; - } - } -} diff --git a/CryptoExchange.Net/Sockets/QueryActor.cs b/CryptoExchange.Net/Sockets/QueryActor.cs new file mode 100644 index 0000000..bae03a4 --- /dev/null +++ b/CryptoExchange.Net/Sockets/QueryActor.cs @@ -0,0 +1,45 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; + +namespace CryptoExchange.Net.Sockets +{ + /// + /// Query + /// + public abstract class QueryActor + { + /// + /// The query request + /// + public object Query { get; set; } + + /// + /// If this is a private request + /// + public bool Authenticated { get; } + + /// + /// Check if the message is the response to the query + /// + /// + /// + public abstract bool MessageMatchesQuery(StreamMessage message); + /// + /// Handle the query response + /// + /// + /// + public abstract CallResult HandleResponse(StreamMessage message); + + /// + /// ctor + /// + /// + /// + public QueryActor(object query, bool authenticated) + { + Authenticated = authenticated; + Query = query; + } + } +} diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 49f6917..74df18d 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -9,6 +9,8 @@ using Newtonsoft.Json.Linq; using Microsoft.Extensions.Logging; using CryptoExchange.Net.Objects; using System.Net.WebSockets; +using System.IO; +using CryptoExchange.Net.Objects.Sockets; namespace CryptoExchange.Net.Sockets { @@ -45,7 +47,7 @@ namespace CryptoExchange.Net.Sockets /// /// Unhandled message event /// - public event Action? UnhandledMessage; + public event Action? UnhandledMessage; /// /// The amount of subscriptions on this connection @@ -59,7 +61,7 @@ namespace CryptoExchange.Net.Sockets /// /// Get a copy of the current subscriptions /// - public SocketSubscription[] Subscriptions + public SocketSubscriptionListener[] Subscriptions { get { @@ -149,13 +151,12 @@ namespace CryptoExchange.Net.Sockets } private bool _pausedActivity; - private readonly List _subscriptions; + private readonly List _subscriptions; + private readonly List _messageListeners; + private readonly object _subscriptionLock = new(); - private readonly ILogger _logger; - private readonly List _pendingRequests; - private SocketStatus _status; /// @@ -177,11 +178,11 @@ namespace CryptoExchange.Net.Sockets Tag = tag; Properties = new Dictionary(); - _pendingRequests = new List(); - _subscriptions = new List(); + _messageListeners = new List(); + _subscriptions = new List(); _socket = socket; - _socket.OnMessage += HandleMessage; + _socket.OnStreamMessage += HandleStreamMessage; _socket.OnRequestSent += HandleRequestSent; _socket.OnOpen += HandleOpen; _socket.OnClose += HandleClose; @@ -247,12 +248,12 @@ namespace CryptoExchange.Net.Sockets protected virtual async void HandleReconnected() { Status = SocketStatus.Resubscribing; - lock (_pendingRequests) + lock (_messageListeners) { - foreach (var pendingRequest in _pendingRequests.ToList()) + foreach (var pendingRequest in _messageListeners.OfType().ToList()) { pendingRequest.Fail(); - _pendingRequests.Remove(pendingRequest); + _messageListeners.Remove(pendingRequest); } } @@ -292,8 +293,8 @@ namespace CryptoExchange.Net.Sockets protected virtual void HandleRequestSent(int requestId) { PendingRequest pendingRequest; - lock (_pendingRequests) - pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId); + lock (_messageListeners) + pendingRequest = _messageListeners.OfType().SingleOrDefault(p => p.Id == requestId); if (pendingRequest == null) { @@ -305,86 +306,67 @@ namespace CryptoExchange.Net.Sockets } /// - /// Process a message received by the socket + /// Handle a message /// - /// The received data - protected virtual void HandleMessage(string data) + /// + /// + protected virtual async Task HandleStreamMessage(MemoryStream stream) { var timestamp = DateTime.UtcNow; - _logger.Log(LogLevel.Trace, $"Socket {SocketId} received data: " + data); - if (string.IsNullOrEmpty(data)) return; - - var tokenData = data.ToJToken(_logger); - if (tokenData == null) - { - data = $"\"{data}\""; - tokenData = data.ToJToken(_logger); - if (tokenData == null) - return; - } - + var streamMessage = new StreamMessage(this, stream, timestamp); var handledResponse = false; - - // Remove any timed out requests - PendingRequest[] requests; - lock (_pendingRequests) + SocketSubscriptionListener? currentSubscription = null; + TimeSpan userCodeDuration = TimeSpan.Zero; + foreach (var listener in _messageListeners.OrderByDescending(x => x.Priority).ToList()) // LOCK { - // Remove only timed out requests after 5 minutes have passed so we can still process any - // message coming in after the request timeout - _pendingRequests.RemoveAll(r => r.Completed && DateTime.UtcNow - r.RequestTimestamp > TimeSpan.FromMinutes(5)); - requests = _pendingRequests.ToArray(); - } - - // Check if this message is an answer on any pending requests - foreach (var pendingRequest in requests) - { - if (pendingRequest.CheckData(tokenData)) + if (listener.MessageMatches(streamMessage)) { - lock (_pendingRequests) - _pendingRequests.Remove(pendingRequest); - - if (pendingRequest.Completed) + if (listener is PendingRequest pendingRequest) { - // Answer to a timed out request, unsub if it is a subscription request - if (pendingRequest.Subscription != null) + lock (_messageListeners) + _messageListeners.Remove(pendingRequest); + + if (pendingRequest.Completed) { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); - _ = ApiClient.UnsubscribeAsync(this, pendingRequest.Subscription).ConfigureAwait(false); + // Answer to a timed out request, unsub if it is a subscription request + if (pendingRequest.Subscription != null) + { + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); + _ = UnsubscribeAsync(pendingRequest.Subscription).ConfigureAwait(false); + } } + else + { + _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); + await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false); + } + + if (!ApiClient.ContinueOnQueryResponse) + return; + + handledResponse = true; + break; } - else + else if (listener is SocketSubscriptionListener subscription) { - _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); - pendingRequest.Succeed(tokenData); + currentSubscription = subscription; + handledResponse = true; + var userSw = Stopwatch.StartNew(); + await subscription.ProcessAsync(streamMessage).ConfigureAwait(false); + userSw.Stop(); + userCodeDuration = userSw.Elapsed; + break; } - - if (!ApiClient.ContinueOnQueryResponse) - return; - - handledResponse = true; - break; } } - // Message was not a request response, check data handlers - var messageEvent = new MessageEvent(this, tokenData, ApiClient.OutputOriginalData ? data : null, timestamp); - var (handled, userProcessTime, subscription) = HandleData(messageEvent); - if (!handled && !handledResponse) + if (!handledResponse) { if (!ApiClient.UnhandledMessageExpected) - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + tokenData); - UnhandledMessage?.Invoke(tokenData); + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + streamMessage.Get(ParsingUtils.GetString)); + UnhandledMessage?.Invoke(streamMessage); } - - var total = DateTime.UtcNow - timestamp; - if (userProcessTime.TotalMilliseconds > 500) - { - _logger.Log(LogLevel.Debug, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " + - "Data from this socket may arrive late or not at all if message processing is continuously slow."); - } - - _logger.Log(LogLevel.Trace, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processed in {(int)total.TotalMilliseconds}ms ({(int)userProcessTime.TotalMilliseconds}ms user code)"); - } + } /// /// Connect the websocket @@ -434,7 +416,7 @@ namespace CryptoExchange.Net.Sockets /// /// Subscription to close /// - public async Task CloseAsync(SocketSubscription subscription) + public async Task CloseAsync(SocketSubscriptionListener subscription) { lock (_subscriptionLock) { @@ -452,7 +434,7 @@ namespace CryptoExchange.Net.Sockets subscription.CancellationTokenRegistration.Value.Dispose(); if (subscription.Confirmed && _socket.IsOpen) - await ApiClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); + await UnsubscribeAsync(subscription).ConfigureAwait(false); bool shouldCloseConnection; lock (_subscriptionLock) @@ -475,7 +457,10 @@ namespace CryptoExchange.Net.Sockets } lock (_subscriptionLock) + { + _messageListeners.Remove(subscription); _subscriptions.Remove(subscription); + } } /// @@ -491,7 +476,7 @@ namespace CryptoExchange.Net.Sockets /// Add a subscription to this connection /// /// - public bool AddSubscription(SocketSubscription subscription) + public bool AddSubscription(SocketSubscriptionListener subscription) { lock (_subscriptionLock) { @@ -499,7 +484,9 @@ namespace CryptoExchange.Net.Sockets return false; _subscriptions.Add(subscription); - if(subscription.UserSubscription) + _messageListeners.Add(subscription); + + if (subscription.UserSubscription) _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}"); return true; } @@ -509,7 +496,7 @@ namespace CryptoExchange.Net.Sockets /// Get a subscription on this connection by id /// /// - public SocketSubscription? GetSubscription(int id) + public SocketSubscriptionListener? GetSubscription(int id) { lock (_subscriptionLock) return _subscriptions.SingleOrDefault(s => s.Id == id); @@ -520,66 +507,10 @@ namespace CryptoExchange.Net.Sockets /// /// Filter for a request /// - public SocketSubscription? GetSubscriptionByRequest(Func predicate) + public SocketSubscriptionListener? GetSubscriptionByRequest(Func predicate) { lock(_subscriptionLock) - return _subscriptions.SingleOrDefault(s => predicate(s.Request)); - } - - /// - /// Process data - /// - /// - /// True if the data was successfully handled - private (bool, TimeSpan, SocketSubscription?) HandleData(MessageEvent messageEvent) - { - SocketSubscription? currentSubscription = null; - try - { - var handled = false; - TimeSpan userCodeDuration = TimeSpan.Zero; - - // Loop the subscriptions to check if any of them signal us that the message is for them - List subscriptionsCopy; - lock (_subscriptionLock) - subscriptionsCopy = _subscriptions.ToList(); - - foreach (var subscription in subscriptionsCopy) - { - currentSubscription = subscription; - if (subscription.Request == null) - { - if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!)) - { - handled = true; - var userSw = Stopwatch.StartNew(); - subscription.MessageHandler(messageEvent); - userSw.Stop(); - userCodeDuration = userSw.Elapsed; - } - } - else - { - if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Request)) - { - handled = true; - messageEvent.JsonData = ApiClient.ProcessTokenData(messageEvent.JsonData); - var userSw = Stopwatch.StartNew(); - subscription.MessageHandler(messageEvent); - userSw.Stop(); - userCodeDuration = userSw.Elapsed; - } - } - } - - return (handled, userCodeDuration, currentSubscription); - } - catch (Exception ex) - { - _logger.Log(LogLevel.Error, $"Socket {SocketId} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}"); - currentSubscription?.InvokeExceptionHandler(ex); - return (false, TimeSpan.Zero, null); - } + return _subscriptions.SingleOrDefault(s => predicate(s.Subscription)); } /// @@ -589,15 +520,15 @@ namespace CryptoExchange.Net.Sockets /// The object to send /// The timeout for response /// Subscription if this is a subscribe request - /// The response handler, should return true if the received JToken was the response to the request + /// The response handler /// The weight of the message /// - public virtual async Task SendAndWaitAsync(T obj, TimeSpan timeout, SocketSubscription? subscription, int weight, Func handler) + public virtual async Task SendAndWaitAsync(T obj, TimeSpan timeout, SocketSubscriptionListener? subscription, int weight, Func handler) { var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, subscription); - lock (_pendingRequests) + lock (_messageListeners) { - _pendingRequests.Add(pending); + _messageListeners.Add(pending); } var sendOk = Send(pending.Id, obj, weight); @@ -697,21 +628,21 @@ namespace CryptoExchange.Net.Sockets } // Get a list of all subscriptions on the socket - List subscriptionList = new List(); + List subscriptionList = new List(); lock (_subscriptionLock) { foreach (var subscription in _subscriptions) { - if (subscription.Request != null) + if (subscription.Subscription != null) subscriptionList.Add(subscription); else subscription.Confirmed = true; } } - foreach(var subscription in subscriptionList.Where(s => s.Request != null)) + foreach(var subscription in subscriptionList.Where(s => s.Subscription != null)) { - var result = await ApiClient.RevitalizeRequestAsync(subscription.Request!).ConfigureAwait(false); + var result = await ApiClient.RevitalizeRequestAsync(subscription.Subscription!).ConfigureAwait(false); if (!result) { _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error); @@ -727,7 +658,7 @@ namespace CryptoExchange.Net.Sockets var taskList = new List>>(); foreach (var subscription in subscriptionList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) - taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription)); + taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Subscription!, subscription)); await Task.WhenAll(taskList).ConfigureAwait(false); if (taskList.Any(t => !t.Result.Success)) @@ -744,17 +675,26 @@ namespace CryptoExchange.Net.Sockets return new CallResult(true); } - internal async Task UnsubscribeAsync(SocketSubscription socketSubscription) + internal async Task UnsubscribeAsync(SocketSubscriptionListener socketSubscription) { - await ApiClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false); + var unsubscribeRequest = socketSubscription.Subscription?.GetUnsubscribeRequest(); + if (unsubscribeRequest != null) + { + await SendAndWaitAsync(unsubscribeRequest, TimeSpan.FromSeconds(10), socketSubscription, 0, x => + { + var (matches, result) = socketSubscription.Subscription!.MessageMatchesUnsubscribeRequest(x); + // TODO check result? + return matches; + }).ConfigureAwait(false); + } } - internal async Task> ResubscribeAsync(SocketSubscription socketSubscription) + internal async Task> ResubscribeAsync(SocketSubscriptionListener socketSubscription) { if (!_socket.IsOpen) return new CallResult(new UnknownError("Socket is not connected")); - return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); + return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Subscription!, socketSubscription).ConfigureAwait(false); } /// @@ -793,3 +733,4 @@ namespace CryptoExchange.Net.Sockets } } } + diff --git a/CryptoExchange.Net/Sockets/SubscriptionActor.cs b/CryptoExchange.Net/Sockets/SubscriptionActor.cs new file mode 100644 index 0000000..12967ac --- /dev/null +++ b/CryptoExchange.Net/Sockets/SubscriptionActor.cs @@ -0,0 +1,114 @@ +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using System.IO; +using System.Text; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Sockets +{ + /// + /// Subscription base + /// + public abstract class SubscriptionActor + { + private bool _outputOriginalData; + + /// + /// Logger + /// + protected readonly ILogger _logger; + + /// + /// If the subscription is a private subscription and needs authentication + /// + public bool Authenticated { get; } + + /// + /// ctor + /// + /// + /// + /// + public SubscriptionActor(ILogger logger, ISocketApiClient apiClient, bool authenticated) + { + _logger = logger; + _outputOriginalData = apiClient.ApiOptions.OutputOriginalData ?? apiClient.ClientOptions.OutputOriginalData; + Authenticated = authenticated; + } + + /// + /// Get the subscribe object to send when subscribing + /// + /// + public abstract object? GetSubscribeRequest(); + /// + /// Check if the message is the response to the subscribe request + /// + /// + /// + public abstract (bool, CallResult?) MessageMatchesSubscribeRequest(StreamMessage message); + + /// + /// Get the unsubscribe object to send when unsubscribing + /// + /// + public abstract object? GetUnsubscribeRequest(); + /// + /// Check if the message is the response to the unsubscribe request + /// + /// + /// + public abstract (bool, CallResult?) MessageMatchesUnsubscribeRequest(StreamMessage message); + + /// + /// Check if the message is an update for this subscription + /// + /// + /// + public abstract bool MessageMatchesSubscription(StreamMessage message); + /// + /// Handle the update message + /// + /// + /// + public abstract Task HandleEventAsync(StreamMessage message); + + /// + /// Create a data event + /// + /// + /// + /// + /// + /// + /// + protected DataEvent CreateDataEvent(T obj, StreamMessage message, string? topic = null, SocketUpdateType? type = null) + { + string? originalData = null; + if (_outputOriginalData) + originalData = message.Get(ParsingUtils.GetString); + + return new DataEvent(obj, topic, originalData, message.Timestamp, type); + } + + /// + /// Deserialize the message to an object using Json.Net + /// + /// + /// + /// + /// + protected Task> DeserializeAsync(StreamMessage message, JsonSerializerSettings settings) + { + var serializer = JsonSerializer.Create(settings); + using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true); + using var jsonTextReader = new JsonTextReader(sr); + var result = serializer.Deserialize(jsonTextReader); + message.Stream.Position = 0; + return Task.FromResult(new CallResult(result!)); + } + } +} diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs new file mode 100644 index 0000000..ddcc142 --- /dev/null +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -0,0 +1,33 @@ +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using Microsoft.Extensions.Logging; +using System; + +namespace CryptoExchange.Net.Sockets +{ + /// + /// A system subscription + /// + public abstract class SystemSubscription : SubscriptionActor + { + /// + /// ctor + /// + /// + /// + public SystemSubscription(ILogger logger, ISocketApiClient socketApiClient, bool authenticated = false) : base(logger, socketApiClient, authenticated) + { + } + + /// + public override object? GetSubscribeRequest() => null; + /// + public override (bool, CallResult?) MessageMatchesSubscribeRequest(StreamMessage message) => throw new NotImplementedException(); + + /// + public override object? GetUnsubscribeRequest() => null; + /// + public override (bool, CallResult?) MessageMatchesUnsubscribeRequest(StreamMessage message) => throw new NotImplementedException(); + } +} diff --git a/CryptoExchange.Net/Sockets/WebsocketFactory.cs b/CryptoExchange.Net/Sockets/WebsocketFactory.cs index fd1606b..286d37d 100644 --- a/CryptoExchange.Net/Sockets/WebsocketFactory.cs +++ b/CryptoExchange.Net/Sockets/WebsocketFactory.cs @@ -1,4 +1,5 @@ using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; namespace CryptoExchange.Net.Sockets