diff --git a/CryptoExchange.Net/Clients/BaseApiClient.cs b/CryptoExchange.Net/Clients/BaseApiClient.cs
index 9bde40c..b2b5418 100644
--- a/CryptoExchange.Net/Clients/BaseApiClient.cs
+++ b/CryptoExchange.Net/Clients/BaseApiClient.cs
@@ -2,7 +2,9 @@ using System;
using System.Collections.Generic;
using System.Net.Http;
using CryptoExchange.Net.Authentication;
+using CryptoExchange.Net.Logging;
using CryptoExchange.Net.Objects;
+using Microsoft.Extensions.Logging;
namespace CryptoExchange.Net
{
@@ -13,8 +15,9 @@ namespace CryptoExchange.Net
{
private ApiCredentials? _apiCredentials;
private AuthenticationProvider? _authenticationProvider;
+ protected Log _log;
+ protected bool _disposing;
private bool _created;
- private bool _disposing;
///
/// The authentication provider for this API client. (null if no credentials are set)
@@ -70,19 +73,20 @@ namespace CryptoExchange.Net
internal protected string BaseAddress { get; }
///
- /// Api client options
+ /// Options
///
- internal ApiClientOptions Options { get; }
+ public ApiClientOptions Options { get; }
///
/// ctor
///
- /// Client options
+ /// Logger
/// Api client options
- protected BaseApiClient(BaseClientOptions options, ApiClientOptions apiOptions)
+ protected BaseApiClient(Log log, ApiClientOptions apiOptions)
{
Options = apiOptions;
- _apiCredentials = apiOptions.ApiCredentials?.Copy() ?? options.ApiCredentials?.Copy();
+ _log = log;
+ _apiCredentials = apiOptions.ApiCredentials?.Copy();
BaseAddress = apiOptions.BaseAddress;
}
@@ -104,7 +108,7 @@ namespace CryptoExchange.Net
///
/// Dispose
///
- public void Dispose()
+ public virtual void Dispose()
{
_disposing = true;
_apiCredentials?.Dispose();
diff --git a/CryptoExchange.Net/Clients/BaseClient.cs b/CryptoExchange.Net/Clients/BaseClient.cs
index 360ba44..e03e986 100644
--- a/CryptoExchange.Net/Clients/BaseClient.cs
+++ b/CryptoExchange.Net/Clients/BaseClient.cs
@@ -51,14 +51,14 @@ namespace CryptoExchange.Net
///
/// Provided client options
///
- public BaseClientOptions ClientOptions { get; }
+ public ClientOptions ClientOptions { get; }
///
/// ctor
///
/// The name of the API this client is for
/// The options for this client
- protected BaseClient(string name, BaseClientOptions options)
+ protected BaseClient(string name, ClientOptions options)
{
log = new Log(name);
log.UpdateWriters(options.LogWriters);
diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs
index 8d6c26f..c4dbb9d 100644
--- a/CryptoExchange.Net/Clients/BaseSocketClient.cs
+++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs
@@ -21,95 +21,16 @@ namespace CryptoExchange.Net
public abstract class BaseSocketClient: BaseClient, ISocketClient
{
#region fields
- ///
- /// The factory for creating sockets. Used for unit testing
- ///
- public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
-
- ///
- /// List of socket connections currently connecting/connected
- ///
- protected internal ConcurrentDictionary socketConnections = new();
- ///
- /// Semaphore used while creating sockets
- ///
- protected internal readonly SemaphoreSlim semaphoreSlim = new(1);
- ///
- /// Keep alive interval for websocket connection
- ///
- protected TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(10);
- ///
- /// Delegate used for processing byte data received from socket connections before it is processed by handlers
- ///
- protected Func? dataInterpreterBytes;
- ///
- /// Delegate used for processing string data received from socket connections before it is processed by handlers
- ///
- protected Func? dataInterpreterString;
- ///
- /// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example.
- ///
- protected Dictionary> genericHandlers = new();
- ///
- /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry.
- ///
- protected Task? periodicTask;
- ///
- /// Wait event for the periodicTask
- ///
- protected AsyncResetEvent? periodicEvent;
+
///
/// If client is disposing
///
protected bool disposing;
-
- ///
- /// If true; data which is a response to a query will also be distributed to subscriptions
- /// If false; data which is a response to a query won't get forwarded to subscriptions as well
- ///
- protected internal bool ContinueOnQueryResponse { get; protected set; }
-
- ///
- /// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message
- ///
- protected internal bool UnhandledMessageExpected { get; set; }
-
- ///
- /// The max amount of outgoing messages per socket per second
- ///
- protected internal int? RateLimitPerSocketPerSecond { get; set; }
-
+
///
- public double IncomingKbps
- {
- get
- {
- if (!socketConnections.Any())
- return 0;
-
- return socketConnections.Sum(s => s.Value.IncomingKbps);
- }
- }
-
+ public int CurrentConnections => ApiClients.OfType().Sum(c => c.CurrentConnections);
///
- public int CurrentConnections => socketConnections.Count;
- ///
- public int CurrentSubscriptions
- {
- get
- {
- if (!socketConnections.Any())
- return 0;
-
- return socketConnections.Sum(s => s.Value.SubscriptionCount);
- }
- }
-
- ///
- /// Client options
- ///
- public new BaseSocketClientOptions ClientOptions { get; }
-
+ public int CurrentSubscriptions => ApiClients.OfType().Sum(s => s.CurrentSubscriptions);
#endregion
///
@@ -117,9 +38,8 @@ namespace CryptoExchange.Net
///
/// The name of the API this client is for
/// The options for this client
- protected BaseSocketClient(string name, BaseSocketClientOptions options) : base(name, options)
+ protected BaseSocketClient(string name, ClientOptions options) : base(name, options)
{
- ClientOptions = options ?? throw new ArgumentNullException(nameof(options));
}
///
@@ -129,578 +49,6 @@ namespace CryptoExchange.Net
apiClient.SetApiCredentials(credentials);
}
- ///
- /// Set a delegate to be used for processing data received from socket connections before it is processed by handlers
- ///
- /// Handler for byte data
- /// Handler for string data
- protected void SetDataInterpreter(Func? byteHandler, Func? stringHandler)
- {
- dataInterpreterBytes = byteHandler;
- dataInterpreterString = stringHandler;
- }
-
- ///
- /// Connect to an url and listen for data on the BaseAddress
- ///
- /// The type of the expected data
- /// The API client the subscription is for
- /// The optional request object to send, will be serialized to json
- /// The identifier to use, necessary if no request object is sent
- /// If the subscription is to an authenticated endpoint
- /// The handler of update data
- /// Cancellation token for closing this subscription
- ///
- protected virtual Task> SubscribeAsync(SocketApiClient apiClient, object? request, string? identifier, bool authenticated, Action> dataHandler, CancellationToken ct)
- {
- return SubscribeAsync(apiClient, apiClient.Options.BaseAddress, request, identifier, authenticated, dataHandler, ct);
- }
-
- ///
- /// Connect to an url and listen for data
- ///
- /// The type of the expected data
- /// The API client the subscription is for
- /// The URL to connect to
- /// The optional request object to send, will be serialized to json
- /// The identifier to use, necessary if no request object is sent
- /// If the subscription is to an authenticated endpoint
- /// The handler of update data
- /// Cancellation token for closing this subscription
- ///
- protected virtual async Task> SubscribeAsync(SocketApiClient apiClient, string url, object? request, string? identifier, bool authenticated, Action> dataHandler, CancellationToken ct)
- {
- if (disposing)
- return new CallResult(new InvalidOperationError("Client disposed, can't subscribe"));
-
- SocketConnection socketConnection;
- SocketSubscription? subscription;
- var released = false;
- // Wait for a semaphore here, so we only connect 1 socket at a time.
- // This is necessary for being able to see if connections can be combined
- try
- {
- await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false);
- }
- catch (OperationCanceledException)
- {
- return new CallResult(new CancellationRequestedError());
- }
-
- try
- {
- while (true)
- {
- // Get a new or existing socket connection
- var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false);
- if(!socketResult)
- return socketResult.As(null);
-
- socketConnection = socketResult.Data;
-
- // Add a subscription on the socket connection
- subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated);
- if (subscription == null)
- {
- log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
- continue;
- }
-
- if (ClientOptions.SocketSubscriptionsCombineTarget == 1)
- {
- // Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway
- semaphoreSlim.Release();
- released = true;
- }
-
- var needsConnecting = !socketConnection.Connected;
-
- var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
- if (!connectResult)
- return new CallResult(connectResult.Error!);
-
- break;
- }
- }
- finally
- {
- if(!released)
- semaphoreSlim.Release();
- }
-
- if (socketConnection.PausedActivity)
- {
- log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't subscribe at this moment");
- return new CallResult( new ServerError("Socket is paused"));
- }
-
- if (request != null)
- {
- // Send the request and wait for answer
- var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false);
- if (!subResult)
- {
- log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
- await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
- return new CallResult(subResult.Error!);
- }
- }
- else
- {
- // No request to be sent, so just mark the subscription as comfirmed
- subscription.Confirmed = true;
- }
-
- if (ct != default)
- {
- subscription.CancellationTokenRegistration = ct.Register(async () =>
- {
- log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription");
- await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
- }, false);
- }
-
- log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully");
- return new CallResult(new UpdateSubscription(socketConnection, subscription));
- }
-
- ///
- /// Sends the subscribe request and waits for a response to that request
- ///
- /// The connection to send the request on
- /// The request to send, will be serialized to json
- /// The subscription the request is for
- ///
- protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription)
- {
- CallResult
public abstract class SocketApiClient : BaseApiClient
{
+ #region Fields
+
+ ///
+ /// The factory for creating sockets. Used for unit testing
+ ///
+ public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
+
+ ///
+ /// List of socket connections currently connecting/connected
+ ///
+ protected internal ConcurrentDictionary socketConnections = new();
+ ///
+ /// Semaphore used while creating sockets
+ ///
+ protected internal readonly SemaphoreSlim semaphoreSlim = new(1);
+ ///
+ /// Keep alive interval for websocket connection
+ ///
+ protected TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(10);
+ ///
+ /// Delegate used for processing byte data received from socket connections before it is processed by handlers
+ ///
+ protected Func? dataInterpreterBytes;
+ ///
+ /// Delegate used for processing string data received from socket connections before it is processed by handlers
+ ///
+ protected Func? dataInterpreterString;
+ ///
+ /// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example.
+ ///
+ protected Dictionary> genericHandlers = new();
+ ///
+ /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry.
+ ///
+ protected Task? periodicTask;
+ ///
+ /// Wait event for the periodicTask
+ ///
+ protected AsyncResetEvent? periodicEvent;
+
+ ///
+ /// If true; data which is a response to a query will also be distributed to subscriptions
+ /// If false; data which is a response to a query won't get forwarded to subscriptions as well
+ ///
+ protected internal bool ContinueOnQueryResponse { get; protected set; }
+
+ ///
+ /// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message
+ ///
+ protected internal bool UnhandledMessageExpected { get; set; }
+
+ ///
+ /// The max amount of outgoing messages per socket per second
+ ///
+ protected internal int? RateLimitPerSocketPerSecond { get; set; }
+
+ ///
+ public double IncomingKbps
+ {
+ get
+ {
+ if (!socketConnections.Any())
+ return 0;
+
+ return socketConnections.Sum(s => s.Value.IncomingKbps);
+ }
+ }
+
+ ///
+ public int CurrentConnections => socketConnections.Count;
+ ///
+ public int CurrentSubscriptions
+ {
+ get
+ {
+ if (!socketConnections.Any())
+ return 0;
+
+ return socketConnections.Sum(s => s.Value.SubscriptionCount);
+ }
+ }
+
+ ///
+ public new SocketApiClientOptions Options => (SocketApiClientOptions)base.Options;
+
+ #endregion
+
///
/// ctor
///
- /// The base client options
+ /// log
/// The Api client options
- public SocketApiClient(BaseClientOptions options, ApiClientOptions apiOptions): base(options, apiOptions)
+ public SocketApiClient(Log log, SocketApiClientOptions apiOptions): base(log, apiOptions)
{
}
+
+ ///
+ /// Set a delegate to be used for processing data received from socket connections before it is processed by handlers
+ ///
+ /// Handler for byte data
+ /// Handler for string data
+ protected void SetDataInterpreter(Func? byteHandler, Func? stringHandler)
+ {
+ dataInterpreterBytes = byteHandler;
+ dataInterpreterString = stringHandler;
+ }
+
+ ///
+ /// Connect to an url and listen for data on the BaseAddress
+ ///
+ /// The type of the expected data
+ /// The API client the subscription is for
+ /// The optional request object to send, will be serialized to json
+ /// The identifier to use, necessary if no request object is sent
+ /// If the subscription is to an authenticated endpoint
+ /// The handler of update data
+ /// Cancellation token for closing this subscription
+ ///
+ protected virtual Task> SubscribeAsync(SocketApiClient apiClient, object? request, string? identifier, bool authenticated, Action> dataHandler, CancellationToken ct)
+ {
+ return SubscribeAsync(apiClient, Options.BaseAddress, request, identifier, authenticated, dataHandler, ct);
+ }
+
+ ///
+ /// Connect to an url and listen for data
+ ///
+ /// The type of the expected data
+ /// The API client the subscription is for
+ /// The URL to connect to
+ /// The optional request object to send, will be serialized to json
+ /// The identifier to use, necessary if no request object is sent
+ /// If the subscription is to an authenticated endpoint
+ /// The handler of update data
+ /// Cancellation token for closing this subscription
+ ///
+ protected virtual async Task> SubscribeAsync(SocketApiClient apiClient, string url, object? request, string? identifier, bool authenticated, Action> dataHandler, CancellationToken ct)
+ {
+ if (_disposing)
+ return new CallResult(new InvalidOperationError("Client disposed, can't subscribe"));
+
+ SocketConnection socketConnection;
+ SocketSubscription? subscription;
+ var released = false;
+ // Wait for a semaphore here, so we only connect 1 socket at a time.
+ // This is necessary for being able to see if connections can be combined
+ try
+ {
+ await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ return new CallResult(new CancellationRequestedError());
+ }
+
+ try
+ {
+ while (true)
+ {
+ // Get a new or existing socket connection
+ var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false);
+ if (!socketResult)
+ return socketResult.As(null);
+
+ socketConnection = socketResult.Data;
+
+ // Add a subscription on the socket connection
+ subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated);
+ if (subscription == null)
+ {
+ _log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
+ continue;
+ }
+
+ if (Options.SocketSubscriptionsCombineTarget == 1)
+ {
+ // Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway
+ semaphoreSlim.Release();
+ released = true;
+ }
+
+ var needsConnecting = !socketConnection.Connected;
+
+ var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
+ if (!connectResult)
+ return new CallResult(connectResult.Error!);
+
+ break;
+ }
+ }
+ finally
+ {
+ if (!released)
+ semaphoreSlim.Release();
+ }
+
+ if (socketConnection.PausedActivity)
+ {
+ _log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't subscribe at this moment");
+ return new CallResult(new ServerError("Socket is paused"));
+ }
+
+ if (request != null)
+ {
+ // Send the request and wait for answer
+ var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false);
+ if (!subResult)
+ {
+ _log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
+ await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
+ return new CallResult(subResult.Error!);
+ }
+ }
+ else
+ {
+ // No request to be sent, so just mark the subscription as comfirmed
+ subscription.Confirmed = true;
+ }
+
+ if (ct != default)
+ {
+ subscription.CancellationTokenRegistration = ct.Register(async () =>
+ {
+ _log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription");
+ await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
+ }, false);
+ }
+
+ _log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully");
+ return new CallResult(new UpdateSubscription(socketConnection, subscription));
+ }
+
+ ///
+ /// Sends the subscribe request and waits for a response to that request
+ ///
+ /// The connection to send the request on
+ /// The request to send, will be serialized to json
+ /// The subscription the request is for
+ ///
+ protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription)
+ {
+ CallResult? callResult = null;
+ await socketConnection.SendAndWaitAsync(request, Options.SocketResponseTimeout, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false);
+
+ if (callResult?.Success == true)
+ {
+ subscription.Confirmed = true;
+ return new CallResult(true);
+ }
+
+ if (callResult == null)
+ return new CallResult(new ServerError("No response on subscription request received"));
+
+ return new CallResult(callResult.Error!);
+ }
+
+ ///
+ /// Send a query on a socket connection to the BaseAddress and wait for the response
+ ///
+ /// Expected result type
+ /// The request to send, will be serialized to json
+ /// If the query is to an authenticated endpoint
+ ///
+ protected virtual Task> QueryAsync(object request, bool authenticated)
+ {
+ return QueryAsync(Options.BaseAddress, request, authenticated);
+ }
+
+ ///
+ /// Send a query on a socket connection and wait for the response
+ ///
+ /// The expected result type
+ /// The url for the request
+ /// The request to send
+ /// Whether the socket should be authenticated
+ ///
+ protected virtual async Task> QueryAsync(string url, object request, bool authenticated)
+ {
+ if (_disposing)
+ return new CallResult(new InvalidOperationError("Client disposed, can't query"));
+
+ SocketConnection socketConnection;
+ var released = false;
+ await semaphoreSlim.WaitAsync().ConfigureAwait(false);
+ try
+ {
+ var socketResult = await GetSocketConnection(url, authenticated).ConfigureAwait(false);
+ if (!socketResult)
+ return socketResult.As(default);
+
+ socketConnection = socketResult.Data;
+
+ if (Options.SocketSubscriptionsCombineTarget == 1)
+ {
+ // Can release early when only a single sub per connection
+ semaphoreSlim.Release();
+ released = true;
+ }
+
+ var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
+ if (!connectResult)
+ return new CallResult(connectResult.Error!);
+ }
+ finally
+ {
+ if (!released)
+ semaphoreSlim.Release();
+ }
+
+ if (socketConnection.PausedActivity)
+ {
+ _log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't send query at this moment");
+ return new CallResult(new ServerError("Socket is paused"));
+ }
+
+ return await QueryAndWaitAsync(socketConnection, request).ConfigureAwait(false);
+ }
+
+ ///
+ /// Sends the query request and waits for the result
+ ///
+ /// The expected result type
+ /// The connection to send and wait on
+ /// The request to send
+ ///
+ protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, object request)
+ {
+ var dataResult = new CallResult(new ServerError("No response on query received"));
+ await socket.SendAndWaitAsync(request, Options.SocketResponseTimeout, data =>
+ {
+ if (!HandleQueryResponse(socket, request, data, out var callResult))
+ return false;
+
+ dataResult = callResult;
+ return true;
+ }).ConfigureAwait(false);
+
+ return dataResult;
+ }
+
+ ///
+ /// Checks if a socket needs to be connected and does so if needed. Also authenticates on the socket if needed
+ ///
+ /// The connection to check
+ /// Whether the socket should authenticated
+ ///
+ protected virtual async Task> ConnectIfNeededAsync(SocketConnection socket, bool authenticated)
+ {
+ if (socket.Connected)
+ return new CallResult(true);
+
+ var connectResult = await ConnectSocketAsync(socket).ConfigureAwait(false);
+ if (!connectResult)
+ return new CallResult(connectResult.Error!);
+
+ if (!authenticated || socket.Authenticated)
+ return new CallResult(true);
+
+ _log.Write(LogLevel.Debug, $"Attempting to authenticate {socket.SocketId}");
+ var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false);
+ if (!result)
+ {
+ _log.Write(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed");
+ if (socket.Connected)
+ await socket.CloseAsync().ConfigureAwait(false);
+
+ result.Error!.Message = "Authentication failed: " + result.Error.Message;
+ return new CallResult(result.Error);
+ }
+
+ socket.Authenticated = true;
+ return new CallResult(true);
+ }
+
+ ///
+ /// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the query that was send (the request parameter).
+ /// For example; A query is sent in a request message with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an
+ /// anwser to any query that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages,
+ /// if not some other method has be implemented to match the messages).
+ /// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true.
+ ///
+ /// The type of response that is expected on the query
+ /// The socket connection
+ /// The request that a response is awaited for
+ /// The message received from the server
+ /// The interpretation (null if message wasn't a response to the request)
+ /// True if the message was a response to the query
+ protected internal abstract bool HandleQueryResponse(SocketConnection socketConnection, object request, JToken data, [NotNullWhen(true)] out CallResult? callResult);
+ ///
+ /// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the subscription request that was send (the request parameter).
+ /// For example; A subscribe request message is send with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an
+ /// anwser to any subscription request that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages,
+ /// if not some other method has be implemented to match the messages).
+ /// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true.
+ ///
+ /// The socket connection
+ /// A subscription that waiting for a subscription response
+ /// The request that the subscription sent
+ /// The message received from the server
+ /// The interpretation (null if message wasn't a response to the request)
+ /// True if the message was a response to the subscription request
+ protected internal abstract bool HandleSubscriptionResponse(SocketConnection socketConnection, SocketSubscription subscription, object request, JToken data, out CallResult? callResult);
+ ///
+ /// Needs to check if a received message matches a handler by request. After subscribing data message will come in. These data messages need to be matched to a specific connection
+ /// to pass the correct data to the correct handler. The implementation of this method should check if the message received matches the subscribe request that was sent.
+ ///
+ /// The socket connection the message was recieved on
+ /// The received data
+ /// The subscription request
+ /// True if the message is for the subscription which sent the request
+ protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, object request);
+ ///
+ /// Needs to check if a received message matches a handler by identifier. Generally used by GenericHandlers. For example; a generic handler is registered which handles ping messages
+ /// from the server. This method should check if the message received is a ping message and the identifer is the identifier of the GenericHandler
+ ///
+ /// The socket connection the message was recieved on
+ /// The received data
+ /// The string identifier of the handler
+ /// True if the message is for the handler which has the identifier
+ protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, string identifier);
+ ///
+ /// Needs to authenticate the socket so authenticated queries/subscriptions can be made on this socket connection
+ ///
+ /// The socket connection that should be authenticated
+ ///
+ protected internal abstract Task> AuthenticateSocketAsync(SocketConnection socketConnection);
+ ///
+ /// Needs to unsubscribe a subscription, typically by sending an unsubscribe request. If multiple subscriptions per socket is not allowed this can just return since the socket will be closed anyway
+ ///
+ /// The connection on which to unsubscribe
+ /// The subscription to unsubscribe
+ ///
+ protected internal abstract Task UnsubscribeAsync(SocketConnection connection, SocketSubscription subscriptionToUnsub);
+
+ ///
+ /// Optional handler to interpolate data before sending it to the handlers
+ ///
+ ///
+ ///
+ protected internal virtual JToken ProcessTokenData(JToken message)
+ {
+ return message;
+ }
+
+ ///
+ /// Add a subscription to a connection
+ ///
+ /// The type of data the subscription expects
+ /// The request of the subscription
+ /// The identifier of the subscription (can be null if request param is used)
+ /// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)
+ /// The socket connection the handler is on
+ /// The handler of the data received
+ /// Whether the subscription needs authentication
+ ///
+ protected virtual SocketSubscription? AddSubscription(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action> dataHandler, bool authenticated)
+ {
+ void InternalHandler(MessageEvent messageEvent)
+ {
+ if (typeof(T) == typeof(string))
+ {
+ var stringData = (T)Convert.ChangeType(messageEvent.JsonData.ToString(), typeof(T));
+ dataHandler(new DataEvent(stringData, null, Options.OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp));
+ return;
+ }
+
+ var desResult = Deserialize(messageEvent.JsonData);
+ if (!desResult)
+ {
+ _log.Write(LogLevel.Warning, $"Socket {connection.SocketId} Failed to deserialize data into type {typeof(T)}: {desResult.Error}");
+ return;
+ }
+
+ dataHandler(new DataEvent(desResult.Data, null, Options.OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp));
+ }
+
+ var subscription = request == null
+ ? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, authenticated, InternalHandler)
+ : SocketSubscription.CreateForRequest(NextId(), request, userSubscription, authenticated, InternalHandler);
+ if (!connection.AddSubscription(subscription))
+ return null;
+ return subscription;
+ }
+
+ ///
+ /// Adds a generic message handler. Used for example to reply to ping requests
+ ///
+ /// The name of the request handler. Needs to be unique
+ /// The action to execute when receiving a message for this handler (checked by )
+ protected void AddGenericHandler(string identifier, Action action)
+ {
+ genericHandlers.Add(identifier, action);
+ var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, false, action);
+ foreach (var connection in socketConnections.Values)
+ connection.AddSubscription(subscription);
+ }
+
+ ///
+ /// Get the url to connect to (defaults to BaseAddress form the client options)
+ ///
+ ///
+ ///
+ ///
+ ///
+ protected virtual Task> GetConnectionUrlAsync(SocketApiClient apiClient, string address, bool authentication)
+ {
+ return Task.FromResult(new CallResult(address));
+ }
+
+ ///
+ /// Get the url to reconnect to after losing a connection
+ ///
+ ///
+ ///
+ ///
+ public virtual Task GetReconnectUriAsync(SocketApiClient apiClient, SocketConnection connection)
+ {
+ return Task.FromResult(connection.ConnectionUri);
+ }
+
+ ///
+ /// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one.
+ ///
+ /// The API client the connection is for
+ /// The address the socket is for
+ /// Whether the socket should be authenticated
+ ///
+ protected virtual async Task> GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated)
+ {
+ var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected)
+ && s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
+ && (s.Value.ApiClient.GetType() == apiClient.GetType())
+ && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault();
+ var result = socketResult.Equals(default(KeyValuePair)) ? null : socketResult.Value;
+ if (result != null)
+ {
+ if (result.SubscriptionCount < Options.SocketSubscriptionsCombineTarget || (socketConnections.Count >= Options.MaxSocketConnections && socketConnections.All(s => s.Value.SubscriptionCount >= Options.SocketSubscriptionsCombineTarget)))
+ {
+ // Use existing socket if it has less than target connections OR it has the least connections and we can't make new
+ return new CallResult(result);
+ }
+ }
+
+ var connectionAddress = await GetConnectionUrlAsync(apiClient, address, authenticated).ConfigureAwait(false);
+ if (!connectionAddress)
+ {
+ _log.Write(LogLevel.Warning, $"Failed to determine connection url: " + connectionAddress.Error);
+ return connectionAddress.As(null);
+ }
+
+ if (connectionAddress.Data != address)
+ _log.Write(LogLevel.Debug, $"Connection address set to " + connectionAddress.Data);
+
+ // Create new socket
+ var socket = CreateSocket(connectionAddress.Data!);
+ var socketConnection = new SocketConnection(_log, apiClient, socket, address);
+ socketConnection.UnhandledMessage += HandleUnhandledMessage;
+ foreach (var kvp in genericHandlers)
+ {
+ var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, false, kvp.Value);
+ socketConnection.AddSubscription(handler);
+ }
+
+ return new CallResult(socketConnection);
+ }
+
+ ///
+ /// Process an unhandled message
+ ///
+ /// The token that wasn't processed
+ protected virtual void HandleUnhandledMessage(JToken token)
+ {
+ }
+
+ ///
+ /// Connect a socket
+ ///
+ /// The socket to connect
+ ///
+ protected virtual async Task> ConnectSocketAsync(SocketConnection socketConnection)
+ {
+ if (await socketConnection.ConnectAsync().ConfigureAwait(false))
+ {
+ socketConnections.TryAdd(socketConnection.SocketId, socketConnection);
+ return new CallResult(true);
+ }
+
+ socketConnection.Dispose();
+ return new CallResult(new CantConnectError());
+ }
+
+ ///
+ /// Get parameters for the websocket connection
+ ///
+ /// The address to connect to
+ ///
+ protected virtual WebSocketParameters GetWebSocketParameters(string address)
+ => new(new Uri(address), Options.AutoReconnect)
+ {
+ DataInterpreterBytes = dataInterpreterBytes,
+ DataInterpreterString = dataInterpreterString,
+ KeepAliveInterval = KeepAliveInterval,
+ ReconnectInterval = Options.ReconnectInterval,
+ RatelimitPerSecond = RateLimitPerSocketPerSecond,
+ Proxy = Options.Proxy,
+ Timeout = Options.SocketNoDataTimeout
+ };
+
+ ///
+ /// Create a socket for an address
+ ///
+ /// The address the socket should connect to
+ ///
+ protected virtual IWebsocket CreateSocket(string address)
+ {
+ var socket = SocketFactory.CreateWebsocket(_log, GetWebSocketParameters(address));
+ _log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address);
+ return socket;
+ }
+
+ ///
+ /// Periodically sends data over a socket connection
+ ///
+ /// Identifier for the periodic send
+ /// How often
+ /// Method returning the object to send
+ public virtual void SendPeriodic(string identifier, TimeSpan interval, Func objGetter)
+ {
+ if (objGetter == null)
+ throw new ArgumentNullException(nameof(objGetter));
+
+ periodicEvent = new AsyncResetEvent();
+ periodicTask = Task.Run(async () =>
+ {
+ while (!_disposing)
+ {
+ await periodicEvent.WaitAsync(interval).ConfigureAwait(false);
+ if (_disposing)
+ break;
+
+ foreach (var socketConnection in socketConnections.Values)
+ {
+ if (_disposing)
+ break;
+
+ if (!socketConnection.Connected)
+ continue;
+
+ var obj = objGetter(socketConnection);
+ if (obj == null)
+ continue;
+
+ _log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}");
+
+ try
+ {
+ socketConnection.Send(obj);
+ }
+ catch (Exception ex)
+ {
+ _log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} Periodic send {identifier} failed: " + ex.ToLogString());
+ }
+ }
+ }
+ });
+ }
+
+ ///
+ /// Unsubscribe an update subscription
+ ///
+ /// The id of the subscription to unsubscribe
+ ///
+ public virtual async Task UnsubscribeAsync(int subscriptionId)
+ {
+ SocketSubscription? subscription = null;
+ SocketConnection? connection = null;
+ foreach (var socket in socketConnections.Values.ToList())
+ {
+ subscription = socket.GetSubscription(subscriptionId);
+ if (subscription != null)
+ {
+ connection = socket;
+ break;
+ }
+ }
+
+ if (subscription == null || connection == null)
+ return false;
+
+ _log.Write(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId);
+ await connection.CloseAsync(subscription).ConfigureAwait(false);
+ return true;
+ }
+
+ ///
+ /// Unsubscribe an update subscription
+ ///
+ /// The subscription to unsubscribe
+ ///
+ public virtual async Task UnsubscribeAsync(UpdateSubscription subscription)
+ {
+ if (subscription == null)
+ throw new ArgumentNullException(nameof(subscription));
+
+ _log.Write(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id);
+ await subscription.CloseAsync().ConfigureAwait(false);
+ }
+
+ ///
+ /// Unsubscribe all subscriptions
+ ///
+ ///
+ public virtual async Task UnsubscribeAllAsync()
+ {
+ _log.Write(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions");
+ var tasks = new List();
+ {
+ var socketList = socketConnections.Values;
+ foreach (var sub in socketList)
+ tasks.Add(sub.CloseAsync());
+ }
+
+ await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
+ }
+
+ ///
+ /// Reconnect all connections
+ ///
+ ///
+ public virtual async Task ReconnectAsync()
+ {
+ _log.Write(LogLevel.Information, $"Reconnecting all {socketConnections.Count} connections");
+ var tasks = new List();
+ {
+ var socketList = socketConnections.Values;
+ foreach (var sub in socketList)
+ tasks.Add(sub.TriggerReconnectAsync());
+ }
+
+ await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
+ }
+
+ ///
+ /// Log the current state of connections and subscriptions
+ ///
+ public string GetSubscriptionsState()
+ {
+ var sb = new StringBuilder();
+ sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}");
+ foreach (var connection in socketConnections)
+ {
+ sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}");
+ foreach (var subscription in connection.Value.Subscriptions)
+ sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}");
+ }
+ return sb.ToString();
+ }
+
+ ///
+ /// Dispose the client
+ ///
+ public override void Dispose()
+ {
+ _disposing = true;
+ periodicEvent?.Set();
+ periodicEvent?.Dispose();
+ if (socketConnections.Sum(s => s.Value.SubscriptionCount) > 0)
+ {
+ _log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions");
+ _ = UnsubscribeAllAsync();
+ }
+ semaphoreSlim?.Dispose();
+ base.Dispose();
+ }
}
}
diff --git a/CryptoExchange.Net/Objects/Options.cs b/CryptoExchange.Net/Objects/Options.cs
index 5c727cc..b00f3d1 100644
--- a/CryptoExchange.Net/Objects/Options.cs
+++ b/CryptoExchange.Net/Objects/Options.cs
@@ -9,10 +9,7 @@ using Microsoft.Extensions.Logging;
namespace CryptoExchange.Net.Objects
{
- ///
- /// Base options, applicable to everything
- ///
- public class BaseOptions
+ public class ClientOptions
{
internal event Action? OnLoggingChanged;
@@ -44,195 +41,27 @@ namespace CryptoExchange.Net.Objects
}
}
- ///
- /// If true, the CallResult and DataEvent objects will also include the originally received json data in the OriginalData property
- ///
- public bool OutputOriginalData { get; set; } = false;
-
- ///
- /// ctor
- ///
- public BaseOptions(): this(null)
- {
- }
-
- ///
- /// ctor
- ///
- /// Copy options from these options to the new options
- public BaseOptions(BaseOptions? baseOptions)
- {
- if (baseOptions == null)
- return;
-
- LogLevel = baseOptions.LogLevel;
- LogWriters = baseOptions.LogWriters.ToList();
- OutputOriginalData = baseOptions.OutputOriginalData;
- }
-
- ///
- public override string ToString()
- {
- return $"LogLevel: {LogLevel}, Writers: {LogWriters.Count}, OutputOriginalData: {OutputOriginalData}";
- }
- }
-
- ///
- /// Client options, for both the socket and rest clients
- ///
- public class BaseClientOptions : BaseOptions
- {
///
/// Proxy to use when connecting
///
public ApiProxy? Proxy { get; set; }
- ///
- /// Api credentials to be used for signing requests to private endpoints. These credentials will be used for each API in the client, unless overriden in the API options
- ///
- public ApiCredentials? ApiCredentials { get; set; }
-
///
/// ctor
///
- public BaseClientOptions() : this(null)
+ /// Copy values for the provided options
+ /// Copy values for the provided options
+ public ClientOptions(ClientOptions baseOptions, ClientOptions? newValues)
{
- }
-
- ///
- /// ctor
- ///
- /// Copy options from these options to the new options
- public BaseClientOptions(BaseClientOptions? baseOptions) : base(baseOptions)
- {
- if (baseOptions == null)
- return;
-
- Proxy = baseOptions.Proxy;
- ApiCredentials = baseOptions.ApiCredentials?.Copy();
+ Proxy = newValues?.Proxy ?? baseOptions.Proxy;
+ LogLevel = baseOptions.LogLevel;
+ LogWriters = baseOptions.LogWriters.ToList();
}
///
public override string ToString()
{
- return $"{base.ToString()}, Proxy: {(Proxy == null ? "-" : Proxy.Host)}, Base.ApiCredentials: {(ApiCredentials == null ? "-" : "set")}";
- }
- }
-
- ///
- /// Rest client options
- ///
- public class BaseRestClientOptions : BaseClientOptions
- {
- ///
- /// The time the server has to respond to a request before timing out
- ///
- public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(30);
-
- ///
- /// Http client to use. If a HttpClient is provided in this property the RequestTimeout and Proxy options provided in these options will be ignored in requests and should be set on the provided HttpClient instance
- ///
- public HttpClient? HttpClient { get; set; }
-
- ///
- /// ctor
- ///
- public BaseRestClientOptions(): this(null)
- {
- }
-
- ///
- /// ctor
- ///
- /// Copy options from these options to the new options
- public BaseRestClientOptions(BaseRestClientOptions? baseOptions): base(baseOptions)
- {
- if (baseOptions == null)
- return;
-
- HttpClient = baseOptions.HttpClient;
- RequestTimeout = baseOptions.RequestTimeout;
- }
-
- ///
- public override string ToString()
- {
- return $"{base.ToString()}, RequestTimeout: {RequestTimeout:c}, HttpClient: {(HttpClient == null ? "-" : "set")}";
- }
- }
-
- ///
- /// Socket client options
- ///
- public class BaseSocketClientOptions : BaseClientOptions
- {
- ///
- /// Whether or not the socket should automatically reconnect when losing connection
- ///
- public bool AutoReconnect { get; set; } = true;
-
- ///
- /// Time to wait between reconnect attempts
- ///
- public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
-
- ///
- /// Max number of concurrent resubscription tasks per socket after reconnecting a socket
- ///
- public int MaxConcurrentResubscriptionsPerSocket { get; set; } = 5;
-
- ///
- /// The max time to wait for a response after sending a request on the socket before giving a timeout
- ///
- public TimeSpan SocketResponseTimeout { get; set; } = TimeSpan.FromSeconds(10);
-
- ///
- /// The max time of not receiving any data after which the connection is assumed to be dropped. This can only be used for socket connections where a steady flow of data is expected,
- /// for example when the server sends intermittent ping requests
- ///
- public TimeSpan SocketNoDataTimeout { get; set; }
-
- ///
- /// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket.
- /// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a
- /// single connection will also increase the amount of traffic on that single connection, potentially leading to issues.
- ///
- public int? SocketSubscriptionsCombineTarget { get; set; }
-
- ///
- /// The max amount of connections to make to the server. Can be used for API's which only allow a certain number of connections. Changing this to a high value might cause issues.
- ///
- public int? MaxSocketConnections { get; set; }
-
- ///
- /// ctor
- ///
- public BaseSocketClientOptions(): this(null)
- {
- }
-
- ///
- /// ctor
- ///
- /// Copy options from these options to the new options
- public BaseSocketClientOptions(BaseSocketClientOptions? baseOptions): base(baseOptions)
- {
- if (baseOptions == null)
- return;
-
- AutoReconnect = baseOptions.AutoReconnect;
- ReconnectInterval = baseOptions.ReconnectInterval;
- MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket;
- SocketResponseTimeout = baseOptions.SocketResponseTimeout;
- SocketNoDataTimeout = baseOptions.SocketNoDataTimeout;
- SocketSubscriptionsCombineTarget = baseOptions.SocketSubscriptionsCombineTarget;
- MaxSocketConnections = baseOptions.MaxSocketConnections;
- }
-
- ///
- public override string ToString()
- {
- return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}";
+ return $"LogLevel: {LogLevel}, Writers: {LogWriters.Count}, Proxy: {(Proxy == null ? "-" : Proxy.Host)}";
}
}
@@ -241,6 +70,11 @@ namespace CryptoExchange.Net.Objects
///
public class ApiClientOptions
{
+ ///
+ /// If true, the CallResult and DataEvent objects will also include the originally received json data in the OriginalData property
+ ///
+ public bool OutputOriginalData { get; set; } = false;
+
///
/// The base address of the API
///
@@ -278,12 +112,13 @@ namespace CryptoExchange.Net.Objects
{
BaseAddress = newValues?.BaseAddress ?? baseOptions.BaseAddress;
ApiCredentials = newValues?.ApiCredentials?.Copy() ?? baseOptions.ApiCredentials?.Copy();
+ OutputOriginalData = baseOptions.OutputOriginalData;
}
///
public override string ToString()
{
- return $"Credentials: {(ApiCredentials == null ? "-" : "Set")}, BaseAddress: {BaseAddress}";
+ return $"OutputOriginalData: {OutputOriginalData}, Credentials: {(ApiCredentials == null ? "-" : "Set")}, BaseAddress: {BaseAddress}";
}
}
@@ -292,6 +127,16 @@ namespace CryptoExchange.Net.Objects
///
public class RestApiClientOptions: ApiClientOptions
{
+ ///
+ /// The time the server has to respond to a request before timing out
+ ///
+ public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(30);
+
+ ///
+ /// Http client to use. If a HttpClient is provided in this property the RequestTimeout and Proxy options provided in these options will be ignored in requests and should be set on the provided HttpClient instance
+ ///
+ public HttpClient? HttpClient { get; set; }
+
///
/// List of rate limiters to use
///
@@ -334,6 +179,8 @@ namespace CryptoExchange.Net.Objects
/// Copy values for the provided options
public RestApiClientOptions(RestApiClientOptions baseOn, RestApiClientOptions? newValues): base(baseOn, newValues)
{
+ HttpClient = newValues?.HttpClient ?? baseOn.HttpClient;
+ RequestTimeout = newValues == default ? baseOn.RequestTimeout : newValues.RequestTimeout;
RateLimitingBehaviour = newValues?.RateLimitingBehaviour ?? baseOn.RateLimitingBehaviour;
AutoTimestamp = newValues?.AutoTimestamp ?? baseOn.AutoTimestamp;
TimestampRecalculationInterval = newValues?.TimestampRecalculationInterval ?? baseOn.TimestampRecalculationInterval;
@@ -343,14 +190,98 @@ namespace CryptoExchange.Net.Objects
///
public override string ToString()
{
- return $"{base.ToString()}, RateLimiters: {RateLimiters?.Count}, RateLimitBehaviour: {RateLimitingBehaviour}, AutoTimestamp: {AutoTimestamp}, TimestampRecalculationInterval: {TimestampRecalculationInterval}";
+ return $"{base.ToString()}, RequestTimeout: {RequestTimeout:c}, HttpClient: {(HttpClient == null ? "-" : "set")}, RateLimiters: {RateLimiters?.Count}, RateLimitBehaviour: {RateLimitingBehaviour}, AutoTimestamp: {AutoTimestamp}, TimestampRecalculationInterval: {TimestampRecalculationInterval}";
+ }
+ }
+
+ ///
+ /// Rest API client options
+ ///
+ public class SocketApiClientOptions : ApiClientOptions
+ {
+ ///
+ /// Whether or not the socket should automatically reconnect when losing connection
+ ///
+ public bool AutoReconnect { get; set; } = true;
+
+ ///
+ /// Time to wait between reconnect attempts
+ ///
+ public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
+
+ ///
+ /// Max number of concurrent resubscription tasks per socket after reconnecting a socket
+ ///
+ public int MaxConcurrentResubscriptionsPerSocket { get; set; } = 5;
+
+ ///
+ /// The max time to wait for a response after sending a request on the socket before giving a timeout
+ ///
+ public TimeSpan SocketResponseTimeout { get; set; } = TimeSpan.FromSeconds(10);
+
+ ///
+ /// The max time of not receiving any data after which the connection is assumed to be dropped. This can only be used for socket connections where a steady flow of data is expected,
+ /// for example when the server sends intermittent ping requests
+ ///
+ public TimeSpan SocketNoDataTimeout { get; set; }
+
+ ///
+ /// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket.
+ /// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a
+ /// single connection will also increase the amount of traffic on that single connection, potentially leading to issues.
+ ///
+ public int? SocketSubscriptionsCombineTarget { get; set; }
+
+ ///
+ /// The max amount of connections to make to the server. Can be used for API's which only allow a certain number of connections. Changing this to a high value might cause issues.
+ ///
+ public int? MaxSocketConnections { get; set; }
+
+ ///
+ /// ctor
+ ///
+ public SocketApiClientOptions()
+ {
+ }
+
+ ///
+ /// ctor
+ ///
+ /// Base address for the API
+ public SocketApiClientOptions(string baseAddress) : base(baseAddress)
+ {
+ }
+
+ ///
+ /// ctor
+ ///
+ /// Copy values for the provided options
+ /// Copy values for the provided options
+ public SocketApiClientOptions(SocketApiClientOptions baseOptions, SocketApiClientOptions? newValues) : base(baseOptions, newValues)
+ {
+ if (baseOptions == null)
+ return;
+
+ AutoReconnect = baseOptions.AutoReconnect;
+ ReconnectInterval = baseOptions.ReconnectInterval;
+ MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket;
+ SocketResponseTimeout = baseOptions.SocketResponseTimeout;
+ SocketNoDataTimeout = baseOptions.SocketNoDataTimeout;
+ SocketSubscriptionsCombineTarget = baseOptions.SocketSubscriptionsCombineTarget;
+ MaxSocketConnections = baseOptions.MaxSocketConnections;
+ }
+
+ ///
+ public override string ToString()
+ {
+ return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}";
}
}
///
/// Base for order book options
///
- public class OrderBookOptions : BaseOptions
+ public class OrderBookOptions : ApiClientOptions
{
///
/// Whether or not checksum validation is enabled. Default is true, disabling will ignore checksum messages.
diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs
index 703d358..a33fd09 100644
--- a/CryptoExchange.Net/Sockets/SocketConnection.cs
+++ b/CryptoExchange.Net/Sockets/SocketConnection.cs
@@ -149,7 +149,6 @@ namespace CryptoExchange.Net.Sockets
private readonly object subscriptionLock = new();
private readonly Log log;
- private readonly BaseSocketClient socketClient;
private readonly List pendingRequests;
@@ -163,14 +162,13 @@ namespace CryptoExchange.Net.Sockets
///
/// New socket connection
///
- /// The socket client
+ /// The logger
/// The api client
/// The socket
///
- public SocketConnection(BaseSocketClient client, SocketApiClient apiClient, IWebsocket socket, string tag)
+ public SocketConnection(Log log, SocketApiClient apiClient, IWebsocket socket, string tag)
{
- log = client.log;
- socketClient = client;
+ this.log = log;
ApiClient = apiClient;
Tag = tag;
@@ -234,7 +232,7 @@ namespace CryptoExchange.Net.Sockets
///
protected virtual async Task GetReconnectionUrlAsync()
{
- return await socketClient.GetReconnectUriAsync(ApiClient, this).ConfigureAwait(false);
+ return await ApiClient.GetReconnectUriAsync(ApiClient, this).ConfigureAwait(false);
}
///
@@ -315,7 +313,7 @@ namespace CryptoExchange.Net.Sockets
lock (pendingRequests)
pendingRequests.Remove(pendingRequest);
- if (!socketClient.ContinueOnQueryResponse)
+ if (!ApiClient.ContinueOnQueryResponse)
return;
handledResponse = true;
@@ -324,11 +322,11 @@ namespace CryptoExchange.Net.Sockets
}
// Message was not a request response, check data handlers
- var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data : null, timestamp);
+ var messageEvent = new MessageEvent(this, tokenData, ApiClient.Options.OutputOriginalData ? data : null, timestamp);
var (handled, userProcessTime, subscription) = HandleData(messageEvent);
if (!handled && !handledResponse)
{
- if (!socketClient.UnhandledMessageExpected)
+ if (!ApiClient.UnhandledMessageExpected)
log.Write(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + tokenData);
UnhandledMessage?.Invoke(tokenData);
}
@@ -368,8 +366,8 @@ namespace CryptoExchange.Net.Sockets
if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
- if (socketClient.socketConnections.ContainsKey(SocketId))
- socketClient.socketConnections.TryRemove(SocketId, out _);
+ if (ApiClient.socketConnections.ContainsKey(SocketId))
+ ApiClient.socketConnections.TryRemove(SocketId, out _);
lock (subscriptionLock)
{
@@ -407,7 +405,7 @@ namespace CryptoExchange.Net.Sockets
subscription.CancellationTokenRegistration.Value.Dispose();
if (subscription.Confirmed && _socket.IsOpen)
- await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
+ await ApiClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
bool shouldCloseConnection;
lock (subscriptionLock)
@@ -504,7 +502,7 @@ namespace CryptoExchange.Net.Sockets
currentSubscription = subscription;
if (subscription.Request == null)
{
- if (socketClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!))
+ if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!))
{
handled = true;
var userSw = Stopwatch.StartNew();
@@ -515,10 +513,10 @@ namespace CryptoExchange.Net.Sockets
}
else
{
- if (socketClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Request))
+ if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Request))
{
handled = true;
- messageEvent.JsonData = socketClient.ProcessTokenData(messageEvent.JsonData);
+ messageEvent.JsonData = ApiClient.ProcessTokenData(messageEvent.JsonData);
var userSw = Stopwatch.StartNew();
subscription.MessageHandler(messageEvent);
userSw.Stop();
@@ -611,7 +609,7 @@ namespace CryptoExchange.Net.Sockets
if (subscriptions.Any(s => s.Authenticated))
{
// If we reconnected a authenticated connection we need to re-authenticate
- var authResult = await socketClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
+ var authResult = await ApiClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
if (!authResult)
{
log.Write(LogLevel.Warning, $"Socket {SocketId} authentication failed on reconnected socket. Disconnecting and reconnecting.");
@@ -636,14 +634,14 @@ namespace CryptoExchange.Net.Sockets
}
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
- for (var i = 0; i < subscriptionList.Count; i += socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)
+ for (var i = 0; i < subscriptionList.Count; i += ApiClient.Options.MaxConcurrentResubscriptionsPerSocket)
{
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket not connected"));
var taskList = new List>>();
- foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
- taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription));
+ foreach (var subscription in subscriptionList.Skip(i).Take(ApiClient.Options.MaxConcurrentResubscriptionsPerSocket))
+ taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription));
await Task.WhenAll(taskList).ConfigureAwait(false);
if (taskList.Any(t => !t.Result.Success))
@@ -662,7 +660,7 @@ namespace CryptoExchange.Net.Sockets
internal async Task UnsubscribeAsync(SocketSubscription socketSubscription)
{
- await socketClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false);
+ await ApiClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false);
}
internal async Task> ResubscribeAsync(SocketSubscription socketSubscription)
@@ -670,7 +668,7 @@ namespace CryptoExchange.Net.Sockets
if (!_socket.IsOpen)
return new CallResult(new UnknownError("Socket is not connected"));
- return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
+ return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
}
///