using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Interfaces.Clients;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Errors;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.RateLimiting;
using CryptoExchange.Net.RateLimiting.Interfaces;
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Sockets.Default;
using CryptoExchange.Net.Sockets.Default.Interfaces;
using CryptoExchange.Net.Sockets.HighPerf;
using CryptoExchange.Net.Sockets.HighPerf.Interfaces;
using CryptoExchange.Net.Sockets.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Clients
{
/// <summary>
/// Base socket API client for interaction with a websocket API
/// </summary>
public abstract class SocketApiClient : BaseApiClient, ISocketApiClient
{
#region Fields
/// <summary>
/// Factory handed to every socket connection created by this client so it can create its websocket
/// </summary>
public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
/// <summary>
/// Factory for HighPerf connections. Not read inside this class.
/// NOTE(review): presumably passed by derived clients into SubscribeHighPerfAsync - confirm.
/// </summary>
public IHighPerfConnectionFactory? HighPerfConnectionFactory { get; set; }
/// <summary>
/// Socket connections tracked by this client, keyed by socket id.
/// Entries are added by ConnectSocketAsync after a successful connect.
/// </summary>
protected internal ConcurrentDictionary _socketConnections = new();
/// <summary>
/// HighPerf socket connections tracked by this client, keyed by socket id.
/// Entries are added by ConnectSocketAsync after a successful connect.
/// </summary>
protected internal ConcurrentDictionary _highPerfSocketConnections = new();
/// <summary>
/// Semaphore (initial count 1) held while a connection is selected or created for a subscription or query,
/// so only one caller at a time decides whether an existing connection can be reused
/// </summary>
protected internal readonly SemaphoreSlim semaphoreSlim = new(1);
/// <summary>
/// Keep alive interval for websocket connections, defaults to 10 seconds
/// </summary>
protected TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(10);
/// <summary>
/// Keep alive timeout for websocket connections, defaults to 10 seconds
/// </summary>
protected TimeSpan KeepAliveTimeout { get; set; } = TimeSpan.FromSeconds(10);
/// <summary>
/// Handlers for data from the socket which doesn't need to be forwarded to the caller, ping or welcome messages for example.
/// Every entry is added to each newly created socket connection.
/// </summary>
protected List systemSubscriptions = new();
/// <summary>
/// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message
/// </summary>
protected internal bool UnhandledMessageExpected { get; set; }
/// <summary>
/// The rate limiter. Only passed to new websockets when rate limiting is enabled in the client options;
/// created on demand by HandleConnectRateLimitedAsync when still null.
/// </summary>
protected internal IRateLimitGate? RateLimiter { get; set; }
/// <summary>
/// The max size of a websocket message that can be sent. Not enforced within this class.
/// NOTE(review): unit is presumably bytes - confirm.
/// </summary>
protected internal int? MessageSendSizeLimit { get; set; }
/// <summary>
/// Periodic query registrations, applied to every newly created socket connection
/// </summary>
protected List PeriodicTaskRegistrations { get; set; } = new List();
/// <summary>
/// Addresses to keep a dedicated connection open to; PrepareConnectionsAsync connects each entry
/// </summary>
protected List DedicatedConnectionConfigs { get; set; } = new List();
/// <summary>
/// Whether to allow multiple subscriptions with the same topic on the same connection, defaults to true
/// </summary>
protected bool AllowTopicsOnTheSameConnection { get; set; } = true;
/// <summary>
/// Combined incoming data rate (kbps) of all tracked socket connections; 0 when there are none
/// </summary>
public double IncomingKbps =>
    _socketConnections.IsEmpty
        ? 0
        : _socketConnections.Sum(s => s.Value.IncomingKbps);
/// <summary>
/// Number of socket connections currently tracked by this client (HighPerf connections are not included)
/// </summary>
public int CurrentConnections => _socketConnections.Count;
/// <summary>
/// Total number of user subscriptions over all tracked socket connections; 0 when there are none
/// </summary>
public int CurrentSubscriptions =>
    _socketConnections.IsEmpty
        ? 0
        : _socketConnections.Sum(s => s.Value.UserSubscriptionCount);
/// <summary>
/// The client options of the base client, typed as socket exchange options
/// </summary>
public new SocketExchangeOptions ClientOptions => (SocketExchangeOptions)base.ClientOptions;
/// <summary>
/// The API options of the base client, typed as socket API options
/// </summary>
public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions;
/// <summary>
/// The max number of individual subscriptions on a single connection. When null no limit is applied.
/// </summary>
public int? MaxIndividualSubscriptionsPerConnection { get; set; }
/// <summary>
/// Whether or not to enforce that sequence number updates are always (lastSequenceNumber + 1)
/// </summary>
public bool EnforceSequenceNumbers { get; set; }
#endregion
/// <summary>
/// ctor. Values set on the API options (OutputOriginalData, ApiCredentials) take precedence over
/// the same values on the client options.
/// </summary>
/// <param name="logger">log</param>
/// <param name="baseAddress">Base address for this API client</param>
/// <param name="options">Client options</param>
/// <param name="apiOptions">The Api client options</param>
public SocketApiClient(ILogger logger, string baseAddress, SocketExchangeOptions options, SocketApiOptions apiOptions)
: base(logger,
apiOptions.OutputOriginalData ?? options.OutputOriginalData,
apiOptions.ApiCredentials ?? options.ApiCredentials,
baseAddress,
options,
apiOptions)
{
}
/// <summary>
/// Create a serializer instance
/// </summary>
/// <returns>The message serializer for this API client</returns>
protected internal abstract IMessageSerializer CreateSerializer();
/// <summary>
/// Register an url to keep an open connection to. The entry is only recorded here;
/// PrepareConnectionsAsync opens a connection for each registered entry.
/// </summary>
/// <param name="url">Socket address of the dedicated connection</param>
/// <param name="auth">Whether the dedicated connection should be authenticated</param>
protected virtual void SetDedicatedConnection(string url, bool auth)
{
    var dedicatedConfig = new DedicatedConnectionConfig
    {
        SocketAddress = url,
        Authenticated = auth
    };
    DedicatedConnectionConfigs.Add(dedicatedConfig);
}
/// <summary>
/// Update the timestamp offset between client and server based on the timestamp.
/// A default (unset) timestamp is ignored.
/// </summary>
/// <param name="timestamp">Timestamp received from the server</param>
public virtual void UpdateTimeOffset(DateTime timestamp)
{
    if (timestamp != default)
    {
        // Offset is local UTC time minus server time, in milliseconds
        var offsetMilliseconds = (DateTime.UtcNow - timestamp).TotalMilliseconds;
        TimeOffsetManager.UpdateSocketOffset(ClientName, offsetMilliseconds);
    }
}
/// <summary>
/// Get the time offset between client and server
/// </summary>
/// <returns>The socket offset registered for this client name, as returned by the time offset manager</returns>
public virtual TimeSpan? GetTimeOffset()
{
    return TimeOffsetManager.GetSocketOffset(ClientName);
}
/// <summary>
/// Add a query to periodically send on each connection. The registration is applied to
/// connections created after this call.
/// </summary>
/// <param name="identifier">Identifier of the periodic query</param>
/// <param name="interval">Interval between sends</param>
/// <param name="queryDelegate">Delegate producing the query for a connection</param>
/// <param name="callback">Optional callback invoked for the query</param>
protected virtual void RegisterPeriodicQuery(string identifier, TimeSpan interval, Func queryDelegate, Action? callback)
{
    var registration = new PeriodicTaskRegistration
    {
        Identifier = identifier,
        Interval = interval,
        QueryDelegate = queryDelegate,
        Callback = callback
    };
    PeriodicTaskRegistrations.Add(registration);
}
/// <summary>
/// Connect to the BaseAddress and listen for data; forwards to the overload taking an explicit url
/// </summary>
/// <param name="subscription">The subscription</param>
/// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns>The result of subscribing on the BaseAddress</returns>
protected virtual Task> SubscribeAsync(Subscription subscription, CancellationToken ct)
    => SubscribeAsync(BaseAddress, subscription, ct);
/// <summary>
/// Connect to an url and listen for data.
/// Selects or creates a socket connection, registers the subscription on it, connects (and
/// authenticates) when needed and then sends the subscribe request.
/// </summary>
/// <param name="url">The URL to connect to</param>
/// <param name="subscription">The subscription</param>
/// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns>The update subscription on success, otherwise a result carrying the error</returns>
protected virtual async Task> SubscribeAsync(string url, Subscription subscription, CancellationToken ct)
{
    if (_disposing)
        return new CallResult(new InvalidOperationError("Client disposed, can't subscribe"));

    if (subscription.Authenticated && AuthenticationProvider == null)
    {
        _logger.LogWarning("Failed to subscribe, private subscription but no API credentials set");
        return new CallResult(new NoApiCredentialsError());
    }

    // Lifted comparison: evaluates to false when MaxIndividualSubscriptionsPerConnection is null (no limit)
    if (subscription.IndividualSubscriptionCount > MaxIndividualSubscriptionsPerConnection)
        return new CallResult(ArgumentError.Invalid("subscriptions", $"Max number of subscriptions in a single call is {MaxIndividualSubscriptionsPerConnection}"));

    SocketConnection socketConnection;
    var released = false;
    // Wait for a semaphore here, so we only connect 1 socket at a time.
    // This is necessary for being able to see if connections can be combined
    try
    {
        await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false);
    }
    catch (OperationCanceledException tce)
    {
        return new CallResult(new CancellationRequestedError(tce));
    }

    try
    {
        while (true)
        {
            // Get a new or existing socket connection
            var socketResult = await GetSocketConnection(url, subscription.Authenticated, false, ct, subscription.Topic, subscription.IndividualSubscriptionCount).ConfigureAwait(false);
            if (!socketResult)
                return socketResult.As(null);

            socketConnection = socketResult.Data;

            // Add a subscription on the socket connection
            var success = socketConnection.AddSubscription(subscription);
            if (!success)
            {
                // The selected connection refused the subscription; select again
                _logger.FailedToAddSubscriptionRetryOnDifferentConnection(socketConnection.SocketId);
                continue;
            }

            if (ClientOptions.SocketSubscriptionsCombineTarget == 1)
            {
                // Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway
                semaphoreSlim.Release();
                released = true;
            }

            var connectResult = await ConnectIfNeededAsync(socketConnection, subscription.Authenticated, ct).ConfigureAwait(false);
            if (!connectResult)
                return new CallResult(connectResult.Error!);

            break;
        }
    }
    finally
    {
        // Release exactly once: skipped when the early release above already happened
        if (!released)
            semaphoreSlim.Release();
    }

    if (socketConnection.PausedActivity)
    {
        // NOTE(review): the subscription was already added to the connection above and is not removed
        // on this path - confirm the connection drops it, otherwise it may be subscribed on resume.
        _logger.HasBeenPausedCantSubscribeAtThisMoment(socketConnection.SocketId);
        return new CallResult(new ServerError(new ErrorInfo(ErrorType.WebsocketPaused, "Socket is paused")));
    }

    var subscribeResult = await socketConnection.TrySubscribeAsync(subscription, true, ct).ConfigureAwait(false);
    if (!subscribeResult)
        return new CallResult(subscribeResult.Error!);

    _logger.SubscriptionCompletedSuccessfully(socketConnection.SocketId, subscription.Id);
    return new CallResult(new UpdateSubscription(socketConnection, subscription));
}
/// <summary>
/// Connect to an url and listen for data using a HighPerf connection.
/// A new connection is created through the factory, the subscription is registered on it, the
/// connection is opened and the subscribe request (if any) is sent without waiting for an answer.
/// </summary>
/// <param name="url">The URL to connect to</param>
/// <param name="subscription">The subscription</param>
/// <param name="connectionFactory">The factory for creating a socket connection</param>
/// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns>The HighPerf update subscription on success, otherwise a result carrying the error</returns>
protected virtual async Task> SubscribeHighPerfAsync(
    string url,
    HighPerfSubscription subscription,
    IHighPerfConnectionFactory connectionFactory,
    CancellationToken ct)
{
    if (_disposing)
        return new CallResult(new InvalidOperationError("Client disposed, can't subscribe"));

    HighPerfSocketConnection socketConnection;
    var released = false;
    // Wait for a semaphore here, so we only connect 1 socket at a time.
    // This is necessary for being able to see if connections can be combined
    try
    {
        await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false);
    }
    catch (OperationCanceledException tce)
    {
        return new CallResult(new CancellationRequestedError(tce));
    }

    try
    {
        while (true)
        {
            // Get a new socket connection
            var socketResult = await GetHighPerfSocketConnection(url, connectionFactory, ct).ConfigureAwait(false);
            if (!socketResult)
                return socketResult.As(null);

            socketConnection = socketResult.Data;

            // Add a subscription on the socket connection
            var success = socketConnection.AddSubscription(subscription);
            if (!success)
            {
                _logger.FailedToAddSubscriptionRetryOnDifferentConnection(socketConnection.SocketId);
                continue;
            }

            if (ClientOptions.SocketSubscriptionsCombineTarget == 1)
            {
                // Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway
                semaphoreSlim.Release();
                released = true;
            }

            var connectResult = await ConnectIfNeededAsync(socketConnection, false, ct).ConfigureAwait(false);
            if (!connectResult)
                return new CallResult(connectResult.Error!);

            break;
        }
    }
    finally
    {
        // Release exactly once: skipped when the early release above already happened
        if (!released)
            semaphoreSlim.Release();
    }

    var subRequest = subscription.CreateSubscriptionQuery(socketConnection);
    if (subRequest != null)
    {
        // Send the request; a failed send closes the connection and is returned to the caller
        var sendResult = await socketConnection.SendAsync(subRequest).ConfigureAwait(false);
        if (!sendResult)
        {
            await socketConnection.CloseAsync().ConfigureAwait(false);
            return new CallResult(sendResult.Error!);
        }
    }

    if (ct != default)
    {
        subscription.CancellationTokenRegistration = ct.Register(async () =>
        {
            // The callback is an Action, so this lambda is async void: an exception escaping it
            // cannot be observed by any caller. Catch and log instead of letting it escape.
            try
            {
                _logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id);
                await socketConnection.CloseAsync().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "[Sckt {SocketId}] failed to close connection after cancellation was requested", socketConnection.SocketId);
            }
        }, false);
    }

    _logger.SubscriptionCompletedSuccessfully(socketConnection.SocketId, subscription.Id);
    return new CallResult(new HighPerfUpdateSubscription(socketConnection, subscription));
}
/// <summary>
/// Send a query on a socket connection to the BaseAddress and wait for the response;
/// forwards to the overload taking an explicit url
/// </summary>
/// <remarks>The generic type argument is the expected result type</remarks>
/// <param name="query">The query</param>
/// <param name="ct">Cancellation token</param>
/// <returns>The result of the query sent on the BaseAddress</returns>
protected virtual Task> QueryAsync(Query query, CancellationToken ct = default)
    => QueryAsync(BaseAddress, query, ct);
/// <summary>
/// Send a query on a socket connection and wait for the response.
/// Selects or creates a (preferably dedicated request) connection, connects and authenticates it
/// when needed and then sends the query.
/// </summary>
/// <remarks>The generic type argument is the expected result type</remarks>
/// <param name="url">The url for the request</param>
/// <param name="query">The query</param>
/// <param name="ct">Cancellation token</param>
/// <returns>The query response, or a result carrying the error</returns>
protected virtual async Task> QueryAsync(string url, Query query, CancellationToken ct = default)
{
    if (_disposing)
        return new CallResult(new InvalidOperationError("Client disposed, can't query"));

    if (ct.IsCancellationRequested)
        return new CallResult(new CancellationRequestedError());

    SocketConnection socketConnection;
    var released = false;
    // Honor the cancellation token while queued behind another caller holding the semaphore,
    // the same way the Subscribe methods do.
    try
    {
        await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false);
    }
    catch (OperationCanceledException tce)
    {
        return new CallResult(new CancellationRequestedError(tce));
    }

    try
    {
        var socketResult = await GetSocketConnection(url, query.Authenticated, true, ct).ConfigureAwait(false);
        if (!socketResult)
            return socketResult.As(default);

        socketConnection = socketResult.Data;

        if (ClientOptions.SocketSubscriptionsCombineTarget == 1)
        {
            // Can release early when only a single sub per connection
            semaphoreSlim.Release();
            released = true;
        }

        var connectResult = await ConnectIfNeededAsync(socketConnection, query.Authenticated, ct).ConfigureAwait(false);
        if (!connectResult)
            return new CallResult(connectResult.Error!);
    }
    finally
    {
        // Release exactly once: skipped when the early release above already happened
        if (!released)
            semaphoreSlim.Release();
    }

    if (socketConnection.PausedActivity)
    {
        _logger.HasBeenPausedCantSendQueryAtThisMoment(socketConnection.SocketId);
        return new CallResult(new ServerError(new ErrorInfo(ErrorType.WebsocketPaused, "Socket is paused")));
    }

    // Connecting may have taken a while; check once more before sending
    if (ct.IsCancellationRequested)
        return new CallResult(new CancellationRequestedError());

    return await socketConnection.SendAndWaitQueryAsync(query, ct).ConfigureAwait(false);
}
/// <summary>
/// Checks if a socket needs to be connected and does so if needed. Also authenticates on the socket if needed.
/// An already connected socket is returned as success without any further checks.
/// </summary>
/// <param name="socket">The connection to check</param>
/// <param name="authenticated">Whether the socket should be authenticated</param>
/// <param name="ct">Cancellation token, used for the connect attempt</param>
/// <returns>Success, or the connect/authentication error</returns>
/// <exception cref="InvalidOperationException">Authentication is requested for a connection that is not a SocketConnection</exception>
protected virtual async Task ConnectIfNeededAsync(ISocketConnection socket, bool authenticated, CancellationToken ct)
{
    if (socket.Connected)
        return CallResult.SuccessResult;

    var connected = await ConnectSocketAsync(socket, ct).ConfigureAwait(false);
    if (!connected)
        return connected;

    // Optional settle time after the connection is established
    if (ClientOptions.DelayAfterConnect != TimeSpan.Zero)
    {
        await Task.Delay(ClientOptions.DelayAfterConnect).ConfigureAwait(false);
    }

    var authenticationRequired = authenticated && !socket.Authenticated;
    if (!authenticationRequired)
        return CallResult.SuccessResult;

    if (socket is SocketConnection standardConnection)
    {
        var authResult = await AuthenticateSocketAsync(standardConnection).ConfigureAwait(false);
        if (!authResult)
        {
            // A connection which failed to authenticate is of no use, close it
            await socket.CloseAsync().ConfigureAwait(false);
        }

        return authResult;
    }

    throw new InvalidOperationException("HighPerfSocketConnection not supported for authentication");
}
/// <summary>
/// Authenticate a socket connection. When no authentication request is produced for the
/// connection it is marked as authenticated without sending anything.
/// </summary>
/// <param name="socket">Socket to authenticate</param>
/// <returns>Success, or the error with its message prefixed by "Authentication failed: "</returns>
public virtual async Task AuthenticateSocketAsync(SocketConnection socket)
{
    if (AuthenticationProvider == null)
        return new CallResult(new NoApiCredentialsError());

    _logger.AttemptingToAuthenticate(socket.SocketId);
    var authRequest = await GetAuthenticationRequestAsync(socket).ConfigureAwait(false);
    if (authRequest == null)
    {
        socket.Authenticated = true;
        return CallResult.SuccessResult;
    }

    var authResponse = await socket.SendAndWaitQueryAsync(authRequest).ConfigureAwait(false);
    if (authResponse)
    {
        _logger.Authenticated(socket.SocketId);
        socket.Authenticated = true;
        return CallResult.SuccessResult;
    }

    _logger.AuthenticationFailed(socket.SocketId);
    if (socket.Connected)
    {
        await socket.CloseAsync().ConfigureAwait(false);
    }

    // The error instance of the response is updated in place and reused for the returned result
    authResponse.Error!.Message = "Authentication failed: " + authResponse.Error.Message;
    return new CallResult(authResponse.Error)!;
}
/// <summary>
/// Should return the request which can be used to authenticate a socket connection.
/// The default implementation asks the authentication provider, which must be set when this is called.
/// </summary>
/// <param name="connection">The connection to authenticate</param>
/// <returns>The authentication query as provided by the authentication provider</returns>
protected internal virtual Task GetAuthenticationRequestAsync(SocketConnection connection)
{
    var authenticationQuery = AuthenticationProvider!.GetAuthenticationQuery(this, connection);
    return Task.FromResult(authenticationQuery);
}
/// <summary>
/// Adds a system subscription. Used for example to reply to ping requests.
/// The subscription is stored for future connections and added to every connection currently tracked.
/// </summary>
/// <param name="systemSubscription">The subscription</param>
protected void AddSystemSubscription(SystemSubscription systemSubscription)
{
    systemSubscriptions.Add(systemSubscription);

    var existingConnections = _socketConnections.Values;
    foreach (var existing in existingConnections)
    {
        existing.AddSubscription(systemSubscription);
    }
}
/// <summary>
/// Get the url to connect to. The default implementation returns the provided address unchanged.
/// </summary>
/// <param name="address">The address the connection is requested for</param>
/// <param name="authentication">Whether the connection is for an authenticated request; not used by the default implementation</param>
/// <returns>A successful result containing the url to connect to</returns>
protected virtual Task> GetConnectionUrlAsync(string address, bool authentication)
    => Task.FromResult(new CallResult(address));
/// <summary>
/// Get the url to reconnect to after losing a connection.
/// The default implementation returns the current connection uri of the connection.
/// </summary>
/// <param name="connection">The connection which is reconnecting</param>
/// <returns>The uri to reconnect to</returns>
protected internal virtual Task GetReconnectUriAsync(ISocketConnection connection)
    => Task.FromResult(connection.ConnectionUri);
/// <summary>
/// Update the subscription when the connection is restored after disconnecting. Can be used to update an authentication token for example.
/// The default implementation does nothing and reports success.
/// </summary>
/// <param name="subscription">The subscription</param>
/// <returns>Success</returns>
protected internal virtual Task RevitalizeRequestAsync(Subscription subscription)
    => Task.FromResult(CallResult.SuccessResult);
/// <summary>
/// Gets a connection for a new subscription or query. Returns an existing connection when one is
/// suitable and has room, otherwise creates a new one. A newly created connection is neither
/// connected nor added to the tracked connections here; that happens in ConnectSocketAsync.
/// </summary>
/// <param name="address">The address the socket is for</param>
/// <param name="authenticated">Whether the socket should be authenticated</param>
/// <param name="dedicatedRequestConnection">Whether a dedicated request connection should be returned</param>
/// <param name="ct">Cancellation token</param>
/// <param name="topic">The subscription topic, can be provided when multiple of the same topics are not allowed on a connection</param>
/// <param name="individualSubscriptionCount">The number of individual subscriptions in this subscribe request</param>
/// <returns>The connection to use, or an error when waiting timed out, was cancelled, the connection limit is reached or the url could not be determined</returns>
protected virtual async Task> GetSocketConnection(
string address,
bool authenticated,
bool dedicatedRequestConnection,
CancellationToken ct,
string? topic = null,
int individualSubscriptionCount = 1)
{
// Candidates: same address (ignoring trailing slashes), created by the same client type and,
// unless topics may share a connection, not yet carrying this topic
var socketQuery = _socketConnections.Where(s => s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
&& s.Value.ApiClient.GetType() == GetType()
&& (AllowTopicsOnTheSameConnection || !s.Value.Topics.Contains(topic)))
.Select(x => x.Value)
.ToList();
// If all current socket connections are reconnecting or resubscribing wait for that to finish as we can probably use the existing connection
// The status is polled every 50 ms for at most 10 seconds
var delayStart = DateTime.UtcNow;
var delayed = false;
while (socketQuery.Count >= 1 && socketQuery.All(x => x.Status == SocketStatus.Reconnecting || x.Status == SocketStatus.Resubscribing))
{
if (DateTime.UtcNow - delayStart > TimeSpan.FromSeconds(10))
{
if (socketQuery.Count >= 1 && socketQuery.All(x => x.Status == SocketStatus.Reconnecting || x.Status == SocketStatus.Resubscribing))
{
// If after this time we still trying to reconnect/reprocess there is some issue in the connection
_logger.TimeoutWaitingForReconnectingSocket();
return new CallResult(new CantConnectError());
}
break;
}
delayed = true;
// An exception from the delay (e.g. cancellation) is ignored here; the token check below handles cancellation
try { await Task.Delay(50, ct).ConfigureAwait(false); } catch (Exception) { }
if (ct.IsCancellationRequested)
return new CallResult(new CancellationRequestedError());
}
if (delayed)
_logger.WaitedForReconnectingSocket((long)(DateTime.UtcNow - delayStart).TotalMilliseconds);
// Keep only connections usable right now. An authenticated request needs an authenticated
// socket; an unauthenticated request accepts either.
socketQuery = socketQuery.Where(s => (s.Status == SocketStatus.None || s.Status == SocketStatus.Connected)
&& (s.Authenticated == authenticated || !authenticated)
&& s.Connected).ToList();
SocketConnection? connection;
if (!dedicatedRequestConnection)
{
// Subscriptions go on the non-dedicated connection with the fewest user subscriptions
connection = socketQuery.Where(s => !s.DedicatedRequestConnection.IsDedicatedRequestConnection).OrderBy(s => s.UserSubscriptionCount).FirstOrDefault();
}
else
{
connection = socketQuery.Where(s => s.DedicatedRequestConnection.IsDedicatedRequestConnection).FirstOrDefault();
if (connection != null && !connection.DedicatedRequestConnection.Authenticated)
// Mark dedicated request connection as authenticated if the request is authenticated
connection.DedicatedRequestConnection.Authenticated = authenticated;
if (connection == null)
// Fall back to an existing connection if there is no dedicated request connection available
connection = socketQuery.OrderBy(s => s.UserSubscriptionCount).FirstOrDefault();
}
// The API specific connection limit takes precedence over the client wide limit
bool maxConnectionsReached = _socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections);
if (connection != null)
{
bool lessThanBatchSubCombineTarget = connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget;
bool lessThanIndividualSubCombineTarget = connection.Subscriptions.Sum(x => x.IndividualSubscriptionCount) < ClientOptions.SocketIndividualSubscriptionCombineTarget;
if ((lessThanBatchSubCombineTarget && lessThanIndividualSubCombineTarget)
|| maxConnectionsReached)
{
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
// If there is a max subscriptions per connection limit also only use existing if the new subscription doesn't go over the limit
if (MaxIndividualSubscriptionsPerConnection == null)
return new CallResult(connection);
var currentCount = connection.Subscriptions.Sum(x => x.IndividualSubscriptionCount);
if (currentCount + individualSubscriptionCount <= MaxIndividualSubscriptionsPerConnection)
return new CallResult(connection);
}
}
// No existing connection can be used, a new connection is needed but only allowed below the limit
if (maxConnectionsReached)
return new CallResult(new InvalidOperationError("Max amount of socket connections reached"));
var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false);
if (!connectionAddress)
{
_logger.FailedToDetermineConnectionUrl(connectionAddress.Error?.ToString());
return connectionAddress.As(null);
}
if (connectionAddress.Data != address)
_logger.ConnectionAddressSetTo(connectionAddress.Data!);
// Create new socket connection
// The originally requested address is passed as the last argument; the candidate filter at the top matches on Tag
// NOTE(review): presumably that argument becomes the connection's Tag - confirm
var socketConnection = new SocketConnection(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, address);
socketConnection.ConnectRateLimitedAsync += HandleConnectRateLimitedAsync;
if (dedicatedRequestConnection)
{
socketConnection.DedicatedRequestConnection = new DedicatedConnectionState
{
IsDedicatedRequestConnection = dedicatedRequestConnection,
Authenticated = authenticated
};
}
// Apply the registered periodic queries and system subscriptions to the new connection
foreach (var ptg in PeriodicTaskRegistrations)
socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.QueryDelegate, ptg.Callback);
foreach (var systemSubscription in systemSubscriptions)
socketConnection.AddSubscription(systemSubscription);
return new CallResult(socketConnection);
}
/// <summary>
/// Creates a new HighPerf socket connection for a subscription. Existing connections are never
/// reused by this method. The created connection is not connected here.
/// </summary>
/// <param name="address">The address the socket is for</param>
/// <param name="connectionFactory">The factory for creating a socket connection</param>
/// <param name="ct">Cancellation token; not used by this implementation</param>
/// <returns>The new connection, or the error when the connection url could not be determined</returns>
protected virtual async Task>> GetHighPerfSocketConnection(
string address,
IHighPerfConnectionFactory connectionFactory,
CancellationToken ct)
{
// HighPerf connections always resolve the url as unauthenticated
var connectionAddress = await GetConnectionUrlAsync(address, false).ConfigureAwait(false);
if (!connectionAddress)
{
_logger.FailedToDetermineConnectionUrl(connectionAddress.Error?.ToString());
return connectionAddress.As>(null);
}
if (connectionAddress.Data != address)
_logger.ConnectionAddressSetTo(connectionAddress.Data!);
// Create new socket connection
var socketConnection = connectionFactory.CreateHighPerfConnection(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, address);
// Only the Request of each registered periodic query is passed on; the registration's callback is not
foreach (var ptg in PeriodicTaskRegistrations)
socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, (con) => ptg.QueryDelegate(con).Request);
return new CallResult>(socketConnection);
}
/// <summary>
/// Process an unhandled message. The default implementation handles nothing.
/// </summary>
/// <param name="connection">The socket connection</param>
/// <param name="typeIdentifier">The type as identified</param>
/// <param name="data">The data</param>
/// <returns>Always false in the default implementation</returns>
protected internal virtual bool HandleUnhandledMessage(SocketConnection connection, string typeIdentifier, ReadOnlySpan data)
{
    return false;
}
/// <summary>
/// Process connect rate limited. When rate limiting is enabled and a delay after being rate limited
/// is configured, a retry-after guard is set on the rate limiter (created here if not yet set).
/// </summary>
protected virtual async Task HandleConnectRateLimitedAsync()
{
    if (!ClientOptions.RateLimiterEnabled || !ClientOptions.ConnectDelayAfterRateLimited.HasValue)
        return;

    var retryAfter = DateTime.UtcNow.Add(ClientOptions.ConnectDelayAfterRateLimited.Value);
    _logger.AddingRetryAfterGuard(retryAfter);

    if (RateLimiter == null)
        RateLimiter = new RateLimitGate("Connection");

    await RateLimiter.SetRetryAfterGuardAsync(retryAfter, RateLimitItemType.Connection).ConfigureAwait(false);
}
/// <summary>
/// Connect a socket. A successfully connected socket is added to the tracked connections;
/// a socket which failed to connect is disposed.
/// </summary>
/// <param name="socketConnection">The socket to connect</param>
/// <param name="ct">Cancellation token</param>
/// <returns>The result of the connect attempt</returns>
protected virtual async Task ConnectSocketAsync(ISocketConnection socketConnection, CancellationToken ct)
{
    var connectResult = await socketConnection.ConnectAsync(ct).ConfigureAwait(false);
    if (!connectResult)
    {
        socketConnection.Dispose();
        return connectResult;
    }

    // Track the connection in the collection matching its kind
    switch (socketConnection)
    {
        case SocketConnection standardConnection:
            _socketConnections.TryAdd(socketConnection.SocketId, standardConnection);
            break;
        case HighPerfSocketConnection highPerfConnection:
            _highPerfSocketConnections.TryAdd(socketConnection.SocketId, highPerfConnection);
            break;
    }

    return connectResult;
}
/// <summary>
/// Get parameters for the websocket connection, based on the client/API options and the keep alive settings of this client
/// </summary>
/// <param name="address">The address to connect to</param>
/// <returns>The parameters for a new websocket</returns>
protected virtual WebSocketParameters GetWebSocketParameters(string address)
{
    var parameters = new WebSocketParameters(new Uri(address), ClientOptions.ReconnectPolicy)
    {
        KeepAliveInterval = KeepAliveInterval,
        KeepAliveTimeout = KeepAliveTimeout,
        ReconnectInterval = ClientOptions.ReconnectInterval,
        // The rate limiter is only handed to the socket when rate limiting is enabled
        RateLimiter = ClientOptions.RateLimiterEnabled ? RateLimiter : null,
        RateLimitingBehavior = ClientOptions.RateLimitingBehaviour,
        Proxy = ClientOptions.Proxy,
        // The API specific no-data timeout takes precedence over the client wide setting
        Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout,
        ReceiveBufferSize = ClientOptions.ReceiveBufferSize,
    };
    return parameters;
}
/// <summary>
/// Unsubscribe an update subscription
/// </summary>
/// <param name="subscriptionId">The id of the subscription to unsubscribe</param>
/// <returns>True when the subscription was found on a tracked connection and closed, false when no connection has it</returns>
public virtual async Task UnsubscribeAsync(int subscriptionId)
{
    // Work on a copy so closing does not interfere with the enumeration
    foreach (var candidate in _socketConnections.Values.ToList())
    {
        var match = candidate.GetSubscription(subscriptionId);
        if (match == null)
            continue;

        _logger.UnsubscribingSubscription(candidate.SocketId, subscriptionId);
        await candidate.CloseAsync(match).ConfigureAwait(false);
        return true;
    }

    return false;
}
/// <summary>
/// Unsubscribe an update subscription
/// </summary>
/// <param name="subscription">The subscription to unsubscribe</param>
/// <returns>A task which completes when the subscription is closed</returns>
/// <exception cref="ArgumentNullException">The subscription is null</exception>
public virtual async Task UnsubscribeAsync(UpdateSubscription subscription)
{
    if (subscription == null)
    {
        throw new ArgumentNullException(nameof(subscription));
    }

    var socketId = subscription.SocketId;
    var subscriptionId = subscription.Id;
    _logger.UnsubscribingSubscription(socketId, subscriptionId);

    await subscription.CloseAsync().ConfigureAwait(false);
}
/// <summary>
/// Unsubscribe all subscriptions. Closes every user subscription on the tracked socket connections
/// and closes every tracked HighPerf connection. Does nothing when there are no user subscriptions.
/// </summary>
/// <returns>A task which completes when all close operations have finished</returns>
public virtual async Task UnsubscribeAllAsync()
{
    var totalSubscriptions = _socketConnections.Sum(s => s.Value.UserSubscriptionCount) + _highPerfSocketConnections.Sum(s => s.Value.UserSubscriptionCount);
    if (totalSubscriptions == 0)
    {
        return;
    }

    _logger.UnsubscribingAll(totalSubscriptions);

    var pendingCloses = new List<Task>();
    foreach (var connection in _socketConnections.Values)
    {
        var userSubscriptions = connection.Subscriptions.Where(x => x.UserSubscription);
        foreach (var userSubscription in userSubscriptions)
        {
            pendingCloses.Add(connection.CloseAsync(userSubscription));
        }
    }

    foreach (var highPerfConnection in _highPerfSocketConnections.Values)
    {
        pendingCloses.Add(highPerfConnection.CloseAsync());
    }

    await Task.WhenAll(pendingCloses.ToArray()).ConfigureAwait(false);
}
/// <summary>
/// Reconnect all connections. Triggers a reconnect on every tracked socket connection
/// (HighPerf connections are not included) and waits for all triggers to complete.
/// </summary>
/// <returns>A task which completes when all reconnect triggers have completed</returns>
public virtual async Task ReconnectAsync()
{
    _logger.ReconnectingAllConnections(_socketConnections.Count);

    var reconnects = _socketConnections.Values
        .Select(connection => connection.TriggerReconnectAsync())
        .ToArray();

    await Task.WhenAll(reconnects).ConfigureAwait(false);
}
/// <summary>
/// Open a dedicated request connection for every registered dedicated connection config.
/// Stops at, and returns, the first error.
/// </summary>
/// <returns>Success when all dedicated connections are connected, otherwise the first error</returns>
public virtual async Task PrepareConnectionsAsync()
{
    foreach (var dedicatedConfig in DedicatedConnectionConfigs)
    {
        var connectionResult = await GetSocketConnection(dedicatedConfig.SocketAddress, dedicatedConfig.Authenticated, true, CancellationToken.None).ConfigureAwait(false);
        if (!connectionResult)
            return connectionResult.AsDataless();

        var connected = await ConnectIfNeededAsync(connectionResult.Data, dedicatedConfig.Authenticated, default).ConfigureAwait(false);
        if (connected)
            continue;

        return new CallResult(connected.Error!);
    }

    return CallResult.SuccessResult;
}
/// <summary>
/// Apply updated options. When a proxy was set before or is set now, and there are tracked
/// connections, the proxy of every connection is updated, which also triggers a reconnect.
/// </summary>
/// <param name="options">The options to apply</param>
public override void SetOptions(UpdateOptions options)
{
    var hadProxy = ClientOptions.Proxy != null;
    base.SetOptions(options);

    // No proxy before and none now: nothing changes for the connections
    var proxyUntouched = !hadProxy && options.Proxy == null;
    if (proxyUntouched || _socketConnections.IsEmpty)
        return;

    _logger.LogInformation("Reconnecting websockets to apply proxy");

    // Update proxy, also triggers reconnect
    foreach (var pair in _socketConnections)
    {
        _ = pair.Value.UpdateProxy(options.Proxy);
    }
}
/// <summary>
/// Get the current state of connections and subscriptions as text
/// </summary>
/// <param name="includeSubDetails">True to include details for each subscription</param>
/// <returns>The string representation of the client state</returns>
public string GetSubscriptionsState(bool includeSubDetails = true)
{
    var state = GetState(includeSubDetails);
    return state.ToString();
}
/// <summary>
/// Gets the state of the client
/// </summary>
/// <param name="includeSubDetails">True to get details for each subscription</param>
/// <returns>Connection count, subscription count, download speed and the state of each tracked socket connection</returns>
public SocketApiClientState GetState(bool includeSubDetails = true)
{
    var connectionStates = _socketConnections
        .Select(pair => pair.Value.GetState(includeSubDetails))
        .ToList();

    return new SocketApiClientState(_socketConnections.Count, CurrentSubscriptions, IncomingKbps, connectionStates);
}
/// <summary>
/// Get the current state of the client
/// </summary>
/// <param name="Connections">Number of sockets for this client</param>
/// <param name="Subscriptions">Total number of subscriptions</param>
/// <param name="DownloadSpeed">Total download speed</param>
/// <param name="ConnectionStates">State of each socket connection</param>
public record SocketApiClientState(
    int Connections,
    int Subscriptions,
    double DownloadSpeed,
    List ConnectionStates)
{
    /// <summary>
    /// Print the state of the client: the totals first, then one section per connection and,
    /// when present, one entry per subscription of that connection
    /// </summary>
    /// <param name="sb">The builder to write the state to</param>
    /// <returns>Always true</returns>
    protected virtual bool PrintMembers(StringBuilder sb)
    {
        sb.AppendLine();
        sb.AppendLine($"\tTotal connections: {Connections}");
        sb.AppendLine($"\tTotal subscriptions: {Subscriptions}");
        sb.AppendLine($"\tDownload speed: {DownloadSpeed} kbps");
        sb.AppendLine($"\tConnections:");

        foreach (var cs in ConnectionStates)
        {
            sb.AppendLine($"\t\tId: {cs.Id}");
            sb.AppendLine($"\t\tAddress: {cs.Address}");
            sb.AppendLine($"\t\tTotal subscriptions: {cs.Subscriptions}");
            sb.AppendLine($"\t\tStatus: {cs.Status}");
            sb.AppendLine($"\t\tAuthenticated: {cs.Authenticated}");
            sb.AppendLine($"\t\tDownload speed: {cs.DownloadSpeed} kbps");
            sb.AppendLine($"\t\tPending queries: {cs.PendingQueries}");

            // Subscription details are only present when they were requested
            if (cs.SubscriptionStates?.Count > 0)
            {
                sb.AppendLine($"\t\tSubscriptions:");
                foreach (var subState in cs.SubscriptionStates)
                {
                    sb.AppendLine($"\t\t\tId: {subState.Id}");
                    sb.AppendLine($"\t\t\tStatus: {subState.Status}");
                    sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}");
                }
            }
        }

        return true;
    }
}
/// <summary>
/// Dispose the client. Marks the client as disposing, starts closing every tracked connection
/// which is connected or has user subscriptions as well as every tracked HighPerf connection,
/// and disposes the semaphore and the base client.
/// </summary>
/// <remarks>
/// The close operations are started but not awaited: Dispose is synchronous and blocking on
/// the close tasks here could deadlock callers.
/// </remarks>
public override void Dispose()
{
    _disposing = true;

    // Materialize once. The filter reads live connection state, so enumerating the query twice
    // (once for logging, once for closing) could act on two different sets of connections.
    var socketList = _socketConnections.Values
        .Where(x => x.UserSubscriptionCount > 0 || x.Connected)
        .ToList();
    var highPerfSocketList = _highPerfSocketConnections.Values;

    if (socketList.Count > 0 || highPerfSocketList.Count > 0)
        _logger.DisposingSocketClient();

    foreach (var connection in socketList)
        _ = connection.CloseAsync();

    // HighPerf connections are tracked separately and need closing too, the same as in UnsubscribeAllAsync
    foreach (var connection in highPerfSocketList)
        _ = connection.CloseAsync();

    semaphoreSlim?.Dispose();
    base.Dispose();
}
/// <summary>
/// Preprocess a stream message. The default implementation returns the data unchanged.
/// </summary>
/// <param name="connection">The connection the message was received on</param>
/// <param name="type">The websocket message type</param>
/// <param name="data">The received data</param>
/// <returns>The data to continue processing with</returns>
public virtual ReadOnlySpan PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlySpan data)
{
    return data;
}
/// <summary>
/// Create a new message converter instance
/// </summary>
/// <param name="messageType">The websocket message type the converter is created for</param>
/// <returns>The message handler for messages of the given type</returns>
public abstract ISocketMessageHandler CreateMessageConverter(WebSocketMessageType messageType);
}
}