From 0d3e05880af1fd2d497c4d75f18f98356c956471 Mon Sep 17 00:00:00 2001 From: JKorf Date: Mon, 31 Oct 2022 21:41:30 +0100 Subject: [PATCH] Wip client work --- CryptoExchange.Net/Clients/BaseApiClient.cs | 18 +- CryptoExchange.Net/Clients/BaseClient.cs | 4 +- .../Clients/BaseSocketClient.cs | 737 +---------------- CryptoExchange.Net/Clients/SocketApiClient.cs | 782 +++++++++++++++++- CryptoExchange.Net/Objects/Options.cs | 295 +++---- .../Sockets/SocketConnection.cs | 40 +- 6 files changed, 951 insertions(+), 925 deletions(-) diff --git a/CryptoExchange.Net/Clients/BaseApiClient.cs b/CryptoExchange.Net/Clients/BaseApiClient.cs index 9bde40c..b2b5418 100644 --- a/CryptoExchange.Net/Clients/BaseApiClient.cs +++ b/CryptoExchange.Net/Clients/BaseApiClient.cs @@ -2,7 +2,9 @@ using System; using System.Collections.Generic; using System.Net.Http; using CryptoExchange.Net.Authentication; +using CryptoExchange.Net.Logging; using CryptoExchange.Net.Objects; +using Microsoft.Extensions.Logging; namespace CryptoExchange.Net { @@ -13,8 +15,9 @@ namespace CryptoExchange.Net { private ApiCredentials? _apiCredentials; private AuthenticationProvider? _authenticationProvider; + protected Log _log; + protected bool _disposing; private bool _created; - private bool _disposing; /// /// The authentication provider for this API client. (null if no credentials are set) @@ -70,19 +73,20 @@ namespace CryptoExchange.Net internal protected string BaseAddress { get; } /// - /// Api client options + /// Options /// - internal ApiClientOptions Options { get; } + public ApiClientOptions Options { get; } /// /// ctor /// - /// Client options + /// Logger /// Api client options - protected BaseApiClient(BaseClientOptions options, ApiClientOptions apiOptions) + protected BaseApiClient(Log log, ApiClientOptions apiOptions) { Options = apiOptions; - _apiCredentials = apiOptions.ApiCredentials?.Copy() ?? options.ApiCredentials?.Copy(); + _log = log; + _apiCredentials = apiOptions.ApiCredentials?.Copy(); BaseAddress = apiOptions.BaseAddress; } @@ -104,7 +108,7 @@ namespace CryptoExchange.Net /// /// Dispose /// - public void Dispose() + public virtual void Dispose() { _disposing = true; _apiCredentials?.Dispose(); diff --git a/CryptoExchange.Net/Clients/BaseClient.cs b/CryptoExchange.Net/Clients/BaseClient.cs index 360ba44..e03e986 100644 --- a/CryptoExchange.Net/Clients/BaseClient.cs +++ b/CryptoExchange.Net/Clients/BaseClient.cs @@ -51,14 +51,14 @@ namespace CryptoExchange.Net /// /// Provided client options /// - public BaseClientOptions ClientOptions { get; } + public ClientOptions ClientOptions { get; } /// /// ctor /// /// The name of the API this client is for /// The options for this client - protected BaseClient(string name, BaseClientOptions options) + protected BaseClient(string name, ClientOptions options) { log = new Log(name); log.UpdateWriters(options.LogWriters); diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs index 8d6c26f..c4dbb9d 100644 --- a/CryptoExchange.Net/Clients/BaseSocketClient.cs +++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs @@ -21,95 +21,16 @@ namespace CryptoExchange.Net public abstract class BaseSocketClient: BaseClient, ISocketClient { #region fields - /// - /// The factory for creating sockets. Used for unit testing - /// - public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory(); - - /// - /// List of socket connections currently connecting/connected - /// - protected internal ConcurrentDictionary socketConnections = new(); - /// - /// Semaphore used while creating sockets - /// - protected internal readonly SemaphoreSlim semaphoreSlim = new(1); - /// - /// Keep alive interval for websocket connection - /// - protected TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(10); - /// - /// Delegate used for processing byte data received from socket connections before it is processed by handlers - /// - protected Func? dataInterpreterBytes; - /// - /// Delegate used for processing string data received from socket connections before it is processed by handlers - /// - protected Func? dataInterpreterString; - /// - /// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example. - /// - protected Dictionary> genericHandlers = new(); - /// - /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry. - /// - protected Task? periodicTask; - /// - /// Wait event for the periodicTask - /// - protected AsyncResetEvent? periodicEvent; + /// /// If client is disposing /// protected bool disposing; - - /// - /// If true; data which is a response to a query will also be distributed to subscriptions - /// If false; data which is a response to a query won't get forwarded to subscriptions as well - /// - protected internal bool ContinueOnQueryResponse { get; protected set; } - - /// - /// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message - /// - protected internal bool UnhandledMessageExpected { get; set; } - - /// - /// The max amount of outgoing messages per socket per second - /// - protected internal int? RateLimitPerSocketPerSecond { get; set; } - + /// - public double IncomingKbps - { - get - { - if (!socketConnections.Any()) - return 0; - - return socketConnections.Sum(s => s.Value.IncomingKbps); - } - } - + public int CurrentConnections => ApiClients.OfType().Sum(c => c.CurrentConnections); /// - public int CurrentConnections => socketConnections.Count; - /// - public int CurrentSubscriptions - { - get - { - if (!socketConnections.Any()) - return 0; - - return socketConnections.Sum(s => s.Value.SubscriptionCount); - } - } - - /// - /// Client options - /// - public new BaseSocketClientOptions ClientOptions { get; } - + public int CurrentSubscriptions => ApiClients.OfType().Sum(s => s.CurrentSubscriptions); #endregion /// @@ -117,9 +38,8 @@ namespace CryptoExchange.Net /// /// The name of the API this client is for /// The options for this client - protected BaseSocketClient(string name, BaseSocketClientOptions options) : base(name, options) + protected BaseSocketClient(string name, ClientOptions options) : base(name, options) { - ClientOptions = options ?? throw new ArgumentNullException(nameof(options)); } /// @@ -129,578 +49,6 @@ namespace CryptoExchange.Net apiClient.SetApiCredentials(credentials); } - /// - /// Set a delegate to be used for processing data received from socket connections before it is processed by handlers - /// - /// Handler for byte data - /// Handler for string data - protected void SetDataInterpreter(Func? byteHandler, Func? stringHandler) - { - dataInterpreterBytes = byteHandler; - dataInterpreterString = stringHandler; - } - - /// - /// Connect to an url and listen for data on the BaseAddress - /// - /// The type of the expected data - /// The API client the subscription is for - /// The optional request object to send, will be serialized to json - /// The identifier to use, necessary if no request object is sent - /// If the subscription is to an authenticated endpoint - /// The handler of update data - /// Cancellation token for closing this subscription - /// - protected virtual Task> SubscribeAsync(SocketApiClient apiClient, object? request, string? identifier, bool authenticated, Action> dataHandler, CancellationToken ct) - { - return SubscribeAsync(apiClient, apiClient.Options.BaseAddress, request, identifier, authenticated, dataHandler, ct); - } - - /// - /// Connect to an url and listen for data - /// - /// The type of the expected data - /// The API client the subscription is for - /// The URL to connect to - /// The optional request object to send, will be serialized to json - /// The identifier to use, necessary if no request object is sent - /// If the subscription is to an authenticated endpoint - /// The handler of update data - /// Cancellation token for closing this subscription - /// - protected virtual async Task> SubscribeAsync(SocketApiClient apiClient, string url, object? request, string? identifier, bool authenticated, Action> dataHandler, CancellationToken ct) - { - if (disposing) - return new CallResult(new InvalidOperationError("Client disposed, can't subscribe")); - - SocketConnection socketConnection; - SocketSubscription? subscription; - var released = false; - // Wait for a semaphore here, so we only connect 1 socket at a time. - // This is necessary for being able to see if connections can be combined - try - { - await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - return new CallResult(new CancellationRequestedError()); - } - - try - { - while (true) - { - // Get a new or existing socket connection - var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false); - if(!socketResult) - return socketResult.As(null); - - socketConnection = socketResult.Data; - - // Add a subscription on the socket connection - subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated); - if (subscription == null) - { - log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); - continue; - } - - if (ClientOptions.SocketSubscriptionsCombineTarget == 1) - { - // Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway - semaphoreSlim.Release(); - released = true; - } - - var needsConnecting = !socketConnection.Connected; - - var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); - if (!connectResult) - return new CallResult(connectResult.Error!); - - break; - } - } - finally - { - if(!released) - semaphoreSlim.Release(); - } - - if (socketConnection.PausedActivity) - { - log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't subscribe at this moment"); - return new CallResult( new ServerError("Socket is paused")); - } - - if (request != null) - { - // Send the request and wait for answer - var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false); - if (!subResult) - { - log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}"); - await socketConnection.CloseAsync(subscription).ConfigureAwait(false); - return new CallResult(subResult.Error!); - } - } - else - { - // No request to be sent, so just mark the subscription as comfirmed - subscription.Confirmed = true; - } - - if (ct != default) - { - subscription.CancellationTokenRegistration = ct.Register(async () => - { - log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription"); - await socketConnection.CloseAsync(subscription).ConfigureAwait(false); - }, false); - } - - log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully"); - return new CallResult(new UpdateSubscription(socketConnection, subscription)); - } - - /// - /// Sends the subscribe request and waits for a response to that request - /// - /// The connection to send the request on - /// The request to send, will be serialized to json - /// The subscription the request is for - /// - protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription) - { - CallResult? callResult = null; - await socketConnection.SendAndWaitAsync(request, ClientOptions.SocketResponseTimeout, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false); - - if (callResult?.Success == true) - { - subscription.Confirmed = true; - return new CallResult(true); - } - - if(callResult== null) - return new CallResult(new ServerError("No response on subscription request received")); - - return new CallResult(callResult.Error!); - } - - /// - /// Send a query on a socket connection to the BaseAddress and wait for the response - /// - /// Expected result type - /// The API client the query is for - /// The request to send, will be serialized to json - /// If the query is to an authenticated endpoint - /// - protected virtual Task> QueryAsync(SocketApiClient apiClient, object request, bool authenticated) - { - return QueryAsync(apiClient, apiClient.Options.BaseAddress, request, authenticated); - } - - /// - /// Send a query on a socket connection and wait for the response - /// - /// The expected result type - /// The API client the query is for - /// The url for the request - /// The request to send - /// Whether the socket should be authenticated - /// - protected virtual async Task> QueryAsync(SocketApiClient apiClient, string url, object request, bool authenticated) - { - if (disposing) - return new CallResult(new InvalidOperationError("Client disposed, can't query")); - - SocketConnection socketConnection; - var released = false; - await semaphoreSlim.WaitAsync().ConfigureAwait(false); - try - { - var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false); - if (!socketResult) - return socketResult.As(default); - - socketConnection = socketResult.Data; - - if (ClientOptions.SocketSubscriptionsCombineTarget == 1) - { - // Can release early when only a single sub per connection - semaphoreSlim.Release(); - released = true; - } - - var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); - if (!connectResult) - return new CallResult(connectResult.Error!); - } - finally - { - if (!released) - semaphoreSlim.Release(); - } - - if (socketConnection.PausedActivity) - { - log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't send query at this moment"); - return new CallResult(new ServerError("Socket is paused")); - } - - return await QueryAndWaitAsync(socketConnection, request).ConfigureAwait(false); - } - - /// - /// Sends the query request and waits for the result - /// - /// The expected result type - /// The connection to send and wait on - /// The request to send - /// - protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, object request) - { - var dataResult = new CallResult(new ServerError("No response on query received")); - await socket.SendAndWaitAsync(request, ClientOptions.SocketResponseTimeout, data => - { - if (!HandleQueryResponse(socket, request, data, out var callResult)) - return false; - - dataResult = callResult; - return true; - }).ConfigureAwait(false); - - return dataResult; - } - - /// - /// Checks if a socket needs to be connected and does so if needed. Also authenticates on the socket if needed - /// - /// The connection to check - /// Whether the socket should authenticated - /// - protected virtual async Task> ConnectIfNeededAsync(SocketConnection socket, bool authenticated) - { - if (socket.Connected) - return new CallResult(true); - - var connectResult = await ConnectSocketAsync(socket).ConfigureAwait(false); - if (!connectResult) - return new CallResult(connectResult.Error!); - - if (!authenticated || socket.Authenticated) - return new CallResult(true); - - log.Write(LogLevel.Debug, $"Attempting to authenticate {socket.SocketId}"); - var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false); - if (!result) - { - log.Write(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed"); - if(socket.Connected) - await socket.CloseAsync().ConfigureAwait(false); - - result.Error!.Message = "Authentication failed: " + result.Error.Message; - return new CallResult(result.Error); - } - - socket.Authenticated = true; - return new CallResult(true); - } - - /// - /// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the query that was send (the request parameter). - /// For example; A query is sent in a request message with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an - /// anwser to any query that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages, - /// if not some other method has be implemented to match the messages). - /// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true. - /// - /// The type of response that is expected on the query - /// The socket connection - /// The request that a response is awaited for - /// The message received from the server - /// The interpretation (null if message wasn't a response to the request) - /// True if the message was a response to the query - protected internal abstract bool HandleQueryResponse(SocketConnection socketConnection, object request, JToken data, [NotNullWhen(true)]out CallResult? callResult); - /// - /// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the subscription request that was send (the request parameter). - /// For example; A subscribe request message is send with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an - /// anwser to any subscription request that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages, - /// if not some other method has be implemented to match the messages). - /// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true. - /// - /// The socket connection - /// A subscription that waiting for a subscription response - /// The request that the subscription sent - /// The message received from the server - /// The interpretation (null if message wasn't a response to the request) - /// True if the message was a response to the subscription request - protected internal abstract bool HandleSubscriptionResponse(SocketConnection socketConnection, SocketSubscription subscription, object request, JToken data, out CallResult? callResult); - /// - /// Needs to check if a received message matches a handler by request. After subscribing data message will come in. These data messages need to be matched to a specific connection - /// to pass the correct data to the correct handler. The implementation of this method should check if the message received matches the subscribe request that was sent. - /// - /// The socket connection the message was recieved on - /// The received data - /// The subscription request - /// True if the message is for the subscription which sent the request - protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, object request); - /// - /// Needs to check if a received message matches a handler by identifier. Generally used by GenericHandlers. For example; a generic handler is registered which handles ping messages - /// from the server. This method should check if the message received is a ping message and the identifer is the identifier of the GenericHandler - /// - /// The socket connection the message was recieved on - /// The received data - /// The string identifier of the handler - /// True if the message is for the handler which has the identifier - protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, string identifier); - /// - /// Needs to authenticate the socket so authenticated queries/subscriptions can be made on this socket connection - /// - /// The socket connection that should be authenticated - /// - protected internal abstract Task> AuthenticateSocketAsync(SocketConnection socketConnection); - /// - /// Needs to unsubscribe a subscription, typically by sending an unsubscribe request. If multiple subscriptions per socket is not allowed this can just return since the socket will be closed anyway - /// - /// The connection on which to unsubscribe - /// The subscription to unsubscribe - /// - protected internal abstract Task UnsubscribeAsync(SocketConnection connection, SocketSubscription subscriptionToUnsub); - - /// - /// Optional handler to interpolate data before sending it to the handlers - /// - /// - /// - protected internal virtual JToken ProcessTokenData(JToken message) - { - return message; - } - - /// - /// Add a subscription to a connection - /// - /// The type of data the subscription expects - /// The request of the subscription - /// The identifier of the subscription (can be null if request param is used) - /// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket) - /// The socket connection the handler is on - /// The handler of the data received - /// Whether the subscription needs authentication - /// - protected virtual SocketSubscription? AddSubscription(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action> dataHandler, bool authenticated) - { - void InternalHandler(MessageEvent messageEvent) - { - if (typeof(T) == typeof(string)) - { - var stringData = (T)Convert.ChangeType(messageEvent.JsonData.ToString(), typeof(T)); - dataHandler(new DataEvent(stringData, null, ClientOptions.OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp)); - return; - } - - var desResult = Deserialize(messageEvent.JsonData); - if (!desResult) - { - log.Write(LogLevel.Warning, $"Socket {connection.SocketId} Failed to deserialize data into type {typeof(T)}: {desResult.Error}"); - return; - } - - dataHandler(new DataEvent(desResult.Data, null, ClientOptions.OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp)); - } - - var subscription = request == null - ? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, authenticated, InternalHandler) - : SocketSubscription.CreateForRequest(NextId(), request, userSubscription, authenticated, InternalHandler); - if (!connection.AddSubscription(subscription)) - return null; - return subscription; - } - - /// - /// Adds a generic message handler. Used for example to reply to ping requests - /// - /// The name of the request handler. Needs to be unique - /// The action to execute when receiving a message for this handler (checked by ) - protected void AddGenericHandler(string identifier, Action action) - { - genericHandlers.Add(identifier, action); - var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, false, action); - foreach (var connection in socketConnections.Values) - connection.AddSubscription(subscription); - } - - /// - /// Get the url to connect to (defaults to BaseAddress form the client options) - /// - /// - /// - /// - /// - protected virtual Task> GetConnectionUrlAsync(SocketApiClient apiClient, string address, bool authentication) - { - return Task.FromResult(new CallResult(address)); - } - - /// - /// Get the url to reconnect to after losing a connection - /// - /// - /// - /// - public virtual Task GetReconnectUriAsync(SocketApiClient apiClient, SocketConnection connection) - { - return Task.FromResult(connection.ConnectionUri); - } - - /// - /// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one. - /// - /// The API client the connection is for - /// The address the socket is for - /// Whether the socket should be authenticated - /// - protected virtual async Task> GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated) - { - var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected) - && s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') - && (s.Value.ApiClient.GetType() == apiClient.GetType()) - && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault(); - var result = socketResult.Equals(default(KeyValuePair)) ? null : socketResult.Value; - if (result != null) - { - if (result.SubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= ClientOptions.MaxSocketConnections && socketConnections.All(s => s.Value.SubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) - { - // Use existing socket if it has less than target connections OR it has the least connections and we can't make new - return new CallResult(result); - } - } - - var connectionAddress = await GetConnectionUrlAsync(apiClient, address, authenticated).ConfigureAwait(false); - if (!connectionAddress) - { - log.Write(LogLevel.Warning, $"Failed to determine connection url: " + connectionAddress.Error); - return connectionAddress.As(null); - } - - if (connectionAddress.Data != address) - log.Write(LogLevel.Debug, $"Connection address set to " + connectionAddress.Data); - - // Create new socket - var socket = CreateSocket(connectionAddress.Data!); - var socketConnection = new SocketConnection(this, apiClient, socket, address); - socketConnection.UnhandledMessage += HandleUnhandledMessage; - foreach (var kvp in genericHandlers) - { - var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, false, kvp.Value); - socketConnection.AddSubscription(handler); - } - - return new CallResult(socketConnection); - } - - /// - /// Process an unhandled message - /// - /// The token that wasn't processed - protected virtual void HandleUnhandledMessage(JToken token) - { - } - - /// - /// Connect a socket - /// - /// The socket to connect - /// - protected virtual async Task> ConnectSocketAsync(SocketConnection socketConnection) - { - if (await socketConnection.ConnectAsync().ConfigureAwait(false)) - { - socketConnections.TryAdd(socketConnection.SocketId, socketConnection); - return new CallResult(true); - } - - socketConnection.Dispose(); - return new CallResult(new CantConnectError()); - } - - /// - /// Get parameters for the websocket connection - /// - /// The address to connect to - /// - protected virtual WebSocketParameters GetWebSocketParameters(string address) - => new (new Uri(address), ClientOptions.AutoReconnect) - { - DataInterpreterBytes = dataInterpreterBytes, - DataInterpreterString = dataInterpreterString, - KeepAliveInterval = KeepAliveInterval, - ReconnectInterval = ClientOptions.ReconnectInterval, - RatelimitPerSecond = RateLimitPerSocketPerSecond, - Proxy = ClientOptions.Proxy, - Timeout = ClientOptions.SocketNoDataTimeout - }; - - /// - /// Create a socket for an address - /// - /// The address the socket should connect to - /// - protected virtual IWebsocket CreateSocket(string address) - { - var socket = SocketFactory.CreateWebsocket(log, GetWebSocketParameters(address)); - log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address); - return socket; - } - - /// - /// Periodically sends data over a socket connection - /// - /// Identifier for the periodic send - /// How often - /// Method returning the object to send - public virtual void SendPeriodic(string identifier, TimeSpan interval, Func objGetter) - { - if (objGetter == null) - throw new ArgumentNullException(nameof(objGetter)); - - periodicEvent = new AsyncResetEvent(); - periodicTask = Task.Run(async () => - { - while (!disposing) - { - await periodicEvent.WaitAsync(interval).ConfigureAwait(false); - if (disposing) - break; - - foreach (var socketConnection in socketConnections.Values) - { - if (disposing) - break; - - if (!socketConnection.Connected) - continue; - - var obj = objGetter(socketConnection); - if (obj == null) - continue; - - log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}"); - - try - { - socketConnection.Send(obj); - } - catch (Exception ex) - { - log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} Periodic send {identifier} failed: " + ex.ToLogString()); - } - } - } - }); - } - /// /// Unsubscribe an update subscription /// @@ -708,23 +56,12 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAsync(int subscriptionId) { - SocketSubscription? subscription = null; - SocketConnection? connection = null; - foreach(var socket in socketConnections.Values.ToList()) + foreach(var socket in ApiClients.OfType()) { - subscription = socket.GetSubscription(subscriptionId); - if (subscription != null) - { - connection = socket; - break; - } + var result = await socket.UnsubscribeAsync(subscriptionId).ConfigureAwait(false); + if (result) + break; } - - if (subscription == null || connection == null) - return; - - log.Write(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId); - await connection.CloseAsync(subscription).ConfigureAwait(false); } /// @@ -747,14 +84,10 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAllAsync() { - log.Write(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions"); - var tasks = new List(); - { - var socketList = socketConnections.Values; - foreach (var sub in socketList) - tasks.Add(sub.CloseAsync()); - } - + var tasks = new List(); + foreach (var client in ApiClients.OfType()) + tasks.Add(client.UnsubscribeAllAsync()); + await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); } @@ -764,14 +97,12 @@ namespace CryptoExchange.Net /// public virtual async Task ReconnectAsync() { - log.Write(LogLevel.Information, $"Reconnecting all {socketConnections.Count} connections"); + log.Write(LogLevel.Information, $"Reconnecting all {CurrentConnections} connections"); var tasks = new List(); + foreach (var client in ApiClients.OfType()) { - var socketList = socketConnections.Values; - foreach (var sub in socketList) - tasks.Add(sub.TriggerReconnectAsync()); + tasks.Add(client.ReconnectAsync()); } - await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); } @@ -780,32 +111,16 @@ namespace CryptoExchange.Net /// public string GetSubscriptionsState() { - var sb = new StringBuilder(); - sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}"); - foreach(var connection in socketConnections) - { - sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); - foreach (var subscription in connection.Value.Subscriptions) - sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}"); - } - return sb.ToString(); - } - - /// - /// Dispose the client - /// - public override void Dispose() - { - disposing = true; - periodicEvent?.Set(); - periodicEvent?.Dispose(); - if (socketConnections.Sum(s => s.Value.SubscriptionCount) > 0) - { - log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); - _ = UnsubscribeAllAsync(); - } - semaphoreSlim?.Dispose(); - base.Dispose(); + //var sb = new StringBuilder(); + //sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}"); + //foreach(var connection in socketConnections) + //{ + // sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); + // foreach (var subscription in connection.Value.Subscriptions) + // sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}"); + //} + //return sb.ToString(); + return ""; } } } diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index dd14bfa..e081122 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -1,4 +1,17 @@ +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Logging; using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Sockets; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace CryptoExchange.Net { @@ -7,13 +20,778 @@ namespace CryptoExchange.Net /// public abstract class SocketApiClient : BaseApiClient { + #region Fields + + /// + /// The factory for creating sockets. Used for unit testing + /// + public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory(); + + /// + /// List of socket connections currently connecting/connected + /// + protected internal ConcurrentDictionary socketConnections = new(); + /// + /// Semaphore used while creating sockets + /// + protected internal readonly SemaphoreSlim semaphoreSlim = new(1); + /// + /// Keep alive interval for websocket connection + /// + protected TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(10); + /// + /// Delegate used for processing byte data received from socket connections before it is processed by handlers + /// + protected Func? dataInterpreterBytes; + /// + /// Delegate used for processing string data received from socket connections before it is processed by handlers + /// + protected Func? dataInterpreterString; + /// + /// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example. + /// + protected Dictionary> genericHandlers = new(); + /// + /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry. + /// + protected Task? periodicTask; + /// + /// Wait event for the periodicTask + /// + protected AsyncResetEvent? periodicEvent; + + /// + /// If true; data which is a response to a query will also be distributed to subscriptions + /// If false; data which is a response to a query won't get forwarded to subscriptions as well + /// + protected internal bool ContinueOnQueryResponse { get; protected set; } + + /// + /// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message + /// + protected internal bool UnhandledMessageExpected { get; set; } + + /// + /// The max amount of outgoing messages per socket per second + /// + protected internal int? RateLimitPerSocketPerSecond { get; set; } + + /// + public double IncomingKbps + { + get + { + if (!socketConnections.Any()) + return 0; + + return socketConnections.Sum(s => s.Value.IncomingKbps); + } + } + + /// + public int CurrentConnections => socketConnections.Count; + /// + public int CurrentSubscriptions + { + get + { + if (!socketConnections.Any()) + return 0; + + return socketConnections.Sum(s => s.Value.SubscriptionCount); + } + } + + /// + public new SocketApiClientOptions Options => (SocketApiClientOptions)base.Options; + + #endregion + /// /// ctor /// - /// The base client options + /// log /// The Api client options - public SocketApiClient(BaseClientOptions options, ApiClientOptions apiOptions): base(options, apiOptions) + public SocketApiClient(Log log, SocketApiClientOptions apiOptions): base(log, apiOptions) { } + + /// + /// Set a delegate to be used for processing data received from socket connections before it is processed by handlers + /// + /// Handler for byte data + /// Handler for string data + protected void SetDataInterpreter(Func? byteHandler, Func? stringHandler) + { + dataInterpreterBytes = byteHandler; + dataInterpreterString = stringHandler; + } + + /// + /// Connect to an url and listen for data on the BaseAddress + /// + /// The type of the expected data + /// The API client the subscription is for + /// The optional request object to send, will be serialized to json + /// The identifier to use, necessary if no request object is sent + /// If the subscription is to an authenticated endpoint + /// The handler of update data + /// Cancellation token for closing this subscription + /// + protected virtual Task> SubscribeAsync(SocketApiClient apiClient, object? request, string? identifier, bool authenticated, Action> dataHandler, CancellationToken ct) + { + return SubscribeAsync(apiClient, Options.BaseAddress, request, identifier, authenticated, dataHandler, ct); + } + + /// + /// Connect to an url and listen for data + /// + /// The type of the expected data + /// The API client the subscription is for + /// The URL to connect to + /// The optional request object to send, will be serialized to json + /// The identifier to use, necessary if no request object is sent + /// If the subscription is to an authenticated endpoint + /// The handler of update data + /// Cancellation token for closing this subscription + /// + protected virtual async Task> SubscribeAsync(SocketApiClient apiClient, string url, object? request, string? identifier, bool authenticated, Action> dataHandler, CancellationToken ct) + { + if (_disposing) + return new CallResult(new InvalidOperationError("Client disposed, can't subscribe")); + + SocketConnection socketConnection; + SocketSubscription? subscription; + var released = false; + // Wait for a semaphore here, so we only connect 1 socket at a time. + // This is necessary for being able to see if connections can be combined + try + { + await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return new CallResult(new CancellationRequestedError()); + } + + try + { + while (true) + { + // Get a new or existing socket connection + var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false); + if (!socketResult) + return socketResult.As(null); + + socketConnection = socketResult.Data; + + // Add a subscription on the socket connection + subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated); + if (subscription == null) + { + _log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); + continue; + } + + if (Options.SocketSubscriptionsCombineTarget == 1) + { + // Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway + semaphoreSlim.Release(); + released = true; + } + + var needsConnecting = !socketConnection.Connected; + + var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); + if (!connectResult) + return new CallResult(connectResult.Error!); + + break; + } + } + finally + { + if (!released) + semaphoreSlim.Release(); + } + + if (socketConnection.PausedActivity) + { + _log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't subscribe at this moment"); + return new CallResult(new ServerError("Socket is paused")); + } + + if (request != null) + { + // Send the request and wait for answer + var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false); + if (!subResult) + { + _log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}"); + await socketConnection.CloseAsync(subscription).ConfigureAwait(false); + return new CallResult(subResult.Error!); + } + } + else + { + // No request to be sent, so just mark the subscription as comfirmed + subscription.Confirmed = true; + } + + if (ct != default) + { + subscription.CancellationTokenRegistration = ct.Register(async () => + { + _log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription"); + await socketConnection.CloseAsync(subscription).ConfigureAwait(false); + }, false); + } + + _log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully"); + return new CallResult(new UpdateSubscription(socketConnection, subscription)); + } + + /// + /// Sends the subscribe request and waits for a response to that request + /// + /// The connection to send the request on + /// The request to send, will be serialized to json + /// The subscription the request is for + /// + protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription) + { + CallResult? callResult = null; + await socketConnection.SendAndWaitAsync(request, Options.SocketResponseTimeout, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false); + + if (callResult?.Success == true) + { + subscription.Confirmed = true; + return new CallResult(true); + } + + if (callResult == null) + return new CallResult(new ServerError("No response on subscription request received")); + + return new CallResult(callResult.Error!); + } + + /// + /// Send a query on a socket connection to the BaseAddress and wait for the response + /// + /// Expected result type + /// The request to send, will be serialized to json + /// If the query is to an authenticated endpoint + /// + protected virtual Task> QueryAsync(object request, bool authenticated) + { + return QueryAsync(Options.BaseAddress, request, authenticated); + } + + /// + /// Send a query on a socket connection and wait for the response + /// + /// The expected result type + /// The url for the request + /// The request to send + /// Whether the socket should be authenticated + /// + protected virtual async Task> QueryAsync(string url, object request, bool authenticated) + { + if (_disposing) + return new CallResult(new InvalidOperationError("Client disposed, can't query")); + + SocketConnection socketConnection; + var released = false; + await semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + var socketResult = await GetSocketConnection(url, authenticated).ConfigureAwait(false); + if (!socketResult) + return socketResult.As(default); + + socketConnection = socketResult.Data; + + if (Options.SocketSubscriptionsCombineTarget == 1) + { + // Can release early when only a single sub per connection + semaphoreSlim.Release(); + released = true; + } + + var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); + if (!connectResult) + return new CallResult(connectResult.Error!); + } + finally + { + if (!released) + semaphoreSlim.Release(); + } + + if (socketConnection.PausedActivity) + { + _log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't send query at this moment"); + return new CallResult(new ServerError("Socket is paused")); + } + + return await QueryAndWaitAsync(socketConnection, request).ConfigureAwait(false); + } + + /// + /// Sends the query request and waits for the result + /// + /// The expected result type + /// The connection to send and wait on + /// The request to send + /// + protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, object request) + { + var dataResult = new CallResult(new ServerError("No response on query received")); + await socket.SendAndWaitAsync(request, Options.SocketResponseTimeout, data => + { + if (!HandleQueryResponse(socket, request, data, out var callResult)) + return false; + + dataResult = callResult; + return true; + }).ConfigureAwait(false); + + return dataResult; + } + + /// + /// Checks if a socket needs to be connected and does so if needed. Also authenticates on the socket if needed + /// + /// The connection to check + /// Whether the socket should authenticated + /// + protected virtual async Task> ConnectIfNeededAsync(SocketConnection socket, bool authenticated) + { + if (socket.Connected) + return new CallResult(true); + + var connectResult = await ConnectSocketAsync(socket).ConfigureAwait(false); + if (!connectResult) + return new CallResult(connectResult.Error!); + + if (!authenticated || socket.Authenticated) + return new CallResult(true); + + _log.Write(LogLevel.Debug, $"Attempting to authenticate {socket.SocketId}"); + var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false); + if (!result) + { + _log.Write(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed"); + if (socket.Connected) + await socket.CloseAsync().ConfigureAwait(false); + + result.Error!.Message = "Authentication failed: " + result.Error.Message; + return new CallResult(result.Error); + } + + socket.Authenticated = true; + return new CallResult(true); + } + + /// + /// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the query that was send (the request parameter). + /// For example; A query is sent in a request message with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an + /// anwser to any query that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages, + /// if not some other method has be implemented to match the messages). + /// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true. + /// + /// The type of response that is expected on the query + /// The socket connection + /// The request that a response is awaited for + /// The message received from the server + /// The interpretation (null if message wasn't a response to the request) + /// True if the message was a response to the query + protected internal abstract bool HandleQueryResponse(SocketConnection socketConnection, object request, JToken data, [NotNullWhen(true)] out CallResult? callResult); + /// + /// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the subscription request that was send (the request parameter). + /// For example; A subscribe request message is send with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an + /// anwser to any subscription request that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages, + /// if not some other method has be implemented to match the messages). + /// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true. + /// + /// The socket connection + /// A subscription that waiting for a subscription response + /// The request that the subscription sent + /// The message received from the server + /// The interpretation (null if message wasn't a response to the request) + /// True if the message was a response to the subscription request + protected internal abstract bool HandleSubscriptionResponse(SocketConnection socketConnection, SocketSubscription subscription, object request, JToken data, out CallResult? callResult); + /// + /// Needs to check if a received message matches a handler by request. After subscribing data message will come in. These data messages need to be matched to a specific connection + /// to pass the correct data to the correct handler. The implementation of this method should check if the message received matches the subscribe request that was sent. + /// + /// The socket connection the message was recieved on + /// The received data + /// The subscription request + /// True if the message is for the subscription which sent the request + protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, object request); + /// + /// Needs to check if a received message matches a handler by identifier. Generally used by GenericHandlers. For example; a generic handler is registered which handles ping messages + /// from the server. This method should check if the message received is a ping message and the identifer is the identifier of the GenericHandler + /// + /// The socket connection the message was recieved on + /// The received data + /// The string identifier of the handler + /// True if the message is for the handler which has the identifier + protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, string identifier); + /// + /// Needs to authenticate the socket so authenticated queries/subscriptions can be made on this socket connection + /// + /// The socket connection that should be authenticated + /// + protected internal abstract Task> AuthenticateSocketAsync(SocketConnection socketConnection); + /// + /// Needs to unsubscribe a subscription, typically by sending an unsubscribe request. If multiple subscriptions per socket is not allowed this can just return since the socket will be closed anyway + /// + /// The connection on which to unsubscribe + /// The subscription to unsubscribe + /// + protected internal abstract Task UnsubscribeAsync(SocketConnection connection, SocketSubscription subscriptionToUnsub); + + /// + /// Optional handler to interpolate data before sending it to the handlers + /// + /// + /// + protected internal virtual JToken ProcessTokenData(JToken message) + { + return message; + } + + /// + /// Add a subscription to a connection + /// + /// The type of data the subscription expects + /// The request of the subscription + /// The identifier of the subscription (can be null if request param is used) + /// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket) + /// The socket connection the handler is on + /// The handler of the data received + /// Whether the subscription needs authentication + /// + protected virtual SocketSubscription? AddSubscription(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action> dataHandler, bool authenticated) + { + void InternalHandler(MessageEvent messageEvent) + { + if (typeof(T) == typeof(string)) + { + var stringData = (T)Convert.ChangeType(messageEvent.JsonData.ToString(), typeof(T)); + dataHandler(new DataEvent(stringData, null, Options.OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp)); + return; + } + + var desResult = Deserialize(messageEvent.JsonData); + if (!desResult) + { + _log.Write(LogLevel.Warning, $"Socket {connection.SocketId} Failed to deserialize data into type {typeof(T)}: {desResult.Error}"); + return; + } + + dataHandler(new DataEvent(desResult.Data, null, Options.OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp)); + } + + var subscription = request == null + ? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, authenticated, InternalHandler) + : SocketSubscription.CreateForRequest(NextId(), request, userSubscription, authenticated, InternalHandler); + if (!connection.AddSubscription(subscription)) + return null; + return subscription; + } + + /// + /// Adds a generic message handler. Used for example to reply to ping requests + /// + /// The name of the request handler. Needs to be unique + /// The action to execute when receiving a message for this handler (checked by ) + protected void AddGenericHandler(string identifier, Action action) + { + genericHandlers.Add(identifier, action); + var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, false, action); + foreach (var connection in socketConnections.Values) + connection.AddSubscription(subscription); + } + + /// + /// Get the url to connect to (defaults to BaseAddress form the client options) + /// + /// + /// + /// + /// + protected virtual Task> GetConnectionUrlAsync(SocketApiClient apiClient, string address, bool authentication) + { + return Task.FromResult(new CallResult(address)); + } + + /// + /// Get the url to reconnect to after losing a connection + /// + /// + /// + /// + public virtual Task GetReconnectUriAsync(SocketApiClient apiClient, SocketConnection connection) + { + return Task.FromResult(connection.ConnectionUri); + } + + /// + /// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one. + /// + /// The API client the connection is for + /// The address the socket is for + /// Whether the socket should be authenticated + /// + protected virtual async Task> GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated) + { + var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected) + && s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') + && (s.Value.ApiClient.GetType() == apiClient.GetType()) + && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault(); + var result = socketResult.Equals(default(KeyValuePair)) ? null : socketResult.Value; + if (result != null) + { + if (result.SubscriptionCount < Options.SocketSubscriptionsCombineTarget || (socketConnections.Count >= Options.MaxSocketConnections && socketConnections.All(s => s.Value.SubscriptionCount >= Options.SocketSubscriptionsCombineTarget))) + { + // Use existing socket if it has less than target connections OR it has the least connections and we can't make new + return new CallResult(result); + } + } + + var connectionAddress = await GetConnectionUrlAsync(apiClient, address, authenticated).ConfigureAwait(false); + if (!connectionAddress) + { + _log.Write(LogLevel.Warning, $"Failed to determine connection url: " + connectionAddress.Error); + return connectionAddress.As(null); + } + + if (connectionAddress.Data != address) + _log.Write(LogLevel.Debug, $"Connection address set to " + connectionAddress.Data); + + // Create new socket + var socket = CreateSocket(connectionAddress.Data!); + var socketConnection = new SocketConnection(_log, apiClient, socket, address); + socketConnection.UnhandledMessage += HandleUnhandledMessage; + foreach (var kvp in genericHandlers) + { + var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, false, kvp.Value); + socketConnection.AddSubscription(handler); + } + + return new CallResult(socketConnection); + } + + /// + /// Process an unhandled message + /// + /// The token that wasn't processed + protected virtual void HandleUnhandledMessage(JToken token) + { + } + + /// + /// Connect a socket + /// + /// The socket to connect + /// + protected virtual async Task> ConnectSocketAsync(SocketConnection socketConnection) + { + if (await socketConnection.ConnectAsync().ConfigureAwait(false)) + { + socketConnections.TryAdd(socketConnection.SocketId, socketConnection); + return new CallResult(true); + } + + socketConnection.Dispose(); + return new CallResult(new CantConnectError()); + } + + /// + /// Get parameters for the websocket connection + /// + /// The address to connect to + /// + protected virtual WebSocketParameters GetWebSocketParameters(string address) + => new(new Uri(address), Options.AutoReconnect) + { + DataInterpreterBytes = dataInterpreterBytes, + DataInterpreterString = dataInterpreterString, + KeepAliveInterval = KeepAliveInterval, + ReconnectInterval = Options.ReconnectInterval, + RatelimitPerSecond = RateLimitPerSocketPerSecond, + Proxy = Options.Proxy, + Timeout = Options.SocketNoDataTimeout + }; + + /// + /// Create a socket for an address + /// + /// The address the socket should connect to + /// + protected virtual IWebsocket CreateSocket(string address) + { + var socket = SocketFactory.CreateWebsocket(_log, GetWebSocketParameters(address)); + _log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address); + return socket; + } + + /// + /// Periodically sends data over a socket connection + /// + /// Identifier for the periodic send + /// How often + /// Method returning the object to send + public virtual void SendPeriodic(string identifier, TimeSpan interval, Func objGetter) + { + if (objGetter == null) + throw new ArgumentNullException(nameof(objGetter)); + + periodicEvent = new AsyncResetEvent(); + periodicTask = Task.Run(async () => + { + while (!_disposing) + { + await periodicEvent.WaitAsync(interval).ConfigureAwait(false); + if (_disposing) + break; + + foreach (var socketConnection in socketConnections.Values) + { + if (_disposing) + break; + + if (!socketConnection.Connected) + continue; + + var obj = objGetter(socketConnection); + if (obj == null) + continue; + + _log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}"); + + try + { + socketConnection.Send(obj); + } + catch (Exception ex) + { + _log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} Periodic send {identifier} failed: " + ex.ToLogString()); + } + } + } + }); + } + + /// + /// Unsubscribe an update subscription + /// + /// The id of the subscription to unsubscribe + /// + public virtual async Task UnsubscribeAsync(int subscriptionId) + { + SocketSubscription? subscription = null; + SocketConnection? connection = null; + foreach (var socket in socketConnections.Values.ToList()) + { + subscription = socket.GetSubscription(subscriptionId); + if (subscription != null) + { + connection = socket; + break; + } + } + + if (subscription == null || connection == null) + return false; + + _log.Write(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId); + await connection.CloseAsync(subscription).ConfigureAwait(false); + return true; + } + + /// + /// Unsubscribe an update subscription + /// + /// The subscription to unsubscribe + /// + public virtual async Task UnsubscribeAsync(UpdateSubscription subscription) + { + if (subscription == null) + throw new ArgumentNullException(nameof(subscription)); + + _log.Write(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id); + await subscription.CloseAsync().ConfigureAwait(false); + } + + /// + /// Unsubscribe all subscriptions + /// + /// + public virtual async Task UnsubscribeAllAsync() + { + _log.Write(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions"); + var tasks = new List(); + { + var socketList = socketConnections.Values; + foreach (var sub in socketList) + tasks.Add(sub.CloseAsync()); + } + + await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); + } + + /// + /// Reconnect all connections + /// + /// + public virtual async Task ReconnectAsync() + { + _log.Write(LogLevel.Information, $"Reconnecting all {socketConnections.Count} connections"); + var tasks = new List(); + { + var socketList = socketConnections.Values; + foreach (var sub in socketList) + tasks.Add(sub.TriggerReconnectAsync()); + } + + await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); + } + + /// + /// Log the current state of connections and subscriptions + /// + public string GetSubscriptionsState() + { + var sb = new StringBuilder(); + sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}"); + foreach (var connection in socketConnections) + { + sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); + foreach (var subscription in connection.Value.Subscriptions) + sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}"); + } + return sb.ToString(); + } + + /// + /// Dispose the client + /// + public override void Dispose() + { + _disposing = true; + periodicEvent?.Set(); + periodicEvent?.Dispose(); + if (socketConnections.Sum(s => s.Value.SubscriptionCount) > 0) + { + _log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); + _ = UnsubscribeAllAsync(); + } + semaphoreSlim?.Dispose(); + base.Dispose(); + } } } diff --git a/CryptoExchange.Net/Objects/Options.cs b/CryptoExchange.Net/Objects/Options.cs index 5c727cc..b00f3d1 100644 --- a/CryptoExchange.Net/Objects/Options.cs +++ b/CryptoExchange.Net/Objects/Options.cs @@ -9,10 +9,7 @@ using Microsoft.Extensions.Logging; namespace CryptoExchange.Net.Objects { - /// - /// Base options, applicable to everything - /// - public class BaseOptions + public class ClientOptions { internal event Action? OnLoggingChanged; @@ -44,195 +41,27 @@ namespace CryptoExchange.Net.Objects } } - /// - /// If true, the CallResult and DataEvent objects will also include the originally received json data in the OriginalData property - /// - public bool OutputOriginalData { get; set; } = false; - - /// - /// ctor - /// - public BaseOptions(): this(null) - { - } - - /// - /// ctor - /// - /// Copy options from these options to the new options - public BaseOptions(BaseOptions? baseOptions) - { - if (baseOptions == null) - return; - - LogLevel = baseOptions.LogLevel; - LogWriters = baseOptions.LogWriters.ToList(); - OutputOriginalData = baseOptions.OutputOriginalData; - } - - /// - public override string ToString() - { - return $"LogLevel: {LogLevel}, Writers: {LogWriters.Count}, OutputOriginalData: {OutputOriginalData}"; - } - } - - /// - /// Client options, for both the socket and rest clients - /// - public class BaseClientOptions : BaseOptions - { /// /// Proxy to use when connecting /// public ApiProxy? Proxy { get; set; } - /// - /// Api credentials to be used for signing requests to private endpoints. These credentials will be used for each API in the client, unless overriden in the API options - /// - public ApiCredentials? ApiCredentials { get; set; } - /// /// ctor /// - public BaseClientOptions() : this(null) + /// Copy values for the provided options + /// Copy values for the provided options + public ClientOptions(ClientOptions baseOptions, ClientOptions? newValues) { - } - - /// - /// ctor - /// - /// Copy options from these options to the new options - public BaseClientOptions(BaseClientOptions? baseOptions) : base(baseOptions) - { - if (baseOptions == null) - return; - - Proxy = baseOptions.Proxy; - ApiCredentials = baseOptions.ApiCredentials?.Copy(); + Proxy = newValues?.Proxy ?? baseOptions.Proxy; + LogLevel = baseOptions.LogLevel; + LogWriters = baseOptions.LogWriters.ToList(); } /// public override string ToString() { - return $"{base.ToString()}, Proxy: {(Proxy == null ? "-" : Proxy.Host)}, Base.ApiCredentials: {(ApiCredentials == null ? "-" : "set")}"; - } - } - - /// - /// Rest client options - /// - public class BaseRestClientOptions : BaseClientOptions - { - /// - /// The time the server has to respond to a request before timing out - /// - public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(30); - - /// - /// Http client to use. If a HttpClient is provided in this property the RequestTimeout and Proxy options provided in these options will be ignored in requests and should be set on the provided HttpClient instance - /// - public HttpClient? HttpClient { get; set; } - - /// - /// ctor - /// - public BaseRestClientOptions(): this(null) - { - } - - /// - /// ctor - /// - /// Copy options from these options to the new options - public BaseRestClientOptions(BaseRestClientOptions? baseOptions): base(baseOptions) - { - if (baseOptions == null) - return; - - HttpClient = baseOptions.HttpClient; - RequestTimeout = baseOptions.RequestTimeout; - } - - /// - public override string ToString() - { - return $"{base.ToString()}, RequestTimeout: {RequestTimeout:c}, HttpClient: {(HttpClient == null ? "-" : "set")}"; - } - } - - /// - /// Socket client options - /// - public class BaseSocketClientOptions : BaseClientOptions - { - /// - /// Whether or not the socket should automatically reconnect when losing connection - /// - public bool AutoReconnect { get; set; } = true; - - /// - /// Time to wait between reconnect attempts - /// - public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5); - - /// - /// Max number of concurrent resubscription tasks per socket after reconnecting a socket - /// - public int MaxConcurrentResubscriptionsPerSocket { get; set; } = 5; - - /// - /// The max time to wait for a response after sending a request on the socket before giving a timeout - /// - public TimeSpan SocketResponseTimeout { get; set; } = TimeSpan.FromSeconds(10); - - /// - /// The max time of not receiving any data after which the connection is assumed to be dropped. This can only be used for socket connections where a steady flow of data is expected, - /// for example when the server sends intermittent ping requests - /// - public TimeSpan SocketNoDataTimeout { get; set; } - - /// - /// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket. - /// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a - /// single connection will also increase the amount of traffic on that single connection, potentially leading to issues. - /// - public int? SocketSubscriptionsCombineTarget { get; set; } - - /// - /// The max amount of connections to make to the server. Can be used for API's which only allow a certain number of connections. Changing this to a high value might cause issues. - /// - public int? MaxSocketConnections { get; set; } - - /// - /// ctor - /// - public BaseSocketClientOptions(): this(null) - { - } - - /// - /// ctor - /// - /// Copy options from these options to the new options - public BaseSocketClientOptions(BaseSocketClientOptions? baseOptions): base(baseOptions) - { - if (baseOptions == null) - return; - - AutoReconnect = baseOptions.AutoReconnect; - ReconnectInterval = baseOptions.ReconnectInterval; - MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket; - SocketResponseTimeout = baseOptions.SocketResponseTimeout; - SocketNoDataTimeout = baseOptions.SocketNoDataTimeout; - SocketSubscriptionsCombineTarget = baseOptions.SocketSubscriptionsCombineTarget; - MaxSocketConnections = baseOptions.MaxSocketConnections; - } - - /// - public override string ToString() - { - return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}"; + return $"LogLevel: {LogLevel}, Writers: {LogWriters.Count}, Proxy: {(Proxy == null ? "-" : Proxy.Host)}"; } } @@ -241,6 +70,11 @@ namespace CryptoExchange.Net.Objects /// public class ApiClientOptions { + /// + /// If true, the CallResult and DataEvent objects will also include the originally received json data in the OriginalData property + /// + public bool OutputOriginalData { get; set; } = false; + /// /// The base address of the API /// @@ -278,12 +112,13 @@ namespace CryptoExchange.Net.Objects { BaseAddress = newValues?.BaseAddress ?? baseOptions.BaseAddress; ApiCredentials = newValues?.ApiCredentials?.Copy() ?? baseOptions.ApiCredentials?.Copy(); + OutputOriginalData = baseOptions.OutputOriginalData; } /// public override string ToString() { - return $"Credentials: {(ApiCredentials == null ? "-" : "Set")}, BaseAddress: {BaseAddress}"; + return $"OutputOriginalData: {OutputOriginalData}, Credentials: {(ApiCredentials == null ? "-" : "Set")}, BaseAddress: {BaseAddress}"; } } @@ -292,6 +127,16 @@ namespace CryptoExchange.Net.Objects /// public class RestApiClientOptions: ApiClientOptions { + /// + /// The time the server has to respond to a request before timing out + /// + public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(30); + + /// + /// Http client to use. If a HttpClient is provided in this property the RequestTimeout and Proxy options provided in these options will be ignored in requests and should be set on the provided HttpClient instance + /// + public HttpClient? HttpClient { get; set; } + /// /// List of rate limiters to use /// @@ -334,6 +179,8 @@ namespace CryptoExchange.Net.Objects /// Copy values for the provided options public RestApiClientOptions(RestApiClientOptions baseOn, RestApiClientOptions? newValues): base(baseOn, newValues) { + HttpClient = newValues?.HttpClient ?? baseOn.HttpClient; + RequestTimeout = newValues == default ? baseOn.RequestTimeout : newValues.RequestTimeout; RateLimitingBehaviour = newValues?.RateLimitingBehaviour ?? baseOn.RateLimitingBehaviour; AutoTimestamp = newValues?.AutoTimestamp ?? baseOn.AutoTimestamp; TimestampRecalculationInterval = newValues?.TimestampRecalculationInterval ?? baseOn.TimestampRecalculationInterval; @@ -343,14 +190,98 @@ namespace CryptoExchange.Net.Objects /// public override string ToString() { - return $"{base.ToString()}, RateLimiters: {RateLimiters?.Count}, RateLimitBehaviour: {RateLimitingBehaviour}, AutoTimestamp: {AutoTimestamp}, TimestampRecalculationInterval: {TimestampRecalculationInterval}"; + return $"{base.ToString()}, RequestTimeout: {RequestTimeout:c}, HttpClient: {(HttpClient == null ? "-" : "set")}, RateLimiters: {RateLimiters?.Count}, RateLimitBehaviour: {RateLimitingBehaviour}, AutoTimestamp: {AutoTimestamp}, TimestampRecalculationInterval: {TimestampRecalculationInterval}"; + } + } + + /// + /// Rest API client options + /// + public class SocketApiClientOptions : ApiClientOptions + { + /// + /// Whether or not the socket should automatically reconnect when losing connection + /// + public bool AutoReconnect { get; set; } = true; + + /// + /// Time to wait between reconnect attempts + /// + public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5); + + /// + /// Max number of concurrent resubscription tasks per socket after reconnecting a socket + /// + public int MaxConcurrentResubscriptionsPerSocket { get; set; } = 5; + + /// + /// The max time to wait for a response after sending a request on the socket before giving a timeout + /// + public TimeSpan SocketResponseTimeout { get; set; } = TimeSpan.FromSeconds(10); + + /// + /// The max time of not receiving any data after which the connection is assumed to be dropped. This can only be used for socket connections where a steady flow of data is expected, + /// for example when the server sends intermittent ping requests + /// + public TimeSpan SocketNoDataTimeout { get; set; } + + /// + /// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket. + /// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a + /// single connection will also increase the amount of traffic on that single connection, potentially leading to issues. + /// + public int? SocketSubscriptionsCombineTarget { get; set; } + + /// + /// The max amount of connections to make to the server. Can be used for API's which only allow a certain number of connections. Changing this to a high value might cause issues. + /// + public int? MaxSocketConnections { get; set; } + + /// + /// ctor + /// + public SocketApiClientOptions() + { + } + + /// + /// ctor + /// + /// Base address for the API + public SocketApiClientOptions(string baseAddress) : base(baseAddress) + { + } + + /// + /// ctor + /// + /// Copy values for the provided options + /// Copy values for the provided options + public SocketApiClientOptions(SocketApiClientOptions baseOptions, SocketApiClientOptions? newValues) : base(baseOptions, newValues) + { + if (baseOptions == null) + return; + + AutoReconnect = baseOptions.AutoReconnect; + ReconnectInterval = baseOptions.ReconnectInterval; + MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket; + SocketResponseTimeout = baseOptions.SocketResponseTimeout; + SocketNoDataTimeout = baseOptions.SocketNoDataTimeout; + SocketSubscriptionsCombineTarget = baseOptions.SocketSubscriptionsCombineTarget; + MaxSocketConnections = baseOptions.MaxSocketConnections; + } + + /// + public override string ToString() + { + return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}"; } } /// /// Base for order book options /// - public class OrderBookOptions : BaseOptions + public class OrderBookOptions : ApiClientOptions { /// /// Whether or not checksum validation is enabled. Default is true, disabling will ignore checksum messages. diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 703d358..a33fd09 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -149,7 +149,6 @@ namespace CryptoExchange.Net.Sockets private readonly object subscriptionLock = new(); private readonly Log log; - private readonly BaseSocketClient socketClient; private readonly List pendingRequests; @@ -163,14 +162,13 @@ namespace CryptoExchange.Net.Sockets /// /// New socket connection /// - /// The socket client + /// The logger /// The api client /// The socket /// - public SocketConnection(BaseSocketClient client, SocketApiClient apiClient, IWebsocket socket, string tag) + public SocketConnection(Log log, SocketApiClient apiClient, IWebsocket socket, string tag) { - log = client.log; - socketClient = client; + this.log = log; ApiClient = apiClient; Tag = tag; @@ -234,7 +232,7 @@ namespace CryptoExchange.Net.Sockets /// protected virtual async Task GetReconnectionUrlAsync() { - return await socketClient.GetReconnectUriAsync(ApiClient, this).ConfigureAwait(false); + return await ApiClient.GetReconnectUriAsync(ApiClient, this).ConfigureAwait(false); } /// @@ -315,7 +313,7 @@ namespace CryptoExchange.Net.Sockets lock (pendingRequests) pendingRequests.Remove(pendingRequest); - if (!socketClient.ContinueOnQueryResponse) + if (!ApiClient.ContinueOnQueryResponse) return; handledResponse = true; @@ -324,11 +322,11 @@ namespace CryptoExchange.Net.Sockets } // Message was not a request response, check data handlers - var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data : null, timestamp); + var messageEvent = new MessageEvent(this, tokenData, ApiClient.Options.OutputOriginalData ? data : null, timestamp); var (handled, userProcessTime, subscription) = HandleData(messageEvent); if (!handled && !handledResponse) { - if (!socketClient.UnhandledMessageExpected) + if (!ApiClient.UnhandledMessageExpected) log.Write(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + tokenData); UnhandledMessage?.Invoke(tokenData); } @@ -368,8 +366,8 @@ namespace CryptoExchange.Net.Sockets if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed) return; - if (socketClient.socketConnections.ContainsKey(SocketId)) - socketClient.socketConnections.TryRemove(SocketId, out _); + if (ApiClient.socketConnections.ContainsKey(SocketId)) + ApiClient.socketConnections.TryRemove(SocketId, out _); lock (subscriptionLock) { @@ -407,7 +405,7 @@ namespace CryptoExchange.Net.Sockets subscription.CancellationTokenRegistration.Value.Dispose(); if (subscription.Confirmed && _socket.IsOpen) - await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); + await ApiClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); bool shouldCloseConnection; lock (subscriptionLock) @@ -504,7 +502,7 @@ namespace CryptoExchange.Net.Sockets currentSubscription = subscription; if (subscription.Request == null) { - if (socketClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!)) + if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!)) { handled = true; var userSw = Stopwatch.StartNew(); @@ -515,10 +513,10 @@ namespace CryptoExchange.Net.Sockets } else { - if (socketClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Request)) + if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Request)) { handled = true; - messageEvent.JsonData = socketClient.ProcessTokenData(messageEvent.JsonData); + messageEvent.JsonData = ApiClient.ProcessTokenData(messageEvent.JsonData); var userSw = Stopwatch.StartNew(); subscription.MessageHandler(messageEvent); userSw.Stop(); @@ -611,7 +609,7 @@ namespace CryptoExchange.Net.Sockets if (subscriptions.Any(s => s.Authenticated)) { // If we reconnected a authenticated connection we need to re-authenticate - var authResult = await socketClient.AuthenticateSocketAsync(this).ConfigureAwait(false); + var authResult = await ApiClient.AuthenticateSocketAsync(this).ConfigureAwait(false); if (!authResult) { log.Write(LogLevel.Warning, $"Socket {SocketId} authentication failed on reconnected socket. Disconnecting and reconnecting."); @@ -636,14 +634,14 @@ namespace CryptoExchange.Net.Sockets } // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe - for (var i = 0; i < subscriptionList.Count; i += socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket) + for (var i = 0; i < subscriptionList.Count; i += ApiClient.Options.MaxConcurrentResubscriptionsPerSocket) { if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); var taskList = new List>>(); - foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) - taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription)); + foreach (var subscription in subscriptionList.Skip(i).Take(ApiClient.Options.MaxConcurrentResubscriptionsPerSocket)) + taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription)); await Task.WhenAll(taskList).ConfigureAwait(false); if (taskList.Any(t => !t.Result.Success)) @@ -662,7 +660,7 @@ namespace CryptoExchange.Net.Sockets internal async Task UnsubscribeAsync(SocketSubscription socketSubscription) { - await socketClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false); + await ApiClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false); } internal async Task> ResubscribeAsync(SocketSubscription socketSubscription) @@ -670,7 +668,7 @@ namespace CryptoExchange.Net.Sockets if (!_socket.IsOpen) return new CallResult(new UnknownError("Socket is not connected")); - return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); + return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); } ///