From cff3863373acc1d3cd4e4e6619b6fc5577011aaa Mon Sep 17 00:00:00 2001 From: JKorf <jankorf91@gmail.com> Date: Sun, 22 Oct 2023 21:42:03 +0200 Subject: [PATCH] wip --- .../SymbolOrderBookTests.cs | 2 +- .../TestImplementations/TestSocketClient.cs | 1 + .../Clients/BaseSocketClient.cs | 2 +- CryptoExchange.Net/Clients/SocketApiClient.cs | 226 ++++++---------- .../Interfaces/ISocketApiClient.cs | 12 +- .../Interfaces/ISocketClient.cs | 2 +- .../Interfaces/IStreamMessageListener.cs | 12 + CryptoExchange.Net/Interfaces/IWebsocket.cs | 3 +- .../Interfaces/IWebsocketFactory.cs | 2 +- .../{ => Objects}/Sockets/DataEvent.cs | 2 +- .../{ => Objects}/Sockets/PendingRequest.cs | 45 ++-- .../Sockets/SocketSubscription.cs | 81 +++--- .../Objects/Sockets/StreamMessage.cs | 67 +++++ .../Sockets/UpdateSubscription.cs | 13 +- .../Sockets/WebSocketParameters.cs | 2 +- .../OrderBook/SymbolOrderBook.cs | 2 +- CryptoExchange.Net/ParsingUtils.cs | 36 +++ .../Sockets/CryptoExchangeWebSocketClient.cs | 64 +---- CryptoExchange.Net/Sockets/MessageEvent.cs | 46 ---- CryptoExchange.Net/Sockets/QueryActor.cs | 45 ++++ .../Sockets/SocketConnection.cs | 245 +++++++----------- .../Sockets/SubscriptionActor.cs | 114 ++++++++ .../Sockets/SystemSubscription.cs | 33 +++ .../Sockets/WebsocketFactory.cs | 1 + 24 files changed, 579 insertions(+), 479 deletions(-) create mode 100644 CryptoExchange.Net/Interfaces/IStreamMessageListener.cs rename CryptoExchange.Net/{ => Objects}/Sockets/DataEvent.cs (98%) rename CryptoExchange.Net/{ => Objects}/Sockets/PendingRequest.cs (54%) rename CryptoExchange.Net/{ => Objects}/Sockets/SocketSubscription.cs (53%) create mode 100644 CryptoExchange.Net/Objects/Sockets/StreamMessage.cs rename CryptoExchange.Net/{ => Objects}/Sockets/UpdateSubscription.cs (93%) rename CryptoExchange.Net/{ => Objects}/Sockets/WebSocketParameters.cs (98%) create mode 100644 CryptoExchange.Net/ParsingUtils.cs delete mode 100644 CryptoExchange.Net/Sockets/MessageEvent.cs create mode 100644 CryptoExchange.Net/Sockets/QueryActor.cs create mode 100644 CryptoExchange.Net/Sockets/SubscriptionActor.cs create mode 100644 CryptoExchange.Net/Sockets/SystemSubscription.cs diff --git a/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs b/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs index 825e435..ad6da83 100644 --- a/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs +++ b/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs @@ -5,8 +5,8 @@ using System.Threading.Tasks; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; +using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.OrderBook; -using CryptoExchange.Net.Sockets; using NUnit.Framework; namespace CryptoExchange.Net.UnitTests diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs index 54b551c..796213e 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs @@ -5,6 +5,7 @@ using CryptoExchange.Net.Authentication; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; +using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; using Microsoft.Extensions.Logging; using Moq; diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs index a9c82bf..f74aa35 100644 --- a/CryptoExchange.Net/Clients/BaseSocketClient.cs +++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; namespace CryptoExchange.Net diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index db94163..699a3dc 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; +using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; @@ -9,6 +10,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; +using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -53,7 +55,7 @@ namespace CryptoExchange.Net /// <summary> /// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example. /// </summary> - protected Dictionary<string, Action<MessageEvent>> genericHandlers = new(); + protected List<SystemSubscription> genericHandlers = new(); /// <summary> /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry. @@ -144,6 +146,7 @@ namespace CryptoExchange.Net /// <param name="stringHandler">Handler for string data</param> protected void SetDataInterpreter(Func<byte[], string>? byteHandler, Func<string, string>? stringHandler) { + // TODO dataInterpreterBytes = byteHandler; dataInterpreterString = stringHandler; } @@ -152,15 +155,12 @@ namespace CryptoExchange.Net /// Connect to an url and listen for data on the BaseAddress /// </summary> /// <typeparam name="T">The type of the expected data</typeparam> - /// <param name="request">The optional request object to send, will be serialized to json</param> - /// <param name="identifier">The identifier to use, necessary if no request object is sent</param> - /// <param name="authenticated">If the subscription is to an authenticated endpoint</param> - /// <param name="dataHandler">The handler of update data</param> + /// <param name="subscriptionObject">The subscription</param> /// <param name="ct">Cancellation token for closing this subscription</param> /// <returns></returns> - protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler, CancellationToken ct) + protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(SubscriptionActor subscriptionObject, CancellationToken ct) { - return SubscribeAsync(BaseAddress, request, identifier, authenticated, dataHandler, ct); + return SubscribeAsync<T>(BaseAddress, subscriptionObject, ct); } /// <summary> @@ -168,19 +168,16 @@ namespace CryptoExchange.Net /// </summary> /// <typeparam name="T">The type of the expected data</typeparam> /// <param name="url">The URL to connect to</param> - /// <param name="request">The optional request object to send, will be serialized to json</param> - /// <param name="identifier">The identifier to use, necessary if no request object is sent</param> - /// <param name="authenticated">If the subscription is to an authenticated endpoint</param> - /// <param name="dataHandler">The handler of update data</param> + /// <param name="subscriptionObject">The subscription</param> /// <param name="ct">Cancellation token for closing this subscription</param> /// <returns></returns> - protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(string url, object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler, CancellationToken ct) + protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(string url, SubscriptionActor subscriptionObject, CancellationToken ct) { if (_disposing) return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe")); SocketConnection socketConnection; - SocketSubscription? subscription; + SocketSubscriptionListener? subscription; var released = false; // Wait for a semaphore here, so we only connect 1 socket at a time. // This is necessary for being able to see if connections can be combined @@ -198,14 +195,14 @@ namespace CryptoExchange.Net while (true) { // Get a new or existing socket connection - var socketResult = await GetSocketConnection(url, authenticated).ConfigureAwait(false); + var socketResult = await GetSocketConnection(url, subscriptionObject.Authenticated).ConfigureAwait(false); if (!socketResult) return socketResult.As<UpdateSubscription>(null); socketConnection = socketResult.Data; // Add a subscription on the socket connection - subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated); + subscription = AddSubscription<T>(subscriptionObject, true, socketConnection); if (subscription == null) { _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); @@ -221,7 +218,7 @@ namespace CryptoExchange.Net var needsConnecting = !socketConnection.Connected; - var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); + var connectResult = await ConnectIfNeededAsync(socketConnection, subscriptionObject.Authenticated).ConfigureAwait(false); if (!connectResult) return new CallResult<UpdateSubscription>(connectResult.Error!); @@ -240,6 +237,7 @@ namespace CryptoExchange.Net return new CallResult<UpdateSubscription>(new ServerError("Socket is paused")); } + var request = subscriptionObject.GetSubscribeRequest(); if (request != null) { // Send the request and wait for answer @@ -277,10 +275,16 @@ namespace CryptoExchange.Net /// <param name="request">The request to send, will be serialized to json</param> /// <param name="subscription">The subscription the request is for</param> /// <returns></returns> - protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription) + protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscriptionListener subscription) { - CallResult<object>? callResult = null; - await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, 1, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false); + CallResult? callResult = null; + await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, 1, x => + { + var (matches, result) = subscription.Subscription!.MessageMatchesSubscribeRequest(x); + if (matches) + callResult = result; + return matches; + }).ConfigureAwait(false); if (callResult?.Success == true) { @@ -298,13 +302,12 @@ namespace CryptoExchange.Net /// Send a query on a socket connection to the BaseAddress and wait for the response /// </summary> /// <typeparam name="T">Expected result type</typeparam> - /// <param name="request">The request to send, will be serialized to json</param> - /// <param name="authenticated">If the query is to an authenticated endpoint</param> + /// <param name="query">The query</param> /// <param name="weight">Weight of the request</param> /// <returns></returns> - protected virtual Task<CallResult<T>> QueryAsync<T>(object request, bool authenticated, int weight = 1) + protected virtual Task<CallResult<T>> QueryAsync<T>(QueryActor query, int weight = 1) { - return QueryAsync<T>(BaseAddress, request, authenticated, weight); + return QueryAsync<T>(BaseAddress, query, weight); } /// <summary> @@ -313,10 +316,9 @@ namespace CryptoExchange.Net /// <typeparam name="T">The expected result type</typeparam> /// <param name="url">The url for the request</param> /// <param name="request">The request to send</param> - /// <param name="authenticated">Whether the socket should be authenticated</param> /// <param name="weight">Weight of the request</param> /// <returns></returns> - protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, object request, bool authenticated, int weight = 1) + protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, QueryActor request, int weight = 1) { if (_disposing) return new CallResult<T>(new InvalidOperationError("Client disposed, can't query")); @@ -326,7 +328,7 @@ namespace CryptoExchange.Net await semaphoreSlim.WaitAsync().ConfigureAwait(false); try { - var socketResult = await GetSocketConnection(url, authenticated).ConfigureAwait(false); + var socketResult = await GetSocketConnection(url, request.Authenticated).ConfigureAwait(false); if (!socketResult) return socketResult.As<T>(default); @@ -339,7 +341,7 @@ namespace CryptoExchange.Net released = true; } - var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); + var connectResult = await ConnectIfNeededAsync(socketConnection, request.Authenticated).ConfigureAwait(false); if (!connectResult) return new CallResult<T>(connectResult.Error!); } @@ -366,16 +368,19 @@ namespace CryptoExchange.Net /// <param name="request">The request to send</param> /// <param name="weight">The weight of the query</param> /// <returns></returns> - protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, object request, int weight) + protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, QueryActor request, int weight) { var dataResult = new CallResult<T>(new ServerError("No response on query received")); - await socket.SendAndWaitAsync(request, ClientOptions.RequestTimeout, null, weight, data => + await socket.SendAndWaitAsync(request.Query, ClientOptions.RequestTimeout, null, weight, x => { - if (!HandleQueryResponse<T>(socket, request, data, out var callResult)) - return false; + var matches = request.MessageMatchesQuery(x); + if (matches) + { + request.HandleResponse(x); + return true; + } - dataResult = callResult; - return true; + return false; }).ConfigureAwait(false); return dataResult; @@ -402,16 +407,39 @@ namespace CryptoExchange.Net if (!authenticated || socket.Authenticated) return new CallResult<bool>(true); + return await AuthenticateSocketAsync(socket).ConfigureAwait(false); + } + + /// <summary> + /// Authenticate a socket connection + /// </summary> + /// <param name="socket">Socket to authenticate</param> + /// <returns></returns> + public virtual async Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnection socket) + { _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate"); - var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false); - if (!result) + var authRequest = GetAuthenticationRequest(); + var authResult = new CallResult(new ServerError("No response from server")); + await socket.SendAndWaitAsync(authRequest.Query, ClientOptions.RequestTimeout, null, 1, x => + { + var matches = authRequest.MessageMatchesQuery(x); + if (matches) + { + authResult = authRequest.HandleResponse(x); + return true; + } + + return false; + }).ConfigureAwait(false); + + if (!authResult) { _logger.Log(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed"); if (socket.Connected) await socket.CloseAsync().ConfigureAwait(false); - result.Error!.Message = "Authentication failed: " + result.Error.Message; - return new CallResult<bool>(result.Error); + authResult.Error!.Message = "Authentication failed: " + authResult.Error.Message; + return new CallResult<bool>(authResult.Error); } _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} authenticated"); @@ -420,129 +448,46 @@ namespace CryptoExchange.Net } /// <summary> - /// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the query that was send (the request parameter). - /// For example; A query is sent in a request message with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an - /// anwser to any query that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages, - /// if not some other method has be implemented to match the messages). - /// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true. + /// Should return the request which can be used to authenticate a socket connection /// </summary> - /// <typeparam name="T">The type of response that is expected on the query</typeparam> - /// <param name="socketConnection">The socket connection</param> - /// <param name="request">The request that a response is awaited for</param> - /// <param name="data">The message received from the server</param> - /// <param name="callResult">The interpretation (null if message wasn't a response to the request)</param> - /// <returns>True if the message was a response to the query</returns> - protected internal abstract bool HandleQueryResponse<T>(SocketConnection socketConnection, object request, JToken data, [NotNullWhen(true)] out CallResult<T>? callResult); - - /// <summary> - /// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the subscription request that was send (the request parameter). - /// For example; A subscribe request message is send with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an - /// anwser to any subscription request that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages, - /// if not some other method has be implemented to match the messages). - /// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true. - /// </summary> - /// <param name="socketConnection">The socket connection</param> - /// <param name="subscription">A subscription that waiting for a subscription response</param> - /// <param name="request">The request that the subscription sent</param> - /// <param name="data">The message received from the server</param> - /// <param name="callResult">The interpretation (null if message wasn't a response to the request)</param> - /// <returns>True if the message was a response to the subscription request</returns> - protected internal abstract bool HandleSubscriptionResponse(SocketConnection socketConnection, SocketSubscription subscription, object request, JToken data, out CallResult<object>? callResult); - - /// <summary> - /// Needs to check if a received message matches a handler by request. After subscribing data message will come in. These data messages need to be matched to a specific connection - /// to pass the correct data to the correct handler. The implementation of this method should check if the message received matches the subscribe request that was sent. - /// </summary> - /// <param name="socketConnection">The socket connection the message was recieved on</param> - /// <param name="message">The received data</param> - /// <param name="request">The subscription request</param> - /// <returns>True if the message is for the subscription which sent the request</returns> - protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, object request); - - /// <summary> - /// Needs to check if a received message matches a handler by identifier. Generally used by GenericHandlers. For example; a generic handler is registered which handles ping messages - /// from the server. This method should check if the message received is a ping message and the identifer is the identifier of the GenericHandler - /// </summary> - /// <param name="socketConnection">The socket connection the message was recieved on</param> - /// <param name="message">The received data</param> - /// <param name="identifier">The string identifier of the handler</param> - /// <returns>True if the message is for the handler which has the identifier</returns> - protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, string identifier); - - /// <summary> - /// Needs to authenticate the socket so authenticated queries/subscriptions can be made on this socket connection - /// </summary> - /// <param name="socketConnection">The socket connection that should be authenticated</param> /// <returns></returns> - protected internal abstract Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnection socketConnection); - - /// <summary> - /// Needs to unsubscribe a subscription, typically by sending an unsubscribe request. If multiple subscriptions per socket is not allowed this can just return since the socket will be closed anyway - /// </summary> - /// <param name="connection">The connection on which to unsubscribe</param> - /// <param name="subscriptionToUnsub">The subscription to unsubscribe</param> - /// <returns></returns> - protected internal abstract Task<bool> UnsubscribeAsync(SocketConnection connection, SocketSubscription subscriptionToUnsub); + protected internal abstract QueryActor GetAuthenticationRequest(); /// <summary> /// Optional handler to interpolate data before sending it to the handlers /// </summary> /// <param name="message"></param> /// <returns></returns> - protected internal virtual JToken ProcessTokenData(JToken message) - { - return message; - } + //protected internal virtual JToken ProcessTokenData(JToken message) + //{ + // return message; + //} /// <summary> /// Add a subscription to a connection /// </summary> /// <typeparam name="T">The type of data the subscription expects</typeparam> - /// <param name="request">The request of the subscription</param> - /// <param name="identifier">The identifier of the subscription (can be null if request param is used)</param> + /// <param name="subscriptionObject">The subscription</param> /// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param> /// <param name="connection">The socket connection the handler is on</param> - /// <param name="dataHandler">The handler of the data received</param> - /// <param name="authenticated">Whether the subscription needs authentication</param> /// <returns></returns> - protected virtual SocketSubscription? AddSubscription<T>(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action<DataEvent<T>> dataHandler, bool authenticated) + protected virtual SocketSubscriptionListener? AddSubscription<T>(SubscriptionActor subscriptionObject, bool userSubscription, SocketConnection connection) { - void InternalHandler(MessageEvent messageEvent) - { - if (typeof(T) == typeof(string)) - { - var stringData = (T)Convert.ChangeType(messageEvent.JsonData.ToString(), typeof(T)); - dataHandler(new DataEvent<T>(stringData, null, OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp, null)); - return; - } - - var desResult = Deserialize<T>(messageEvent.JsonData); - if (!desResult) - { - _logger.Log(LogLevel.Warning, $"Socket {connection.SocketId} Failed to deserialize data into type {typeof(T)}: {desResult.Error}"); - return; - } - - dataHandler(new DataEvent<T>(desResult.Data, null, OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp, null)); - } - - var subscription = request == null - ? SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), identifier!, userSubscription, authenticated, InternalHandler) - : SocketSubscription.CreateForRequest(ExchangeHelpers.NextId(), request, userSubscription, authenticated, InternalHandler); + var subscription = new SocketSubscriptionListener(ExchangeHelpers.NextId(), subscriptionObject, userSubscription); if (!connection.AddSubscription(subscription)) return null; + return subscription; } /// <summary> - /// Adds a generic message handler. Used for example to reply to ping requests + /// Adds a system subscription. Used for example to reply to ping requests /// </summary> - /// <param name="identifier">The name of the request handler. Needs to be unique</param> - /// <param name="action">The action to execute when receiving a message for this handler (checked by <see cref="MessageMatchesHandler(SocketConnection, Newtonsoft.Json.Linq.JToken,string)"/>)</param> - protected void AddGenericHandler(string identifier, Action<MessageEvent> action) + /// <param name="systemSubscription">The subscription</param> + protected void AddSystemSubscription(SystemSubscription systemSubscription) { - genericHandlers.Add(identifier, action); - var subscription = SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), identifier, false, false, action); + genericHandlers.Add(systemSubscription); + var subscription = new SocketSubscriptionListener(ExchangeHelpers.NextId(), systemSubscription, false); foreach (var connection in socketConnections.Values) connection.AddSubscription(subscription); } @@ -614,9 +559,10 @@ namespace CryptoExchange.Net var socket = CreateSocket(connectionAddress.Data!); var socketConnection = new SocketConnection(_logger, this, socket, address); socketConnection.UnhandledMessage += HandleUnhandledMessage; - foreach (var kvp in genericHandlers) + + foreach (var systemHandler in genericHandlers) { - var handler = SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), kvp.Key, false, false, kvp.Value); + var handler = new SocketSubscriptionListener(ExchangeHelpers.NextId(), systemHandler, false); socketConnection.AddSubscription(handler); } @@ -626,8 +572,8 @@ namespace CryptoExchange.Net /// <summary> /// Process an unhandled message /// </summary> - /// <param name="token">The token that wasn't processed</param> - protected virtual void HandleUnhandledMessage(JToken token) + /// <param name="message">The message that wasn't processed</param> + protected virtual void HandleUnhandledMessage(StreamMessage message) { } @@ -731,7 +677,7 @@ namespace CryptoExchange.Net /// <returns></returns> public virtual async Task<bool> UnsubscribeAsync(int subscriptionId) { - SocketSubscription? subscription = null; + SocketSubscriptionListener? subscription = null; SocketConnection? connection = null; foreach (var socket in socketConnections.Values.ToList()) { diff --git a/CryptoExchange.Net/Interfaces/ISocketApiClient.cs b/CryptoExchange.Net/Interfaces/ISocketApiClient.cs index 7d1c3c6..8623514 100644 --- a/CryptoExchange.Net/Interfaces/ISocketApiClient.cs +++ b/CryptoExchange.Net/Interfaces/ISocketApiClient.cs @@ -1,6 +1,6 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Objects.Sockets; using System; using System.Threading.Tasks; @@ -26,7 +26,15 @@ namespace CryptoExchange.Net.Interfaces /// <summary> /// The factory for creating sockets. Used for unit testing /// </summary> - IWebsocketFactory SocketFactory { get; set; } + IWebsocketFactory SocketFactory { get; } + /// <summary> + /// Current client options + /// </summary> + SocketExchangeOptions ClientOptions { get; } + /// <summary> + /// Current API options + /// </summary> + SocketApiOptions ApiOptions { get; } /// <summary> /// Log the current state of connections and subscriptions /// </summary> diff --git a/CryptoExchange.Net/Interfaces/ISocketClient.cs b/CryptoExchange.Net/Interfaces/ISocketClient.cs index e2e7f97..108f7ed 100644 --- a/CryptoExchange.Net/Interfaces/ISocketClient.cs +++ b/CryptoExchange.Net/Interfaces/ISocketClient.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using CryptoExchange.Net.Authentication; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Objects.Sockets; namespace CryptoExchange.Net.Interfaces { diff --git a/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs b/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs new file mode 100644 index 0000000..ad77bd7 --- /dev/null +++ b/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs @@ -0,0 +1,12 @@ +using CryptoExchange.Net.Objects.Sockets; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Interfaces +{ + internal interface IStreamMessageListener + { + int Priority { get; } + bool MessageMatches(StreamMessage message); + Task ProcessAsync(StreamMessage message); + } +} diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 1c6bfc6..2d7145a 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Sockets; using System; +using System.IO; using System.Security.Authentication; using System.Text; using System.Threading.Tasks; @@ -19,7 +20,7 @@ namespace CryptoExchange.Net.Interfaces /// <summary> /// Websocket message received event /// </summary> - event Action<string> OnMessage; + event Func<MemoryStream, Task> OnStreamMessage; /// <summary> /// Websocket sent event, RequestId as parameter /// </summary> diff --git a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs index a8304da..3fe70c1 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs @@ -1,4 +1,4 @@ -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; namespace CryptoExchange.Net.Interfaces diff --git a/CryptoExchange.Net/Sockets/DataEvent.cs b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs similarity index 98% rename from CryptoExchange.Net/Sockets/DataEvent.cs rename to CryptoExchange.Net/Objects/Sockets/DataEvent.cs index d6e3151..b1b4058 100644 --- a/CryptoExchange.Net/Sockets/DataEvent.cs +++ b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs @@ -1,7 +1,7 @@ using CryptoExchange.Net.Objects; using System; -namespace CryptoExchange.Net.Sockets +namespace CryptoExchange.Net.Objects.Sockets { /// <summary> /// An update received from a socket update subscription diff --git a/CryptoExchange.Net/Sockets/PendingRequest.cs b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs similarity index 54% rename from CryptoExchange.Net/Sockets/PendingRequest.cs rename to CryptoExchange.Net/Objects/Sockets/PendingRequest.cs index 6043d22..9e26257 100644 --- a/CryptoExchange.Net/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs @@ -1,27 +1,29 @@ -using CryptoExchange.Net.Objects; -using Newtonsoft.Json.Linq; +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects; using System; using System.Threading; +using System.Threading.Tasks; -namespace CryptoExchange.Net.Sockets +namespace CryptoExchange.Net.Objects.Sockets { - internal class PendingRequest + internal class PendingRequest : IStreamMessageListener { public int Id { get; set; } - public Func<JToken, bool> Handler { get; } - public JToken? Result { get; private set; } + public Func<StreamMessage, bool> MessageMatchesHandler { get; } public bool Completed { get; private set; } public AsyncResetEvent Event { get; } public DateTime RequestTimestamp { get; set; } public TimeSpan Timeout { get; } - public SocketSubscription? Subscription { get; } + public SocketSubscriptionListener? Subscription { get; } private CancellationTokenSource? _cts; - public PendingRequest(int id, Func<JToken, bool> handler, TimeSpan timeout, SocketSubscription? subscription) + public int Priority => 100; + + public PendingRequest(int id, Func<StreamMessage, bool> messageMatchesHandler, TimeSpan timeout, SocketSubscriptionListener? subscription) { Id = id; - Handler = handler; + MessageMatchesHandler = messageMatchesHandler; Event = new AsyncResetEvent(false, false); Timeout = timeout; RequestTimestamp = DateTime.UtcNow; @@ -35,23 +37,22 @@ namespace CryptoExchange.Net.Sockets _cts.Token.Register(Fail, false); } - public bool CheckData(JToken data) - { - return Handler(data); - } - - public bool Succeed(JToken data) - { - Result = data; - Completed = true; - Event.Set(); - return true; - } - public void Fail() { Completed = true; Event.Set(); } + + public bool MessageMatches(StreamMessage message) + { + return MessageMatchesHandler(message); + } + + public Task ProcessAsync(StreamMessage message) + { + Completed = true; + Event.Set(); + return Task.CompletedTask; + } } } diff --git a/CryptoExchange.Net/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs similarity index 53% rename from CryptoExchange.Net/Sockets/SocketSubscription.cs rename to CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs index 62e7b87..d662738 100644 --- a/CryptoExchange.Net/Sockets/SocketSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs @@ -1,12 +1,15 @@ -using System; +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Sockets; +using System; using System.Threading; +using System.Threading.Tasks; -namespace CryptoExchange.Net.Sockets +namespace CryptoExchange.Net.Objects.Sockets { /// <summary> /// Socket subscription /// </summary> - public class SocketSubscription + public class SocketSubscriptionListener : IStreamMessageListener { /// <summary> /// Unique subscription id @@ -18,26 +21,16 @@ namespace CryptoExchange.Net.Sockets /// </summary> public event Action<Exception>? Exception; - /// <summary> - /// Message handlers for this subscription. Should return true if the message is handled and should not be distributed to the other handlers - /// </summary> - public Action<MessageEvent> MessageHandler { get; set; } - /// <summary> /// The request object send when subscribing on the server. Either this or the `Identifier` property should be set /// </summary> - public object? Request { get; set; } - - /// <summary> - /// The subscription identifier, used instead of a `Request` object to identify the subscription - /// </summary> - public string? Identifier { get; set; } + public SubscriptionActor Subscription { get; set; } /// <summary> /// Whether this is a user subscription or an internal listener /// </summary> public bool UserSubscription { get; set; } - + /// <summary> /// If the subscription has been confirmed to be subscribed by the server /// </summary> @@ -46,7 +39,7 @@ namespace CryptoExchange.Net.Sockets /// <summary> /// Whether authentication is needed for this subscription /// </summary> - public bool Authenticated { get; set; } + public bool Authenticated => Subscription.Authenticated; /// <summary> /// Whether we're closing this subscription and a socket connection shouldn't be kept open for it @@ -59,44 +52,17 @@ namespace CryptoExchange.Net.Sockets /// </summary> public CancellationTokenRegistration? CancellationTokenRegistration { get; set; } - private SocketSubscription(int id, object? request, string? identifier, bool userSubscription, bool authenticated, Action<MessageEvent> dataHandler) - { - Id = id; - UserSubscription = userSubscription; - MessageHandler = dataHandler; - Request = request; - Identifier = identifier; - Authenticated = authenticated; - } - /// <summary> - /// Create SocketSubscription for a subscribe request + /// ctor /// </summary> /// <param name="id"></param> /// <param name="request"></param> /// <param name="userSubscription"></param> - /// <param name="authenticated"></param> - /// <param name="dataHandler"></param> - /// <returns></returns> - public static SocketSubscription CreateForRequest(int id, object request, bool userSubscription, - bool authenticated, Action<MessageEvent> dataHandler) + public SocketSubscriptionListener(int id, SubscriptionActor request, bool userSubscription) { - return new SocketSubscription(id, request, null, userSubscription, authenticated, dataHandler); - } - - /// <summary> - /// Create SocketSubscription for an identifier - /// </summary> - /// <param name="id"></param> - /// <param name="identifier"></param> - /// <param name="userSubscription"></param> - /// <param name="authenticated"></param> - /// <param name="dataHandler"></param> - /// <returns></returns> - public static SocketSubscription CreateForIdentifier(int id, string identifier, bool userSubscription, - bool authenticated, Action<MessageEvent> dataHandler) - { - return new SocketSubscription(id, null, identifier, userSubscription, authenticated, dataHandler); + Id = id; + UserSubscription = userSubscription; + Subscription = request; } /// <summary> @@ -107,5 +73,24 @@ namespace CryptoExchange.Net.Sockets { Exception?.Invoke(e); } + + /// <summary> + /// The priority of this subscription + /// </summary> + public int Priority => Subscription is SystemSubscription ? 50 : 1; + + /// <summary> + /// Check if message matches the subscription + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public bool MessageMatches(StreamMessage message) => Subscription.MessageMatchesSubscription(message); + + /// <summary> + /// Process the message + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public Task ProcessAsync(StreamMessage message) => Subscription.HandleEventAsync(message); } } diff --git a/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs b/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs new file mode 100644 index 0000000..3930918 --- /dev/null +++ b/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs @@ -0,0 +1,67 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using CryptoExchange.Net.Sockets; + +namespace CryptoExchange.Net.Objects.Sockets +{ + /// <summary> + /// A message received from a stream + /// </summary> + public class StreamMessage : IDisposable + { + /// <summary> + /// The connection it was received on + /// </summary> + public SocketConnection Connection { get; } + /// <summary> + /// The data stream + /// </summary> + public MemoryStream Stream { get; } + /// <summary> + /// Receive timestamp + /// </summary> + public DateTime Timestamp { get; set; } + + private Dictionary<Type, object> _casted; + + /// <summary> + /// Get the data from the memory in specified type using the converter. If this type has been resolved before it will use that instead + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="converter"></param> + /// <returns></returns> + public T Get<T>(Func<Stream, T> converter) + { + if (_casted.TryGetValue(typeof(T), out var casted)) + return (T)casted; + + var result = converter(Stream); + _casted.Add(typeof(T), result!); + Stream.Position = 0; + return result; + } + + public void Dispose() + { + Stream.Dispose(); + } + + /// <summary> + /// ctor + /// </summary> + /// <param name="connection"></param> + /// <param name="stream"></param> + /// <param name="timestamp"></param> + public StreamMessage(SocketConnection connection, MemoryStream stream, DateTime timestamp) + { + Connection = connection; + Stream = stream; + Timestamp = timestamp; + _casted = new Dictionary<Type, object>(); + } + } +} diff --git a/CryptoExchange.Net/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs similarity index 93% rename from CryptoExchange.Net/Sockets/UpdateSubscription.cs rename to CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs index 64cf082..ea23216 100644 --- a/CryptoExchange.Net/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs @@ -1,8 +1,9 @@ using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Sockets; using System; using System.Threading.Tasks; -namespace CryptoExchange.Net.Sockets +namespace CryptoExchange.Net.Objects.Sockets { /// <summary> /// Subscription to a data stream @@ -10,7 +11,7 @@ namespace CryptoExchange.Net.Sockets public class UpdateSubscription { private readonly SocketConnection _connection; - private readonly SocketSubscription _subscription; + private readonly SocketSubscriptionListener _subscription; /// <summary> /// Event when the connection is lost. The socket will automatically reconnect when possible. @@ -83,12 +84,12 @@ namespace CryptoExchange.Net.Sockets /// </summary> /// <param name="connection">The socket connection the subscription is on</param> /// <param name="subscription">The subscription</param> - public UpdateSubscription(SocketConnection connection, SocketSubscription subscription) + public UpdateSubscription(SocketConnection connection, SocketSubscriptionListener subscription) { - this._connection = connection; - this._subscription = subscription; + _connection = connection; + _subscription = subscription; } - + /// <summary> /// Close the subscription /// </summary> diff --git a/CryptoExchange.Net/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs similarity index 98% rename from CryptoExchange.Net/Sockets/WebSocketParameters.cs rename to CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs index a20b296..e65481f 100644 --- a/CryptoExchange.Net/Sockets/WebSocketParameters.cs +++ b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Text; using System.Threading.Tasks; -namespace CryptoExchange.Net.Sockets +namespace CryptoExchange.Net.Objects.Sockets { /// <summary> /// Parameters for a websocket diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index d6404f4..5d019d2 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -9,7 +9,7 @@ using System.Threading.Tasks; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; -using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; namespace CryptoExchange.Net.OrderBook diff --git a/CryptoExchange.Net/ParsingUtils.cs b/CryptoExchange.Net/ParsingUtils.cs new file mode 100644 index 0000000..e4c8953 --- /dev/null +++ b/CryptoExchange.Net/ParsingUtils.cs @@ -0,0 +1,36 @@ +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System.IO; +using System.Text; + +namespace CryptoExchange.Net +{ + /// <summary> + /// Parsing utility methods + /// </summary> + public static class ParsingUtils + { + /// <summary> + /// Read the stream as string + /// </summary> + /// <param name="stream"></param> + /// <returns></returns> + public static string GetString(Stream stream) + { + using var reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); + return reader.ReadToEnd(); + } + + /// <summary> + /// Read the stream and parse to JToken + /// </summary> + /// <param name="x"></param> + /// <returns></returns> + public static JToken GetJToken(Stream x) + { + using var sr = new StreamReader(x, Encoding.UTF8, false, (int)x.Length, true); + using var jsonTextReader = new JsonTextReader(sr); + return JToken.Load(jsonTextReader); + } + } +} diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 3c9ab9b..0c73aaf 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -1,5 +1,6 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; @@ -44,7 +45,6 @@ namespace CryptoExchange.Net.Sockets private ProcessState _processState; private DateTime _lastReconnectTime; - /// <summary> /// Received messages, the size and the timstamp /// </summary> @@ -101,7 +101,7 @@ namespace CryptoExchange.Net.Sockets public event Action? OnClose; /// <inheritdoc /> - public event Action<string>? OnMessage; + public event Func<MemoryStream, Task>? OnStreamMessage; /// <inheritdoc /> public event Action<int>? OnRequestSent; @@ -521,7 +521,7 @@ namespace CryptoExchange.Net.Sockets { // Received a complete message and it's not multi part _logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); - HandleMessage(buffer.Array!, buffer.Offset, receiveResult.Count, receiveResult.MessageType); + await ProcessByteData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count), receiveResult.MessageType).ConfigureAwait(false); } else { @@ -555,11 +555,13 @@ namespace CryptoExchange.Net.Sockets { // Reassemble complete message from memory stream _logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); - HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType); + await ProcessByteData(memoryStream, receiveResult.MessageType).ConfigureAwait(false); memoryStream.Dispose(); } else + { _logger.Log(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes"); + } } } } @@ -578,58 +580,10 @@ namespace CryptoExchange.Net.Sockets } } - /// <summary> - /// Handles the message - /// </summary> - /// <param name="data"></param> - /// <param name="offset"></param> - /// <param name="count"></param> - /// <param name="messageType"></param> - private void HandleMessage(byte[] data, int offset, int count, WebSocketMessageType messageType) + private async Task ProcessByteData(MemoryStream memoryStream, WebSocketMessageType messageType) { - string strData; - if (messageType == WebSocketMessageType.Binary) - { - if (Parameters.DataInterpreterBytes == null) - throw new Exception("Byte interpreter not set while receiving byte data"); - - try - { - var relevantData = new byte[count]; - Array.Copy(data, offset, relevantData, 0, count); - strData = Parameters.DataInterpreterBytes(relevantData); - } - catch(Exception e) - { - _logger.Log(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString()); - return; - } - } - else - strData = Parameters.Encoding.GetString(data, offset, count); - - if (Parameters.DataInterpreterString != null) - { - try - { - strData = Parameters.DataInterpreterString(strData); - } - catch(Exception e) - { - _logger.Log(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString()); - return; - } - } - - try - { - LastActionTime = DateTime.UtcNow; - OnMessage?.Invoke(strData); - } - catch(Exception e) - { - _logger.Log(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString()); - } + if (OnStreamMessage != null) + await OnStreamMessage.Invoke(memoryStream).ConfigureAwait(false); } /// <summary> diff --git a/CryptoExchange.Net/Sockets/MessageEvent.cs b/CryptoExchange.Net/Sockets/MessageEvent.cs deleted file mode 100644 index ae12e0f..0000000 --- a/CryptoExchange.Net/Sockets/MessageEvent.cs +++ /dev/null @@ -1,46 +0,0 @@ -using Newtonsoft.Json.Linq; -using System; - -namespace CryptoExchange.Net.Sockets -{ - /// <summary> - /// Message received event - /// </summary> - public class MessageEvent - { - /// <summary> - /// The connection the message was received on - /// </summary> - public SocketConnection Connection { get; set; } - - /// <summary> - /// The json object of the data - /// </summary> - public JToken JsonData { get; set; } - - /// <summary> - /// The originally received string data - /// </summary> - public string? OriginalData { get; set; } - - /// <summary> - /// The timestamp of when the data was received - /// </summary> - public DateTime ReceivedTimestamp { get; set; } - - /// <summary> - /// ctor - /// </summary> - /// <param name="connection"></param> - /// <param name="jsonData"></param> - /// <param name="originalData"></param> - /// <param name="timestamp"></param> - public MessageEvent(SocketConnection connection, JToken jsonData, string? originalData, DateTime timestamp) - { - Connection = connection; - JsonData = jsonData; - OriginalData = originalData; - ReceivedTimestamp = timestamp; - } - } -} diff --git a/CryptoExchange.Net/Sockets/QueryActor.cs b/CryptoExchange.Net/Sockets/QueryActor.cs new file mode 100644 index 0000000..bae03a4 --- /dev/null +++ b/CryptoExchange.Net/Sockets/QueryActor.cs @@ -0,0 +1,45 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; + +namespace CryptoExchange.Net.Sockets +{ + /// <summary> + /// Query + /// </summary> + public abstract class QueryActor + { + /// <summary> + /// The query request + /// </summary> + public object Query { get; set; } + + /// <summary> + /// If this is a private request + /// </summary> + public bool Authenticated { get; } + + /// <summary> + /// Check if the message is the response to the query + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public abstract bool MessageMatchesQuery(StreamMessage message); + /// <summary> + /// Handle the query response + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public abstract CallResult HandleResponse(StreamMessage message); + + /// <summary> + /// ctor + /// </summary> + /// <param name="query"></param> + /// <param name="authenticated"></param> + public QueryActor(object query, bool authenticated) + { + Authenticated = authenticated; + Query = query; + } + } +} diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 49f6917..74df18d 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -9,6 +9,8 @@ using Newtonsoft.Json.Linq; using Microsoft.Extensions.Logging; using CryptoExchange.Net.Objects; using System.Net.WebSockets; +using System.IO; +using CryptoExchange.Net.Objects.Sockets; namespace CryptoExchange.Net.Sockets { @@ -45,7 +47,7 @@ namespace CryptoExchange.Net.Sockets /// <summary> /// Unhandled message event /// </summary> - public event Action<JToken>? UnhandledMessage; + public event Action<StreamMessage>? UnhandledMessage; /// <summary> /// The amount of subscriptions on this connection @@ -59,7 +61,7 @@ namespace CryptoExchange.Net.Sockets /// <summary> /// Get a copy of the current subscriptions /// </summary> - public SocketSubscription[] Subscriptions + public SocketSubscriptionListener[] Subscriptions { get { @@ -149,13 +151,12 @@ namespace CryptoExchange.Net.Sockets } private bool _pausedActivity; - private readonly List<SocketSubscription> _subscriptions; + private readonly List<SocketSubscriptionListener> _subscriptions; + private readonly List<IStreamMessageListener> _messageListeners; + private readonly object _subscriptionLock = new(); - private readonly ILogger _logger; - private readonly List<PendingRequest> _pendingRequests; - private SocketStatus _status; /// <summary> @@ -177,11 +178,11 @@ namespace CryptoExchange.Net.Sockets Tag = tag; Properties = new Dictionary<string, object>(); - _pendingRequests = new List<PendingRequest>(); - _subscriptions = new List<SocketSubscription>(); + _messageListeners = new List<IStreamMessageListener>(); + _subscriptions = new List<SocketSubscriptionListener>(); _socket = socket; - _socket.OnMessage += HandleMessage; + _socket.OnStreamMessage += HandleStreamMessage; _socket.OnRequestSent += HandleRequestSent; _socket.OnOpen += HandleOpen; _socket.OnClose += HandleClose; @@ -247,12 +248,12 @@ namespace CryptoExchange.Net.Sockets protected virtual async void HandleReconnected() { Status = SocketStatus.Resubscribing; - lock (_pendingRequests) + lock (_messageListeners) { - foreach (var pendingRequest in _pendingRequests.ToList()) + foreach (var pendingRequest in _messageListeners.OfType<PendingRequest>().ToList()) { pendingRequest.Fail(); - _pendingRequests.Remove(pendingRequest); + _messageListeners.Remove(pendingRequest); } } @@ -292,8 +293,8 @@ namespace CryptoExchange.Net.Sockets protected virtual void HandleRequestSent(int requestId) { PendingRequest pendingRequest; - lock (_pendingRequests) - pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId); + lock (_messageListeners) + pendingRequest = _messageListeners.OfType<PendingRequest>().SingleOrDefault(p => p.Id == requestId); if (pendingRequest == null) { @@ -305,86 +306,67 @@ namespace CryptoExchange.Net.Sockets } /// <summary> - /// Process a message received by the socket + /// Handle a message /// </summary> - /// <param name="data">The received data</param> - protected virtual void HandleMessage(string data) + /// <param name="stream"></param> + /// <returns></returns> + protected virtual async Task HandleStreamMessage(MemoryStream stream) { var timestamp = DateTime.UtcNow; - _logger.Log(LogLevel.Trace, $"Socket {SocketId} received data: " + data); - if (string.IsNullOrEmpty(data)) return; - - var tokenData = data.ToJToken(_logger); - if (tokenData == null) - { - data = $"\"{data}\""; - tokenData = data.ToJToken(_logger); - if (tokenData == null) - return; - } - + var streamMessage = new StreamMessage(this, stream, timestamp); var handledResponse = false; - - // Remove any timed out requests - PendingRequest[] requests; - lock (_pendingRequests) + SocketSubscriptionListener? currentSubscription = null; + TimeSpan userCodeDuration = TimeSpan.Zero; + foreach (var listener in _messageListeners.OrderByDescending(x => x.Priority).ToList()) // LOCK { - // Remove only timed out requests after 5 minutes have passed so we can still process any - // message coming in after the request timeout - _pendingRequests.RemoveAll(r => r.Completed && DateTime.UtcNow - r.RequestTimestamp > TimeSpan.FromMinutes(5)); - requests = _pendingRequests.ToArray(); - } - - // Check if this message is an answer on any pending requests - foreach (var pendingRequest in requests) - { - if (pendingRequest.CheckData(tokenData)) + if (listener.MessageMatches(streamMessage)) { - lock (_pendingRequests) - _pendingRequests.Remove(pendingRequest); - - if (pendingRequest.Completed) + if (listener is PendingRequest pendingRequest) { - // Answer to a timed out request, unsub if it is a subscription request - if (pendingRequest.Subscription != null) + lock (_messageListeners) + _messageListeners.Remove(pendingRequest); + + if (pendingRequest.Completed) { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); - _ = ApiClient.UnsubscribeAsync(this, pendingRequest.Subscription).ConfigureAwait(false); + // Answer to a timed out request, unsub if it is a subscription request + if (pendingRequest.Subscription != null) + { + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); + _ = UnsubscribeAsync(pendingRequest.Subscription).ConfigureAwait(false); + } } + else + { + _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); + await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false); + } + + if (!ApiClient.ContinueOnQueryResponse) + return; + + handledResponse = true; + break; } - else + else if (listener is SocketSubscriptionListener subscription) { - _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); - pendingRequest.Succeed(tokenData); + currentSubscription = subscription; + handledResponse = true; + var userSw = Stopwatch.StartNew(); + await subscription.ProcessAsync(streamMessage).ConfigureAwait(false); + userSw.Stop(); + userCodeDuration = userSw.Elapsed; + break; } - - if (!ApiClient.ContinueOnQueryResponse) - return; - - handledResponse = true; - break; } } - // Message was not a request response, check data handlers - var messageEvent = new MessageEvent(this, tokenData, ApiClient.OutputOriginalData ? data : null, timestamp); - var (handled, userProcessTime, subscription) = HandleData(messageEvent); - if (!handled && !handledResponse) + if (!handledResponse) { if (!ApiClient.UnhandledMessageExpected) - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + tokenData); - UnhandledMessage?.Invoke(tokenData); + _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + streamMessage.Get(ParsingUtils.GetString)); + UnhandledMessage?.Invoke(streamMessage); } - - var total = DateTime.UtcNow - timestamp; - if (userProcessTime.TotalMilliseconds > 500) - { - _logger.Log(LogLevel.Debug, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " + - "Data from this socket may arrive late or not at all if message processing is continuously slow."); - } - - _logger.Log(LogLevel.Trace, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processed in {(int)total.TotalMilliseconds}ms ({(int)userProcessTime.TotalMilliseconds}ms user code)"); - } + } /// <summary> /// Connect the websocket @@ -434,7 +416,7 @@ namespace CryptoExchange.Net.Sockets /// </summary> /// <param name="subscription">Subscription to close</param> /// <returns></returns> - public async Task CloseAsync(SocketSubscription subscription) + public async Task CloseAsync(SocketSubscriptionListener subscription) { lock (_subscriptionLock) { @@ -452,7 +434,7 @@ namespace CryptoExchange.Net.Sockets subscription.CancellationTokenRegistration.Value.Dispose(); if (subscription.Confirmed && _socket.IsOpen) - await ApiClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); + await UnsubscribeAsync(subscription).ConfigureAwait(false); bool shouldCloseConnection; lock (_subscriptionLock) @@ -475,7 +457,10 @@ namespace CryptoExchange.Net.Sockets } lock (_subscriptionLock) + { + _messageListeners.Remove(subscription); _subscriptions.Remove(subscription); + } } /// <summary> @@ -491,7 +476,7 @@ namespace CryptoExchange.Net.Sockets /// Add a subscription to this connection /// </summary> /// <param name="subscription"></param> - public bool AddSubscription(SocketSubscription subscription) + public bool AddSubscription(SocketSubscriptionListener subscription) { lock (_subscriptionLock) { @@ -499,7 +484,9 @@ namespace CryptoExchange.Net.Sockets return false; _subscriptions.Add(subscription); - if(subscription.UserSubscription) + _messageListeners.Add(subscription); + + if (subscription.UserSubscription) _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}"); return true; } @@ -509,7 +496,7 @@ namespace CryptoExchange.Net.Sockets /// Get a subscription on this connection by id /// </summary> /// <param name="id"></param> - public SocketSubscription? GetSubscription(int id) + public SocketSubscriptionListener? GetSubscription(int id) { lock (_subscriptionLock) return _subscriptions.SingleOrDefault(s => s.Id == id); @@ -520,66 +507,10 @@ namespace CryptoExchange.Net.Sockets /// </summary> /// <param name="predicate">Filter for a request</param> /// <returns></returns> - public SocketSubscription? GetSubscriptionByRequest(Func<object?, bool> predicate) + public SocketSubscriptionListener? GetSubscriptionByRequest(Func<object?, bool> predicate) { lock(_subscriptionLock) - return _subscriptions.SingleOrDefault(s => predicate(s.Request)); - } - - /// <summary> - /// Process data - /// </summary> - /// <param name="messageEvent"></param> - /// <returns>True if the data was successfully handled</returns> - private (bool, TimeSpan, SocketSubscription?) HandleData(MessageEvent messageEvent) - { - SocketSubscription? currentSubscription = null; - try - { - var handled = false; - TimeSpan userCodeDuration = TimeSpan.Zero; - - // Loop the subscriptions to check if any of them signal us that the message is for them - List<SocketSubscription> subscriptionsCopy; - lock (_subscriptionLock) - subscriptionsCopy = _subscriptions.ToList(); - - foreach (var subscription in subscriptionsCopy) - { - currentSubscription = subscription; - if (subscription.Request == null) - { - if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!)) - { - handled = true; - var userSw = Stopwatch.StartNew(); - subscription.MessageHandler(messageEvent); - userSw.Stop(); - userCodeDuration = userSw.Elapsed; - } - } - else - { - if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Request)) - { - handled = true; - messageEvent.JsonData = ApiClient.ProcessTokenData(messageEvent.JsonData); - var userSw = Stopwatch.StartNew(); - subscription.MessageHandler(messageEvent); - userSw.Stop(); - userCodeDuration = userSw.Elapsed; - } - } - } - - return (handled, userCodeDuration, currentSubscription); - } - catch (Exception ex) - { - _logger.Log(LogLevel.Error, $"Socket {SocketId} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}"); - currentSubscription?.InvokeExceptionHandler(ex); - return (false, TimeSpan.Zero, null); - } + return _subscriptions.SingleOrDefault(s => predicate(s.Subscription)); } /// <summary> @@ -589,15 +520,15 @@ namespace CryptoExchange.Net.Sockets /// <param name="obj">The object to send</param> /// <param name="timeout">The timeout for response</param> /// <param name="subscription">Subscription if this is a subscribe request</param> - /// <param name="handler">The response handler, should return true if the received JToken was the response to the request</param> + /// <param name="handler">The response handler</param> /// <param name="weight">The weight of the message</param> /// <returns></returns> - public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, SocketSubscription? subscription, int weight, Func<JToken, bool> handler) + public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, SocketSubscriptionListener? subscription, int weight, Func<StreamMessage, bool> handler) { var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, subscription); - lock (_pendingRequests) + lock (_messageListeners) { - _pendingRequests.Add(pending); + _messageListeners.Add(pending); } var sendOk = Send(pending.Id, obj, weight); @@ -697,21 +628,21 @@ namespace CryptoExchange.Net.Sockets } // Get a list of all subscriptions on the socket - List<SocketSubscription> subscriptionList = new List<SocketSubscription>(); + List<SocketSubscriptionListener> subscriptionList = new List<SocketSubscriptionListener>(); lock (_subscriptionLock) { foreach (var subscription in _subscriptions) { - if (subscription.Request != null) + if (subscription.Subscription != null) subscriptionList.Add(subscription); else subscription.Confirmed = true; } } - foreach(var subscription in subscriptionList.Where(s => s.Request != null)) + foreach(var subscription in subscriptionList.Where(s => s.Subscription != null)) { - var result = await ApiClient.RevitalizeRequestAsync(subscription.Request!).ConfigureAwait(false); + var result = await ApiClient.RevitalizeRequestAsync(subscription.Subscription!).ConfigureAwait(false); if (!result) { _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error); @@ -727,7 +658,7 @@ namespace CryptoExchange.Net.Sockets var taskList = new List<Task<CallResult<bool>>>(); foreach (var subscription in subscriptionList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) - taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription)); + taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Subscription!, subscription)); await Task.WhenAll(taskList).ConfigureAwait(false); if (taskList.Any(t => !t.Result.Success)) @@ -744,17 +675,26 @@ namespace CryptoExchange.Net.Sockets return new CallResult<bool>(true); } - internal async Task UnsubscribeAsync(SocketSubscription socketSubscription) + internal async Task UnsubscribeAsync(SocketSubscriptionListener socketSubscription) { - await ApiClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false); + var unsubscribeRequest = socketSubscription.Subscription?.GetUnsubscribeRequest(); + if (unsubscribeRequest != null) + { + await SendAndWaitAsync(unsubscribeRequest, TimeSpan.FromSeconds(10), socketSubscription, 0, x => + { + var (matches, result) = socketSubscription.Subscription!.MessageMatchesUnsubscribeRequest(x); + // TODO check result? + return matches; + }).ConfigureAwait(false); + } } - internal async Task<CallResult<bool>> ResubscribeAsync(SocketSubscription socketSubscription) + internal async Task<CallResult<bool>> ResubscribeAsync(SocketSubscriptionListener socketSubscription) { if (!_socket.IsOpen) return new CallResult<bool>(new UnknownError("Socket is not connected")); - return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); + return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Subscription!, socketSubscription).ConfigureAwait(false); } /// <summary> @@ -793,3 +733,4 @@ namespace CryptoExchange.Net.Sockets } } } + diff --git a/CryptoExchange.Net/Sockets/SubscriptionActor.cs b/CryptoExchange.Net/Sockets/SubscriptionActor.cs new file mode 100644 index 0000000..12967ac --- /dev/null +++ b/CryptoExchange.Net/Sockets/SubscriptionActor.cs @@ -0,0 +1,114 @@ +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using System.IO; +using System.Text; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Sockets +{ + /// <summary> + /// Subscription base + /// </summary> + public abstract class SubscriptionActor + { + private bool _outputOriginalData; + + /// <summary> + /// Logger + /// </summary> + protected readonly ILogger _logger; + + /// <summary> + /// If the subscription is a private subscription and needs authentication + /// </summary> + public bool Authenticated { get; } + + /// <summary> + /// ctor + /// </summary> + /// <param name="logger"></param> + /// <param name="apiClient"></param> + /// <param name="authenticated"></param> + public SubscriptionActor(ILogger logger, ISocketApiClient apiClient, bool authenticated) + { + _logger = logger; + _outputOriginalData = apiClient.ApiOptions.OutputOriginalData ?? apiClient.ClientOptions.OutputOriginalData; + Authenticated = authenticated; + } + + /// <summary> + /// Get the subscribe object to send when subscribing + /// </summary> + /// <returns></returns> + public abstract object? GetSubscribeRequest(); + /// <summary> + /// Check if the message is the response to the subscribe request + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public abstract (bool, CallResult?) MessageMatchesSubscribeRequest(StreamMessage message); + + /// <summary> + /// Get the unsubscribe object to send when unsubscribing + /// </summary> + /// <returns></returns> + public abstract object? GetUnsubscribeRequest(); + /// <summary> + /// Check if the message is the response to the unsubscribe request + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public abstract (bool, CallResult?) MessageMatchesUnsubscribeRequest(StreamMessage message); + + /// <summary> + /// Check if the message is an update for this subscription + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public abstract bool MessageMatchesSubscription(StreamMessage message); + /// <summary> + /// Handle the update message + /// </summary> + /// <param name="message"></param> + /// <returns></returns> + public abstract Task HandleEventAsync(StreamMessage message); + + /// <summary> + /// Create a data event + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="obj"></param> + /// <param name="message"></param> + /// <param name="topic"></param> + /// <param name="type"></param> + /// <returns></returns> + protected DataEvent<T> CreateDataEvent<T>(T obj, StreamMessage message, string? topic = null, SocketUpdateType? type = null) + { + string? originalData = null; + if (_outputOriginalData) + originalData = message.Get(ParsingUtils.GetString); + + return new DataEvent<T>(obj, topic, originalData, message.Timestamp, type); + } + + /// <summary> + /// Deserialize the message to an object using Json.Net + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="message"></param> + /// <param name="settings"></param> + /// <returns></returns> + protected Task<CallResult<T>> DeserializeAsync<T>(StreamMessage message, JsonSerializerSettings settings) + { + var serializer = JsonSerializer.Create(settings); + using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true); + using var jsonTextReader = new JsonTextReader(sr); + var result = serializer.Deserialize<T>(jsonTextReader); + message.Stream.Position = 0; + return Task.FromResult(new CallResult<T>(result!)); + } + } +} diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs new file mode 100644 index 0000000..ddcc142 --- /dev/null +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -0,0 +1,33 @@ +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using Microsoft.Extensions.Logging; +using System; + +namespace CryptoExchange.Net.Sockets +{ + /// <summary> + /// A system subscription + /// </summary> + public abstract class SystemSubscription : SubscriptionActor + { + /// <summary> + /// ctor + /// </summary> + /// <param name="logger"></param> + /// <param name="authenticated"></param> + public SystemSubscription(ILogger logger, ISocketApiClient socketApiClient, bool authenticated = false) : base(logger, socketApiClient, authenticated) + { + } + + /// <inheritdoc /> + public override object? GetSubscribeRequest() => null; + /// <inheritdoc /> + public override (bool, CallResult?) MessageMatchesSubscribeRequest(StreamMessage message) => throw new NotImplementedException(); + + /// <inheritdoc /> + public override object? GetUnsubscribeRequest() => null; + /// <inheritdoc /> + public override (bool, CallResult?) MessageMatchesUnsubscribeRequest(StreamMessage message) => throw new NotImplementedException(); + } +} diff --git a/CryptoExchange.Net/Sockets/WebsocketFactory.cs b/CryptoExchange.Net/Sockets/WebsocketFactory.cs index fd1606b..286d37d 100644 --- a/CryptoExchange.Net/Sockets/WebsocketFactory.cs +++ b/CryptoExchange.Net/Sockets/WebsocketFactory.cs @@ -1,4 +1,5 @@ using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; namespace CryptoExchange.Net.Sockets