mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-06-09 17:06:19 +00:00
Wip client work
This commit is contained in:
parent
997e71f3b7
commit
0d3e05880a
@ -2,7 +2,9 @@ using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Net.Http;
|
||||
using CryptoExchange.Net.Authentication;
|
||||
using CryptoExchange.Net.Logging;
|
||||
using CryptoExchange.Net.Objects;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace CryptoExchange.Net
|
||||
{
|
||||
@ -13,8 +15,9 @@ namespace CryptoExchange.Net
|
||||
{
|
||||
private ApiCredentials? _apiCredentials;
|
||||
private AuthenticationProvider? _authenticationProvider;
|
||||
protected Log _log;
|
||||
protected bool _disposing;
|
||||
private bool _created;
|
||||
private bool _disposing;
|
||||
|
||||
/// <summary>
|
||||
/// The authentication provider for this API client. (null if no credentials are set)
|
||||
@ -70,19 +73,20 @@ namespace CryptoExchange.Net
|
||||
internal protected string BaseAddress { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Api client options
|
||||
/// Options
|
||||
/// </summary>
|
||||
internal ApiClientOptions Options { get; }
|
||||
public ApiClientOptions Options { get; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="options">Client options</param>
|
||||
/// <param name="log">Logger</param>
|
||||
/// <param name="apiOptions">Api client options</param>
|
||||
protected BaseApiClient(BaseClientOptions options, ApiClientOptions apiOptions)
|
||||
protected BaseApiClient(Log log, ApiClientOptions apiOptions)
|
||||
{
|
||||
Options = apiOptions;
|
||||
_apiCredentials = apiOptions.ApiCredentials?.Copy() ?? options.ApiCredentials?.Copy();
|
||||
_log = log;
|
||||
_apiCredentials = apiOptions.ApiCredentials?.Copy();
|
||||
BaseAddress = apiOptions.BaseAddress;
|
||||
}
|
||||
|
||||
@ -104,7 +108,7 @@ namespace CryptoExchange.Net
|
||||
/// <summary>
|
||||
/// Dispose
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
public virtual void Dispose()
|
||||
{
|
||||
_disposing = true;
|
||||
_apiCredentials?.Dispose();
|
||||
|
@ -51,14 +51,14 @@ namespace CryptoExchange.Net
|
||||
/// <summary>
|
||||
/// Provided client options
|
||||
/// </summary>
|
||||
public BaseClientOptions ClientOptions { get; }
|
||||
public ClientOptions ClientOptions { get; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="name">The name of the API this client is for</param>
|
||||
/// <param name="options">The options for this client</param>
|
||||
protected BaseClient(string name, BaseClientOptions options)
|
||||
protected BaseClient(string name, ClientOptions options)
|
||||
{
|
||||
log = new Log(name);
|
||||
log.UpdateWriters(options.LogWriters);
|
||||
|
@ -21,95 +21,16 @@ namespace CryptoExchange.Net
|
||||
public abstract class BaseSocketClient: BaseClient, ISocketClient
|
||||
{
|
||||
#region fields
|
||||
/// <summary>
|
||||
/// The factory for creating sockets. Used for unit testing
|
||||
/// </summary>
|
||||
public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
|
||||
|
||||
/// <summary>
|
||||
/// List of socket connections currently connecting/connected
|
||||
/// </summary>
|
||||
protected internal ConcurrentDictionary<int, SocketConnection> socketConnections = new();
|
||||
/// <summary>
|
||||
/// Semaphore used while creating sockets
|
||||
/// </summary>
|
||||
protected internal readonly SemaphoreSlim semaphoreSlim = new(1);
|
||||
/// <summary>
|
||||
/// Keep alive interval for websocket connection
|
||||
/// </summary>
|
||||
protected TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(10);
|
||||
/// <summary>
|
||||
/// Delegate used for processing byte data received from socket connections before it is processed by handlers
|
||||
/// </summary>
|
||||
protected Func<byte[], string>? dataInterpreterBytes;
|
||||
/// <summary>
|
||||
/// Delegate used for processing string data received from socket connections before it is processed by handlers
|
||||
/// </summary>
|
||||
protected Func<string, string>? dataInterpreterString;
|
||||
/// <summary>
|
||||
/// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example.
|
||||
/// </summary>
|
||||
protected Dictionary<string, Action<MessageEvent>> genericHandlers = new();
|
||||
/// <summary>
|
||||
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry.
|
||||
/// </summary>
|
||||
protected Task? periodicTask;
|
||||
/// <summary>
|
||||
/// Wait event for the periodicTask
|
||||
/// </summary>
|
||||
protected AsyncResetEvent? periodicEvent;
|
||||
|
||||
/// <summary>
|
||||
/// If client is disposing
|
||||
/// </summary>
|
||||
protected bool disposing;
|
||||
|
||||
/// <summary>
|
||||
/// If true; data which is a response to a query will also be distributed to subscriptions
|
||||
/// If false; data which is a response to a query won't get forwarded to subscriptions as well
|
||||
/// </summary>
|
||||
protected internal bool ContinueOnQueryResponse { get; protected set; }
|
||||
|
||||
/// <summary>
|
||||
/// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message
|
||||
/// </summary>
|
||||
protected internal bool UnhandledMessageExpected { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The max amount of outgoing messages per socket per second
|
||||
/// </summary>
|
||||
protected internal int? RateLimitPerSocketPerSecond { get; set; }
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public double IncomingKbps
|
||||
{
|
||||
get
|
||||
{
|
||||
if (!socketConnections.Any())
|
||||
return 0;
|
||||
|
||||
return socketConnections.Sum(s => s.Value.IncomingKbps);
|
||||
}
|
||||
}
|
||||
|
||||
public int CurrentConnections => ApiClients.OfType<SocketApiClient>().Sum(c => c.CurrentConnections);
|
||||
/// <inheritdoc />
|
||||
public int CurrentConnections => socketConnections.Count;
|
||||
/// <inheritdoc />
|
||||
public int CurrentSubscriptions
|
||||
{
|
||||
get
|
||||
{
|
||||
if (!socketConnections.Any())
|
||||
return 0;
|
||||
|
||||
return socketConnections.Sum(s => s.Value.SubscriptionCount);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Client options
|
||||
/// </summary>
|
||||
public new BaseSocketClientOptions ClientOptions { get; }
|
||||
|
||||
public int CurrentSubscriptions => ApiClients.OfType<SocketApiClient>().Sum(s => s.CurrentSubscriptions);
|
||||
#endregion
|
||||
|
||||
/// <summary>
|
||||
@ -117,9 +38,8 @@ namespace CryptoExchange.Net
|
||||
/// </summary>
|
||||
/// <param name="name">The name of the API this client is for</param>
|
||||
/// <param name="options">The options for this client</param>
|
||||
protected BaseSocketClient(string name, BaseSocketClientOptions options) : base(name, options)
|
||||
protected BaseSocketClient(string name, ClientOptions options) : base(name, options)
|
||||
{
|
||||
ClientOptions = options ?? throw new ArgumentNullException(nameof(options));
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
@ -129,578 +49,6 @@ namespace CryptoExchange.Net
|
||||
apiClient.SetApiCredentials(credentials);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Set a delegate to be used for processing data received from socket connections before it is processed by handlers
|
||||
/// </summary>
|
||||
/// <param name="byteHandler">Handler for byte data</param>
|
||||
/// <param name="stringHandler">Handler for string data</param>
|
||||
protected void SetDataInterpreter(Func<byte[], string>? byteHandler, Func<string, string>? stringHandler)
|
||||
{
|
||||
dataInterpreterBytes = byteHandler;
|
||||
dataInterpreterString = stringHandler;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect to an url and listen for data on the BaseAddress
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the expected data</typeparam>
|
||||
/// <param name="apiClient">The API client the subscription is for</param>
|
||||
/// <param name="request">The optional request object to send, will be serialized to json</param>
|
||||
/// <param name="identifier">The identifier to use, necessary if no request object is sent</param>
|
||||
/// <param name="authenticated">If the subscription is to an authenticated endpoint</param>
|
||||
/// <param name="dataHandler">The handler of update data</param>
|
||||
/// <param name="ct">Cancellation token for closing this subscription</param>
|
||||
/// <returns></returns>
|
||||
protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(SocketApiClient apiClient, object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler, CancellationToken ct)
|
||||
{
|
||||
return SubscribeAsync(apiClient, apiClient.Options.BaseAddress, request, identifier, authenticated, dataHandler, ct);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect to an url and listen for data
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the expected data</typeparam>
|
||||
/// <param name="apiClient">The API client the subscription is for</param>
|
||||
/// <param name="url">The URL to connect to</param>
|
||||
/// <param name="request">The optional request object to send, will be serialized to json</param>
|
||||
/// <param name="identifier">The identifier to use, necessary if no request object is sent</param>
|
||||
/// <param name="authenticated">If the subscription is to an authenticated endpoint</param>
|
||||
/// <param name="dataHandler">The handler of update data</param>
|
||||
/// <param name="ct">Cancellation token for closing this subscription</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(SocketApiClient apiClient, string url, object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler, CancellationToken ct)
|
||||
{
|
||||
if (disposing)
|
||||
return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe"));
|
||||
|
||||
SocketConnection socketConnection;
|
||||
SocketSubscription? subscription;
|
||||
var released = false;
|
||||
// Wait for a semaphore here, so we only connect 1 socket at a time.
|
||||
// This is necessary for being able to see if connections can be combined
|
||||
try
|
||||
{
|
||||
await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return new CallResult<UpdateSubscription>(new CancellationRequestedError());
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
// Get a new or existing socket connection
|
||||
var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false);
|
||||
if(!socketResult)
|
||||
return socketResult.As<UpdateSubscription>(null);
|
||||
|
||||
socketConnection = socketResult.Data;
|
||||
|
||||
// Add a subscription on the socket connection
|
||||
subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated);
|
||||
if (subscription == null)
|
||||
{
|
||||
log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (ClientOptions.SocketSubscriptionsCombineTarget == 1)
|
||||
{
|
||||
// Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway
|
||||
semaphoreSlim.Release();
|
||||
released = true;
|
||||
}
|
||||
|
||||
var needsConnecting = !socketConnection.Connected;
|
||||
|
||||
var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
|
||||
if (!connectResult)
|
||||
return new CallResult<UpdateSubscription>(connectResult.Error!);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
if(!released)
|
||||
semaphoreSlim.Release();
|
||||
}
|
||||
|
||||
if (socketConnection.PausedActivity)
|
||||
{
|
||||
log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't subscribe at this moment");
|
||||
return new CallResult<UpdateSubscription>( new ServerError("Socket is paused"));
|
||||
}
|
||||
|
||||
if (request != null)
|
||||
{
|
||||
// Send the request and wait for answer
|
||||
var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false);
|
||||
if (!subResult)
|
||||
{
|
||||
log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
|
||||
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
return new CallResult<UpdateSubscription>(subResult.Error!);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// No request to be sent, so just mark the subscription as comfirmed
|
||||
subscription.Confirmed = true;
|
||||
}
|
||||
|
||||
if (ct != default)
|
||||
{
|
||||
subscription.CancellationTokenRegistration = ct.Register(async () =>
|
||||
{
|
||||
log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription");
|
||||
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
}, false);
|
||||
}
|
||||
|
||||
log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully");
|
||||
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends the subscribe request and waits for a response to that request
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The connection to send the request on</param>
|
||||
/// <param name="request">The request to send, will be serialized to json</param>
|
||||
/// <param name="subscription">The subscription the request is for</param>
|
||||
/// <returns></returns>
|
||||
protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription)
|
||||
{
|
||||
CallResult<object>? callResult = null;
|
||||
await socketConnection.SendAndWaitAsync(request, ClientOptions.SocketResponseTimeout, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false);
|
||||
|
||||
if (callResult?.Success == true)
|
||||
{
|
||||
subscription.Confirmed = true;
|
||||
return new CallResult<bool>(true);
|
||||
}
|
||||
|
||||
if(callResult== null)
|
||||
return new CallResult<bool>(new ServerError("No response on subscription request received"));
|
||||
|
||||
return new CallResult<bool>(callResult.Error!);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Send a query on a socket connection to the BaseAddress and wait for the response
|
||||
/// </summary>
|
||||
/// <typeparam name="T">Expected result type</typeparam>
|
||||
/// <param name="apiClient">The API client the query is for</param>
|
||||
/// <param name="request">The request to send, will be serialized to json</param>
|
||||
/// <param name="authenticated">If the query is to an authenticated endpoint</param>
|
||||
/// <returns></returns>
|
||||
protected virtual Task<CallResult<T>> QueryAsync<T>(SocketApiClient apiClient, object request, bool authenticated)
|
||||
{
|
||||
return QueryAsync<T>(apiClient, apiClient.Options.BaseAddress, request, authenticated);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Send a query on a socket connection and wait for the response
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The expected result type</typeparam>
|
||||
/// <param name="apiClient">The API client the query is for</param>
|
||||
/// <param name="url">The url for the request</param>
|
||||
/// <param name="request">The request to send</param>
|
||||
/// <param name="authenticated">Whether the socket should be authenticated</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<T>> QueryAsync<T>(SocketApiClient apiClient, string url, object request, bool authenticated)
|
||||
{
|
||||
if (disposing)
|
||||
return new CallResult<T>(new InvalidOperationError("Client disposed, can't query"));
|
||||
|
||||
SocketConnection socketConnection;
|
||||
var released = false;
|
||||
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false);
|
||||
if (!socketResult)
|
||||
return socketResult.As<T>(default);
|
||||
|
||||
socketConnection = socketResult.Data;
|
||||
|
||||
if (ClientOptions.SocketSubscriptionsCombineTarget == 1)
|
||||
{
|
||||
// Can release early when only a single sub per connection
|
||||
semaphoreSlim.Release();
|
||||
released = true;
|
||||
}
|
||||
|
||||
var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
|
||||
if (!connectResult)
|
||||
return new CallResult<T>(connectResult.Error!);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!released)
|
||||
semaphoreSlim.Release();
|
||||
}
|
||||
|
||||
if (socketConnection.PausedActivity)
|
||||
{
|
||||
log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't send query at this moment");
|
||||
return new CallResult<T>(new ServerError("Socket is paused"));
|
||||
}
|
||||
|
||||
return await QueryAndWaitAsync<T>(socketConnection, request).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends the query request and waits for the result
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The expected result type</typeparam>
|
||||
/// <param name="socket">The connection to send and wait on</param>
|
||||
/// <param name="request">The request to send</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, object request)
|
||||
{
|
||||
var dataResult = new CallResult<T>(new ServerError("No response on query received"));
|
||||
await socket.SendAndWaitAsync(request, ClientOptions.SocketResponseTimeout, data =>
|
||||
{
|
||||
if (!HandleQueryResponse<T>(socket, request, data, out var callResult))
|
||||
return false;
|
||||
|
||||
dataResult = callResult;
|
||||
return true;
|
||||
}).ConfigureAwait(false);
|
||||
|
||||
return dataResult;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks if a socket needs to be connected and does so if needed. Also authenticates on the socket if needed
|
||||
/// </summary>
|
||||
/// <param name="socket">The connection to check</param>
|
||||
/// <param name="authenticated">Whether the socket should authenticated</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<bool>> ConnectIfNeededAsync(SocketConnection socket, bool authenticated)
|
||||
{
|
||||
if (socket.Connected)
|
||||
return new CallResult<bool>(true);
|
||||
|
||||
var connectResult = await ConnectSocketAsync(socket).ConfigureAwait(false);
|
||||
if (!connectResult)
|
||||
return new CallResult<bool>(connectResult.Error!);
|
||||
|
||||
if (!authenticated || socket.Authenticated)
|
||||
return new CallResult<bool>(true);
|
||||
|
||||
log.Write(LogLevel.Debug, $"Attempting to authenticate {socket.SocketId}");
|
||||
var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false);
|
||||
if (!result)
|
||||
{
|
||||
log.Write(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed");
|
||||
if(socket.Connected)
|
||||
await socket.CloseAsync().ConfigureAwait(false);
|
||||
|
||||
result.Error!.Message = "Authentication failed: " + result.Error.Message;
|
||||
return new CallResult<bool>(result.Error);
|
||||
}
|
||||
|
||||
socket.Authenticated = true;
|
||||
return new CallResult<bool>(true);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the query that was send (the request parameter).
|
||||
/// For example; A query is sent in a request message with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an
|
||||
/// anwser to any query that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages,
|
||||
/// if not some other method has be implemented to match the messages).
|
||||
/// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of response that is expected on the query</typeparam>
|
||||
/// <param name="socketConnection">The socket connection</param>
|
||||
/// <param name="request">The request that a response is awaited for</param>
|
||||
/// <param name="data">The message received from the server</param>
|
||||
/// <param name="callResult">The interpretation (null if message wasn't a response to the request)</param>
|
||||
/// <returns>True if the message was a response to the query</returns>
|
||||
protected internal abstract bool HandleQueryResponse<T>(SocketConnection socketConnection, object request, JToken data, [NotNullWhen(true)]out CallResult<T>? callResult);
|
||||
/// <summary>
|
||||
/// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the subscription request that was send (the request parameter).
|
||||
/// For example; A subscribe request message is send with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an
|
||||
/// anwser to any subscription request that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages,
|
||||
/// if not some other method has be implemented to match the messages).
|
||||
/// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true.
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The socket connection</param>
|
||||
/// <param name="subscription">A subscription that waiting for a subscription response</param>
|
||||
/// <param name="request">The request that the subscription sent</param>
|
||||
/// <param name="data">The message received from the server</param>
|
||||
/// <param name="callResult">The interpretation (null if message wasn't a response to the request)</param>
|
||||
/// <returns>True if the message was a response to the subscription request</returns>
|
||||
protected internal abstract bool HandleSubscriptionResponse(SocketConnection socketConnection, SocketSubscription subscription, object request, JToken data, out CallResult<object>? callResult);
|
||||
/// <summary>
|
||||
/// Needs to check if a received message matches a handler by request. After subscribing data message will come in. These data messages need to be matched to a specific connection
|
||||
/// to pass the correct data to the correct handler. The implementation of this method should check if the message received matches the subscribe request that was sent.
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The socket connection the message was recieved on</param>
|
||||
/// <param name="message">The received data</param>
|
||||
/// <param name="request">The subscription request</param>
|
||||
/// <returns>True if the message is for the subscription which sent the request</returns>
|
||||
protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, object request);
|
||||
/// <summary>
|
||||
/// Needs to check if a received message matches a handler by identifier. Generally used by GenericHandlers. For example; a generic handler is registered which handles ping messages
|
||||
/// from the server. This method should check if the message received is a ping message and the identifer is the identifier of the GenericHandler
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The socket connection the message was recieved on</param>
|
||||
/// <param name="message">The received data</param>
|
||||
/// <param name="identifier">The string identifier of the handler</param>
|
||||
/// <returns>True if the message is for the handler which has the identifier</returns>
|
||||
protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, string identifier);
|
||||
/// <summary>
|
||||
/// Needs to authenticate the socket so authenticated queries/subscriptions can be made on this socket connection
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The socket connection that should be authenticated</param>
|
||||
/// <returns></returns>
|
||||
protected internal abstract Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnection socketConnection);
|
||||
/// <summary>
|
||||
/// Needs to unsubscribe a subscription, typically by sending an unsubscribe request. If multiple subscriptions per socket is not allowed this can just return since the socket will be closed anyway
|
||||
/// </summary>
|
||||
/// <param name="connection">The connection on which to unsubscribe</param>
|
||||
/// <param name="subscriptionToUnsub">The subscription to unsubscribe</param>
|
||||
/// <returns></returns>
|
||||
protected internal abstract Task<bool> UnsubscribeAsync(SocketConnection connection, SocketSubscription subscriptionToUnsub);
|
||||
|
||||
/// <summary>
|
||||
/// Optional handler to interpolate data before sending it to the handlers
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
protected internal virtual JToken ProcessTokenData(JToken message)
|
||||
{
|
||||
return message;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a subscription to a connection
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of data the subscription expects</typeparam>
|
||||
/// <param name="request">The request of the subscription</param>
|
||||
/// <param name="identifier">The identifier of the subscription (can be null if request param is used)</param>
|
||||
/// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param>
|
||||
/// <param name="connection">The socket connection the handler is on</param>
|
||||
/// <param name="dataHandler">The handler of the data received</param>
|
||||
/// <param name="authenticated">Whether the subscription needs authentication</param>
|
||||
/// <returns></returns>
|
||||
protected virtual SocketSubscription? AddSubscription<T>(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action<DataEvent<T>> dataHandler, bool authenticated)
|
||||
{
|
||||
void InternalHandler(MessageEvent messageEvent)
|
||||
{
|
||||
if (typeof(T) == typeof(string))
|
||||
{
|
||||
var stringData = (T)Convert.ChangeType(messageEvent.JsonData.ToString(), typeof(T));
|
||||
dataHandler(new DataEvent<T>(stringData, null, ClientOptions.OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp));
|
||||
return;
|
||||
}
|
||||
|
||||
var desResult = Deserialize<T>(messageEvent.JsonData);
|
||||
if (!desResult)
|
||||
{
|
||||
log.Write(LogLevel.Warning, $"Socket {connection.SocketId} Failed to deserialize data into type {typeof(T)}: {desResult.Error}");
|
||||
return;
|
||||
}
|
||||
|
||||
dataHandler(new DataEvent<T>(desResult.Data, null, ClientOptions.OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp));
|
||||
}
|
||||
|
||||
var subscription = request == null
|
||||
? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, authenticated, InternalHandler)
|
||||
: SocketSubscription.CreateForRequest(NextId(), request, userSubscription, authenticated, InternalHandler);
|
||||
if (!connection.AddSubscription(subscription))
|
||||
return null;
|
||||
return subscription;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds a generic message handler. Used for example to reply to ping requests
|
||||
/// </summary>
|
||||
/// <param name="identifier">The name of the request handler. Needs to be unique</param>
|
||||
/// <param name="action">The action to execute when receiving a message for this handler (checked by <see cref="MessageMatchesHandler(SocketConnection, Newtonsoft.Json.Linq.JToken,string)"/>)</param>
|
||||
protected void AddGenericHandler(string identifier, Action<MessageEvent> action)
|
||||
{
|
||||
genericHandlers.Add(identifier, action);
|
||||
var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, false, action);
|
||||
foreach (var connection in socketConnections.Values)
|
||||
connection.AddSubscription(subscription);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the url to connect to (defaults to BaseAddress form the client options)
|
||||
/// </summary>
|
||||
/// <param name="apiClient"></param>
|
||||
/// <param name="address"></param>
|
||||
/// <param name="authentication"></param>
|
||||
/// <returns></returns>
|
||||
protected virtual Task<CallResult<string?>> GetConnectionUrlAsync(SocketApiClient apiClient, string address, bool authentication)
|
||||
{
|
||||
return Task.FromResult(new CallResult<string?>(address));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the url to reconnect to after losing a connection
|
||||
/// </summary>
|
||||
/// <param name="apiClient"></param>
|
||||
/// <param name="connection"></param>
|
||||
/// <returns></returns>
|
||||
public virtual Task<Uri?> GetReconnectUriAsync(SocketApiClient apiClient, SocketConnection connection)
|
||||
{
|
||||
return Task.FromResult<Uri?>(connection.ConnectionUri);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one.
|
||||
/// </summary>
|
||||
/// <param name="apiClient">The API client the connection is for</param>
|
||||
/// <param name="address">The address the socket is for</param>
|
||||
/// <param name="authenticated">Whether the socket should be authenticated</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<SocketConnection>> GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated)
|
||||
{
|
||||
var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected)
|
||||
&& s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
|
||||
&& (s.Value.ApiClient.GetType() == apiClient.GetType())
|
||||
&& (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault();
|
||||
var result = socketResult.Equals(default(KeyValuePair<int, SocketConnection>)) ? null : socketResult.Value;
|
||||
if (result != null)
|
||||
{
|
||||
if (result.SubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= ClientOptions.MaxSocketConnections && socketConnections.All(s => s.Value.SubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget)))
|
||||
{
|
||||
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
|
||||
return new CallResult<SocketConnection>(result);
|
||||
}
|
||||
}
|
||||
|
||||
var connectionAddress = await GetConnectionUrlAsync(apiClient, address, authenticated).ConfigureAwait(false);
|
||||
if (!connectionAddress)
|
||||
{
|
||||
log.Write(LogLevel.Warning, $"Failed to determine connection url: " + connectionAddress.Error);
|
||||
return connectionAddress.As<SocketConnection>(null);
|
||||
}
|
||||
|
||||
if (connectionAddress.Data != address)
|
||||
log.Write(LogLevel.Debug, $"Connection address set to " + connectionAddress.Data);
|
||||
|
||||
// Create new socket
|
||||
var socket = CreateSocket(connectionAddress.Data!);
|
||||
var socketConnection = new SocketConnection(this, apiClient, socket, address);
|
||||
socketConnection.UnhandledMessage += HandleUnhandledMessage;
|
||||
foreach (var kvp in genericHandlers)
|
||||
{
|
||||
var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, false, kvp.Value);
|
||||
socketConnection.AddSubscription(handler);
|
||||
}
|
||||
|
||||
return new CallResult<SocketConnection>(socketConnection);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Process an unhandled message
|
||||
/// </summary>
|
||||
/// <param name="token">The token that wasn't processed</param>
|
||||
protected virtual void HandleUnhandledMessage(JToken token)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect a socket
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The socket to connect</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<bool>> ConnectSocketAsync(SocketConnection socketConnection)
|
||||
{
|
||||
if (await socketConnection.ConnectAsync().ConfigureAwait(false))
|
||||
{
|
||||
socketConnections.TryAdd(socketConnection.SocketId, socketConnection);
|
||||
return new CallResult<bool>(true);
|
||||
}
|
||||
|
||||
socketConnection.Dispose();
|
||||
return new CallResult<bool>(new CantConnectError());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get parameters for the websocket connection
|
||||
/// </summary>
|
||||
/// <param name="address">The address to connect to</param>
|
||||
/// <returns></returns>
|
||||
protected virtual WebSocketParameters GetWebSocketParameters(string address)
|
||||
=> new (new Uri(address), ClientOptions.AutoReconnect)
|
||||
{
|
||||
DataInterpreterBytes = dataInterpreterBytes,
|
||||
DataInterpreterString = dataInterpreterString,
|
||||
KeepAliveInterval = KeepAliveInterval,
|
||||
ReconnectInterval = ClientOptions.ReconnectInterval,
|
||||
RatelimitPerSecond = RateLimitPerSocketPerSecond,
|
||||
Proxy = ClientOptions.Proxy,
|
||||
Timeout = ClientOptions.SocketNoDataTimeout
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Create a socket for an address
|
||||
/// </summary>
|
||||
/// <param name="address">The address the socket should connect to</param>
|
||||
/// <returns></returns>
|
||||
protected virtual IWebsocket CreateSocket(string address)
|
||||
{
|
||||
var socket = SocketFactory.CreateWebsocket(log, GetWebSocketParameters(address));
|
||||
log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address);
|
||||
return socket;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Periodically sends data over a socket connection
|
||||
/// </summary>
|
||||
/// <param name="identifier">Identifier for the periodic send</param>
|
||||
/// <param name="interval">How often</param>
|
||||
/// <param name="objGetter">Method returning the object to send</param>
|
||||
public virtual void SendPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, object> objGetter)
|
||||
{
|
||||
if (objGetter == null)
|
||||
throw new ArgumentNullException(nameof(objGetter));
|
||||
|
||||
periodicEvent = new AsyncResetEvent();
|
||||
periodicTask = Task.Run(async () =>
|
||||
{
|
||||
while (!disposing)
|
||||
{
|
||||
await periodicEvent.WaitAsync(interval).ConfigureAwait(false);
|
||||
if (disposing)
|
||||
break;
|
||||
|
||||
foreach (var socketConnection in socketConnections.Values)
|
||||
{
|
||||
if (disposing)
|
||||
break;
|
||||
|
||||
if (!socketConnection.Connected)
|
||||
continue;
|
||||
|
||||
var obj = objGetter(socketConnection);
|
||||
if (obj == null)
|
||||
continue;
|
||||
|
||||
log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}");
|
||||
|
||||
try
|
||||
{
|
||||
socketConnection.Send(obj);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} Periodic send {identifier} failed: " + ex.ToLogString());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Unsubscribe an update subscription
|
||||
/// </summary>
|
||||
@ -708,23 +56,12 @@ namespace CryptoExchange.Net
|
||||
/// <returns></returns>
|
||||
public virtual async Task UnsubscribeAsync(int subscriptionId)
|
||||
{
|
||||
SocketSubscription? subscription = null;
|
||||
SocketConnection? connection = null;
|
||||
foreach(var socket in socketConnections.Values.ToList())
|
||||
foreach(var socket in ApiClients.OfType<SocketApiClient>())
|
||||
{
|
||||
subscription = socket.GetSubscription(subscriptionId);
|
||||
if (subscription != null)
|
||||
{
|
||||
connection = socket;
|
||||
break;
|
||||
}
|
||||
var result = await socket.UnsubscribeAsync(subscriptionId).ConfigureAwait(false);
|
||||
if (result)
|
||||
break;
|
||||
}
|
||||
|
||||
if (subscription == null || connection == null)
|
||||
return;
|
||||
|
||||
log.Write(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId);
|
||||
await connection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -747,14 +84,10 @@ namespace CryptoExchange.Net
|
||||
/// <returns></returns>
|
||||
public virtual async Task UnsubscribeAllAsync()
|
||||
{
|
||||
log.Write(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions");
|
||||
var tasks = new List<Task>();
|
||||
{
|
||||
var socketList = socketConnections.Values;
|
||||
foreach (var sub in socketList)
|
||||
tasks.Add(sub.CloseAsync());
|
||||
}
|
||||
|
||||
var tasks = new List<Task>();
|
||||
foreach (var client in ApiClients.OfType<SocketApiClient>())
|
||||
tasks.Add(client.UnsubscribeAllAsync());
|
||||
|
||||
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@ -764,14 +97,12 @@ namespace CryptoExchange.Net
|
||||
/// <returns></returns>
|
||||
public virtual async Task ReconnectAsync()
|
||||
{
|
||||
log.Write(LogLevel.Information, $"Reconnecting all {socketConnections.Count} connections");
|
||||
log.Write(LogLevel.Information, $"Reconnecting all {CurrentConnections} connections");
|
||||
var tasks = new List<Task>();
|
||||
foreach (var client in ApiClients.OfType<SocketApiClient>())
|
||||
{
|
||||
var socketList = socketConnections.Values;
|
||||
foreach (var sub in socketList)
|
||||
tasks.Add(sub.TriggerReconnectAsync());
|
||||
tasks.Add(client.ReconnectAsync());
|
||||
}
|
||||
|
||||
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@ -780,32 +111,16 @@ namespace CryptoExchange.Net
|
||||
/// </summary>
|
||||
public string GetSubscriptionsState()
|
||||
{
|
||||
var sb = new StringBuilder();
|
||||
sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}");
|
||||
foreach(var connection in socketConnections)
|
||||
{
|
||||
sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}");
|
||||
foreach (var subscription in connection.Value.Subscriptions)
|
||||
sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}");
|
||||
}
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Dispose the client
|
||||
/// </summary>
|
||||
public override void Dispose()
|
||||
{
|
||||
disposing = true;
|
||||
periodicEvent?.Set();
|
||||
periodicEvent?.Dispose();
|
||||
if (socketConnections.Sum(s => s.Value.SubscriptionCount) > 0)
|
||||
{
|
||||
log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions");
|
||||
_ = UnsubscribeAllAsync();
|
||||
}
|
||||
semaphoreSlim?.Dispose();
|
||||
base.Dispose();
|
||||
//var sb = new StringBuilder();
|
||||
//sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}");
|
||||
//foreach(var connection in socketConnections)
|
||||
//{
|
||||
// sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}");
|
||||
// foreach (var subscription in connection.Value.Subscriptions)
|
||||
// sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}");
|
||||
//}
|
||||
//return sb.ToString();
|
||||
return "";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,17 @@
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Logging;
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.Sockets;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CryptoExchange.Net
|
||||
{
|
||||
@ -7,13 +20,778 @@ namespace CryptoExchange.Net
|
||||
/// </summary>
|
||||
public abstract class SocketApiClient : BaseApiClient
|
||||
{
|
||||
#region Fields
|
||||
|
||||
/// <summary>
|
||||
/// The factory for creating sockets. Used for unit testing
|
||||
/// </summary>
|
||||
public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
|
||||
|
||||
/// <summary>
|
||||
/// List of socket connections currently connecting/connected
|
||||
/// </summary>
|
||||
protected internal ConcurrentDictionary<int, SocketConnection> socketConnections = new();
|
||||
/// <summary>
|
||||
/// Semaphore used while creating sockets
|
||||
/// </summary>
|
||||
protected internal readonly SemaphoreSlim semaphoreSlim = new(1);
|
||||
/// <summary>
|
||||
/// Keep alive interval for websocket connection
|
||||
/// </summary>
|
||||
protected TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(10);
|
||||
/// <summary>
|
||||
/// Delegate used for processing byte data received from socket connections before it is processed by handlers
|
||||
/// </summary>
|
||||
protected Func<byte[], string>? dataInterpreterBytes;
|
||||
/// <summary>
|
||||
/// Delegate used for processing string data received from socket connections before it is processed by handlers
|
||||
/// </summary>
|
||||
protected Func<string, string>? dataInterpreterString;
|
||||
/// <summary>
|
||||
/// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example.
|
||||
/// </summary>
|
||||
protected Dictionary<string, Action<MessageEvent>> genericHandlers = new();
|
||||
/// <summary>
|
||||
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry.
|
||||
/// </summary>
|
||||
protected Task? periodicTask;
|
||||
/// <summary>
|
||||
/// Wait event for the periodicTask
|
||||
/// </summary>
|
||||
protected AsyncResetEvent? periodicEvent;
|
||||
|
||||
/// <summary>
|
||||
/// If true; data which is a response to a query will also be distributed to subscriptions
|
||||
/// If false; data which is a response to a query won't get forwarded to subscriptions as well
|
||||
/// </summary>
|
||||
protected internal bool ContinueOnQueryResponse { get; protected set; }
|
||||
|
||||
/// <summary>
|
||||
/// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message
|
||||
/// </summary>
|
||||
protected internal bool UnhandledMessageExpected { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The max amount of outgoing messages per socket per second
|
||||
/// </summary>
|
||||
protected internal int? RateLimitPerSocketPerSecond { get; set; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public double IncomingKbps
|
||||
{
|
||||
get
|
||||
{
|
||||
if (!socketConnections.Any())
|
||||
return 0;
|
||||
|
||||
return socketConnections.Sum(s => s.Value.IncomingKbps);
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public int CurrentConnections => socketConnections.Count;
|
||||
/// <inheritdoc />
|
||||
public int CurrentSubscriptions
|
||||
{
|
||||
get
|
||||
{
|
||||
if (!socketConnections.Any())
|
||||
return 0;
|
||||
|
||||
return socketConnections.Sum(s => s.Value.SubscriptionCount);
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public new SocketApiClientOptions Options => (SocketApiClientOptions)base.Options;
|
||||
|
||||
#endregion
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="options">The base client options</param>
|
||||
/// <param name="log">log</param>
|
||||
/// <param name="apiOptions">The Api client options</param>
|
||||
public SocketApiClient(BaseClientOptions options, ApiClientOptions apiOptions): base(options, apiOptions)
|
||||
public SocketApiClient(Log log, SocketApiClientOptions apiOptions): base(log, apiOptions)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Set a delegate to be used for processing data received from socket connections before it is processed by handlers
|
||||
/// </summary>
|
||||
/// <param name="byteHandler">Handler for byte data</param>
|
||||
/// <param name="stringHandler">Handler for string data</param>
|
||||
protected void SetDataInterpreter(Func<byte[], string>? byteHandler, Func<string, string>? stringHandler)
|
||||
{
|
||||
dataInterpreterBytes = byteHandler;
|
||||
dataInterpreterString = stringHandler;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect to an url and listen for data on the BaseAddress
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the expected data</typeparam>
|
||||
/// <param name="apiClient">The API client the subscription is for</param>
|
||||
/// <param name="request">The optional request object to send, will be serialized to json</param>
|
||||
/// <param name="identifier">The identifier to use, necessary if no request object is sent</param>
|
||||
/// <param name="authenticated">If the subscription is to an authenticated endpoint</param>
|
||||
/// <param name="dataHandler">The handler of update data</param>
|
||||
/// <param name="ct">Cancellation token for closing this subscription</param>
|
||||
/// <returns></returns>
|
||||
protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(SocketApiClient apiClient, object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler, CancellationToken ct)
|
||||
{
|
||||
return SubscribeAsync(apiClient, Options.BaseAddress, request, identifier, authenticated, dataHandler, ct);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect to an url and listen for data
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the expected data</typeparam>
|
||||
/// <param name="apiClient">The API client the subscription is for</param>
|
||||
/// <param name="url">The URL to connect to</param>
|
||||
/// <param name="request">The optional request object to send, will be serialized to json</param>
|
||||
/// <param name="identifier">The identifier to use, necessary if no request object is sent</param>
|
||||
/// <param name="authenticated">If the subscription is to an authenticated endpoint</param>
|
||||
/// <param name="dataHandler">The handler of update data</param>
|
||||
/// <param name="ct">Cancellation token for closing this subscription</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(SocketApiClient apiClient, string url, object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler, CancellationToken ct)
|
||||
{
|
||||
if (_disposing)
|
||||
return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe"));
|
||||
|
||||
SocketConnection socketConnection;
|
||||
SocketSubscription? subscription;
|
||||
var released = false;
|
||||
// Wait for a semaphore here, so we only connect 1 socket at a time.
|
||||
// This is necessary for being able to see if connections can be combined
|
||||
try
|
||||
{
|
||||
await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return new CallResult<UpdateSubscription>(new CancellationRequestedError());
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
// Get a new or existing socket connection
|
||||
var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false);
|
||||
if (!socketResult)
|
||||
return socketResult.As<UpdateSubscription>(null);
|
||||
|
||||
socketConnection = socketResult.Data;
|
||||
|
||||
// Add a subscription on the socket connection
|
||||
subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated);
|
||||
if (subscription == null)
|
||||
{
|
||||
_log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (Options.SocketSubscriptionsCombineTarget == 1)
|
||||
{
|
||||
// Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway
|
||||
semaphoreSlim.Release();
|
||||
released = true;
|
||||
}
|
||||
|
||||
var needsConnecting = !socketConnection.Connected;
|
||||
|
||||
var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
|
||||
if (!connectResult)
|
||||
return new CallResult<UpdateSubscription>(connectResult.Error!);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!released)
|
||||
semaphoreSlim.Release();
|
||||
}
|
||||
|
||||
if (socketConnection.PausedActivity)
|
||||
{
|
||||
_log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't subscribe at this moment");
|
||||
return new CallResult<UpdateSubscription>(new ServerError("Socket is paused"));
|
||||
}
|
||||
|
||||
if (request != null)
|
||||
{
|
||||
// Send the request and wait for answer
|
||||
var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false);
|
||||
if (!subResult)
|
||||
{
|
||||
_log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
|
||||
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
return new CallResult<UpdateSubscription>(subResult.Error!);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// No request to be sent, so just mark the subscription as comfirmed
|
||||
subscription.Confirmed = true;
|
||||
}
|
||||
|
||||
if (ct != default)
|
||||
{
|
||||
subscription.CancellationTokenRegistration = ct.Register(async () =>
|
||||
{
|
||||
_log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription");
|
||||
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
}, false);
|
||||
}
|
||||
|
||||
_log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully");
|
||||
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends the subscribe request and waits for a response to that request
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The connection to send the request on</param>
|
||||
/// <param name="request">The request to send, will be serialized to json</param>
|
||||
/// <param name="subscription">The subscription the request is for</param>
|
||||
/// <returns></returns>
|
||||
protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription)
|
||||
{
|
||||
CallResult<object>? callResult = null;
|
||||
await socketConnection.SendAndWaitAsync(request, Options.SocketResponseTimeout, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false);
|
||||
|
||||
if (callResult?.Success == true)
|
||||
{
|
||||
subscription.Confirmed = true;
|
||||
return new CallResult<bool>(true);
|
||||
}
|
||||
|
||||
if (callResult == null)
|
||||
return new CallResult<bool>(new ServerError("No response on subscription request received"));
|
||||
|
||||
return new CallResult<bool>(callResult.Error!);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Send a query on a socket connection to the BaseAddress and wait for the response
|
||||
/// </summary>
|
||||
/// <typeparam name="T">Expected result type</typeparam>
|
||||
/// <param name="request">The request to send, will be serialized to json</param>
|
||||
/// <param name="authenticated">If the query is to an authenticated endpoint</param>
|
||||
/// <returns></returns>
|
||||
protected virtual Task<CallResult<T>> QueryAsync<T>(object request, bool authenticated)
|
||||
{
|
||||
return QueryAsync<T>(Options.BaseAddress, request, authenticated);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Send a query on a socket connection and wait for the response
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The expected result type</typeparam>
|
||||
/// <param name="url">The url for the request</param>
|
||||
/// <param name="request">The request to send</param>
|
||||
/// <param name="authenticated">Whether the socket should be authenticated</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, object request, bool authenticated)
|
||||
{
|
||||
if (_disposing)
|
||||
return new CallResult<T>(new InvalidOperationError("Client disposed, can't query"));
|
||||
|
||||
SocketConnection socketConnection;
|
||||
var released = false;
|
||||
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var socketResult = await GetSocketConnection(url, authenticated).ConfigureAwait(false);
|
||||
if (!socketResult)
|
||||
return socketResult.As<T>(default);
|
||||
|
||||
socketConnection = socketResult.Data;
|
||||
|
||||
if (Options.SocketSubscriptionsCombineTarget == 1)
|
||||
{
|
||||
// Can release early when only a single sub per connection
|
||||
semaphoreSlim.Release();
|
||||
released = true;
|
||||
}
|
||||
|
||||
var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
|
||||
if (!connectResult)
|
||||
return new CallResult<T>(connectResult.Error!);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!released)
|
||||
semaphoreSlim.Release();
|
||||
}
|
||||
|
||||
if (socketConnection.PausedActivity)
|
||||
{
|
||||
_log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't send query at this moment");
|
||||
return new CallResult<T>(new ServerError("Socket is paused"));
|
||||
}
|
||||
|
||||
return await QueryAndWaitAsync<T>(socketConnection, request).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends the query request and waits for the result
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The expected result type</typeparam>
|
||||
/// <param name="socket">The connection to send and wait on</param>
|
||||
/// <param name="request">The request to send</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, object request)
|
||||
{
|
||||
var dataResult = new CallResult<T>(new ServerError("No response on query received"));
|
||||
await socket.SendAndWaitAsync(request, Options.SocketResponseTimeout, data =>
|
||||
{
|
||||
if (!HandleQueryResponse<T>(socket, request, data, out var callResult))
|
||||
return false;
|
||||
|
||||
dataResult = callResult;
|
||||
return true;
|
||||
}).ConfigureAwait(false);
|
||||
|
||||
return dataResult;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks if a socket needs to be connected and does so if needed. Also authenticates on the socket if needed
|
||||
/// </summary>
|
||||
/// <param name="socket">The connection to check</param>
|
||||
/// <param name="authenticated">Whether the socket should authenticated</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<bool>> ConnectIfNeededAsync(SocketConnection socket, bool authenticated)
|
||||
{
|
||||
if (socket.Connected)
|
||||
return new CallResult<bool>(true);
|
||||
|
||||
var connectResult = await ConnectSocketAsync(socket).ConfigureAwait(false);
|
||||
if (!connectResult)
|
||||
return new CallResult<bool>(connectResult.Error!);
|
||||
|
||||
if (!authenticated || socket.Authenticated)
|
||||
return new CallResult<bool>(true);
|
||||
|
||||
_log.Write(LogLevel.Debug, $"Attempting to authenticate {socket.SocketId}");
|
||||
var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false);
|
||||
if (!result)
|
||||
{
|
||||
_log.Write(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed");
|
||||
if (socket.Connected)
|
||||
await socket.CloseAsync().ConfigureAwait(false);
|
||||
|
||||
result.Error!.Message = "Authentication failed: " + result.Error.Message;
|
||||
return new CallResult<bool>(result.Error);
|
||||
}
|
||||
|
||||
socket.Authenticated = true;
|
||||
return new CallResult<bool>(true);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the query that was send (the request parameter).
|
||||
/// For example; A query is sent in a request message with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an
|
||||
/// anwser to any query that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages,
|
||||
/// if not some other method has be implemented to match the messages).
|
||||
/// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of response that is expected on the query</typeparam>
|
||||
/// <param name="socketConnection">The socket connection</param>
|
||||
/// <param name="request">The request that a response is awaited for</param>
|
||||
/// <param name="data">The message received from the server</param>
|
||||
/// <param name="callResult">The interpretation (null if message wasn't a response to the request)</param>
|
||||
/// <returns>True if the message was a response to the query</returns>
|
||||
protected internal abstract bool HandleQueryResponse<T>(SocketConnection socketConnection, object request, JToken data, [NotNullWhen(true)] out CallResult<T>? callResult);
|
||||
/// <summary>
|
||||
/// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the subscription request that was send (the request parameter).
|
||||
/// For example; A subscribe request message is send with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an
|
||||
/// anwser to any subscription request that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages,
|
||||
/// if not some other method has be implemented to match the messages).
|
||||
/// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true.
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The socket connection</param>
|
||||
/// <param name="subscription">A subscription that waiting for a subscription response</param>
|
||||
/// <param name="request">The request that the subscription sent</param>
|
||||
/// <param name="data">The message received from the server</param>
|
||||
/// <param name="callResult">The interpretation (null if message wasn't a response to the request)</param>
|
||||
/// <returns>True if the message was a response to the subscription request</returns>
|
||||
protected internal abstract bool HandleSubscriptionResponse(SocketConnection socketConnection, SocketSubscription subscription, object request, JToken data, out CallResult<object>? callResult);
|
||||
/// <summary>
|
||||
/// Needs to check if a received message matches a handler by request. After subscribing data message will come in. These data messages need to be matched to a specific connection
|
||||
/// to pass the correct data to the correct handler. The implementation of this method should check if the message received matches the subscribe request that was sent.
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The socket connection the message was recieved on</param>
|
||||
/// <param name="message">The received data</param>
|
||||
/// <param name="request">The subscription request</param>
|
||||
/// <returns>True if the message is for the subscription which sent the request</returns>
|
||||
protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, object request);
|
||||
/// <summary>
|
||||
/// Needs to check if a received message matches a handler by identifier. Generally used by GenericHandlers. For example; a generic handler is registered which handles ping messages
|
||||
/// from the server. This method should check if the message received is a ping message and the identifer is the identifier of the GenericHandler
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The socket connection the message was recieved on</param>
|
||||
/// <param name="message">The received data</param>
|
||||
/// <param name="identifier">The string identifier of the handler</param>
|
||||
/// <returns>True if the message is for the handler which has the identifier</returns>
|
||||
protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, string identifier);
|
||||
/// <summary>
|
||||
/// Needs to authenticate the socket so authenticated queries/subscriptions can be made on this socket connection
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The socket connection that should be authenticated</param>
|
||||
/// <returns></returns>
|
||||
protected internal abstract Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnection socketConnection);
|
||||
/// <summary>
|
||||
/// Needs to unsubscribe a subscription, typically by sending an unsubscribe request. If multiple subscriptions per socket is not allowed this can just return since the socket will be closed anyway
|
||||
/// </summary>
|
||||
/// <param name="connection">The connection on which to unsubscribe</param>
|
||||
/// <param name="subscriptionToUnsub">The subscription to unsubscribe</param>
|
||||
/// <returns></returns>
|
||||
protected internal abstract Task<bool> UnsubscribeAsync(SocketConnection connection, SocketSubscription subscriptionToUnsub);
|
||||
|
||||
/// <summary>
|
||||
/// Optional handler to interpolate data before sending it to the handlers
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
protected internal virtual JToken ProcessTokenData(JToken message)
|
||||
{
|
||||
return message;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a subscription to a connection
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of data the subscription expects</typeparam>
|
||||
/// <param name="request">The request of the subscription</param>
|
||||
/// <param name="identifier">The identifier of the subscription (can be null if request param is used)</param>
|
||||
/// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param>
|
||||
/// <param name="connection">The socket connection the handler is on</param>
|
||||
/// <param name="dataHandler">The handler of the data received</param>
|
||||
/// <param name="authenticated">Whether the subscription needs authentication</param>
|
||||
/// <returns></returns>
|
||||
protected virtual SocketSubscription? AddSubscription<T>(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action<DataEvent<T>> dataHandler, bool authenticated)
|
||||
{
|
||||
void InternalHandler(MessageEvent messageEvent)
|
||||
{
|
||||
if (typeof(T) == typeof(string))
|
||||
{
|
||||
var stringData = (T)Convert.ChangeType(messageEvent.JsonData.ToString(), typeof(T));
|
||||
dataHandler(new DataEvent<T>(stringData, null, Options.OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp));
|
||||
return;
|
||||
}
|
||||
|
||||
var desResult = Deserialize<T>(messageEvent.JsonData);
|
||||
if (!desResult)
|
||||
{
|
||||
_log.Write(LogLevel.Warning, $"Socket {connection.SocketId} Failed to deserialize data into type {typeof(T)}: {desResult.Error}");
|
||||
return;
|
||||
}
|
||||
|
||||
dataHandler(new DataEvent<T>(desResult.Data, null, Options.OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp));
|
||||
}
|
||||
|
||||
var subscription = request == null
|
||||
? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, authenticated, InternalHandler)
|
||||
: SocketSubscription.CreateForRequest(NextId(), request, userSubscription, authenticated, InternalHandler);
|
||||
if (!connection.AddSubscription(subscription))
|
||||
return null;
|
||||
return subscription;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds a generic message handler. Used for example to reply to ping requests
|
||||
/// </summary>
|
||||
/// <param name="identifier">The name of the request handler. Needs to be unique</param>
|
||||
/// <param name="action">The action to execute when receiving a message for this handler (checked by <see cref="MessageMatchesHandler(SocketConnection, Newtonsoft.Json.Linq.JToken,string)"/>)</param>
|
||||
protected void AddGenericHandler(string identifier, Action<MessageEvent> action)
|
||||
{
|
||||
genericHandlers.Add(identifier, action);
|
||||
var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, false, action);
|
||||
foreach (var connection in socketConnections.Values)
|
||||
connection.AddSubscription(subscription);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the url to connect to (defaults to BaseAddress form the client options)
|
||||
/// </summary>
|
||||
/// <param name="apiClient"></param>
|
||||
/// <param name="address"></param>
|
||||
/// <param name="authentication"></param>
|
||||
/// <returns></returns>
|
||||
protected virtual Task<CallResult<string?>> GetConnectionUrlAsync(SocketApiClient apiClient, string address, bool authentication)
|
||||
{
|
||||
return Task.FromResult(new CallResult<string?>(address));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the url to reconnect to after losing a connection
|
||||
/// </summary>
|
||||
/// <param name="apiClient"></param>
|
||||
/// <param name="connection"></param>
|
||||
/// <returns></returns>
|
||||
public virtual Task<Uri?> GetReconnectUriAsync(SocketApiClient apiClient, SocketConnection connection)
|
||||
{
|
||||
return Task.FromResult<Uri?>(connection.ConnectionUri);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one.
|
||||
/// </summary>
|
||||
/// <param name="apiClient">The API client the connection is for</param>
|
||||
/// <param name="address">The address the socket is for</param>
|
||||
/// <param name="authenticated">Whether the socket should be authenticated</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<SocketConnection>> GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated)
|
||||
{
|
||||
var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected)
|
||||
&& s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
|
||||
&& (s.Value.ApiClient.GetType() == apiClient.GetType())
|
||||
&& (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault();
|
||||
var result = socketResult.Equals(default(KeyValuePair<int, SocketConnection>)) ? null : socketResult.Value;
|
||||
if (result != null)
|
||||
{
|
||||
if (result.SubscriptionCount < Options.SocketSubscriptionsCombineTarget || (socketConnections.Count >= Options.MaxSocketConnections && socketConnections.All(s => s.Value.SubscriptionCount >= Options.SocketSubscriptionsCombineTarget)))
|
||||
{
|
||||
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
|
||||
return new CallResult<SocketConnection>(result);
|
||||
}
|
||||
}
|
||||
|
||||
var connectionAddress = await GetConnectionUrlAsync(apiClient, address, authenticated).ConfigureAwait(false);
|
||||
if (!connectionAddress)
|
||||
{
|
||||
_log.Write(LogLevel.Warning, $"Failed to determine connection url: " + connectionAddress.Error);
|
||||
return connectionAddress.As<SocketConnection>(null);
|
||||
}
|
||||
|
||||
if (connectionAddress.Data != address)
|
||||
_log.Write(LogLevel.Debug, $"Connection address set to " + connectionAddress.Data);
|
||||
|
||||
// Create new socket
|
||||
var socket = CreateSocket(connectionAddress.Data!);
|
||||
var socketConnection = new SocketConnection(_log, apiClient, socket, address);
|
||||
socketConnection.UnhandledMessage += HandleUnhandledMessage;
|
||||
foreach (var kvp in genericHandlers)
|
||||
{
|
||||
var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, false, kvp.Value);
|
||||
socketConnection.AddSubscription(handler);
|
||||
}
|
||||
|
||||
return new CallResult<SocketConnection>(socketConnection);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Process an unhandled message
|
||||
/// </summary>
|
||||
/// <param name="token">The token that wasn't processed</param>
|
||||
protected virtual void HandleUnhandledMessage(JToken token)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect a socket
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The socket to connect</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<bool>> ConnectSocketAsync(SocketConnection socketConnection)
|
||||
{
|
||||
if (await socketConnection.ConnectAsync().ConfigureAwait(false))
|
||||
{
|
||||
socketConnections.TryAdd(socketConnection.SocketId, socketConnection);
|
||||
return new CallResult<bool>(true);
|
||||
}
|
||||
|
||||
socketConnection.Dispose();
|
||||
return new CallResult<bool>(new CantConnectError());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get parameters for the websocket connection
|
||||
/// </summary>
|
||||
/// <param name="address">The address to connect to</param>
|
||||
/// <returns></returns>
|
||||
protected virtual WebSocketParameters GetWebSocketParameters(string address)
|
||||
=> new(new Uri(address), Options.AutoReconnect)
|
||||
{
|
||||
DataInterpreterBytes = dataInterpreterBytes,
|
||||
DataInterpreterString = dataInterpreterString,
|
||||
KeepAliveInterval = KeepAliveInterval,
|
||||
ReconnectInterval = Options.ReconnectInterval,
|
||||
RatelimitPerSecond = RateLimitPerSocketPerSecond,
|
||||
Proxy = Options.Proxy,
|
||||
Timeout = Options.SocketNoDataTimeout
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Create a socket for an address
|
||||
/// </summary>
|
||||
/// <param name="address">The address the socket should connect to</param>
|
||||
/// <returns></returns>
|
||||
protected virtual IWebsocket CreateSocket(string address)
|
||||
{
|
||||
var socket = SocketFactory.CreateWebsocket(_log, GetWebSocketParameters(address));
|
||||
_log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address);
|
||||
return socket;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Periodically sends data over a socket connection
|
||||
/// </summary>
|
||||
/// <param name="identifier">Identifier for the periodic send</param>
|
||||
/// <param name="interval">How often</param>
|
||||
/// <param name="objGetter">Method returning the object to send</param>
|
||||
public virtual void SendPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, object> objGetter)
|
||||
{
|
||||
if (objGetter == null)
|
||||
throw new ArgumentNullException(nameof(objGetter));
|
||||
|
||||
periodicEvent = new AsyncResetEvent();
|
||||
periodicTask = Task.Run(async () =>
|
||||
{
|
||||
while (!_disposing)
|
||||
{
|
||||
await periodicEvent.WaitAsync(interval).ConfigureAwait(false);
|
||||
if (_disposing)
|
||||
break;
|
||||
|
||||
foreach (var socketConnection in socketConnections.Values)
|
||||
{
|
||||
if (_disposing)
|
||||
break;
|
||||
|
||||
if (!socketConnection.Connected)
|
||||
continue;
|
||||
|
||||
var obj = objGetter(socketConnection);
|
||||
if (obj == null)
|
||||
continue;
|
||||
|
||||
_log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}");
|
||||
|
||||
try
|
||||
{
|
||||
socketConnection.Send(obj);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} Periodic send {identifier} failed: " + ex.ToLogString());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Unsubscribe an update subscription
|
||||
/// </summary>
|
||||
/// <param name="subscriptionId">The id of the subscription to unsubscribe</param>
|
||||
/// <returns></returns>
|
||||
public virtual async Task<bool> UnsubscribeAsync(int subscriptionId)
|
||||
{
|
||||
SocketSubscription? subscription = null;
|
||||
SocketConnection? connection = null;
|
||||
foreach (var socket in socketConnections.Values.ToList())
|
||||
{
|
||||
subscription = socket.GetSubscription(subscriptionId);
|
||||
if (subscription != null)
|
||||
{
|
||||
connection = socket;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (subscription == null || connection == null)
|
||||
return false;
|
||||
|
||||
_log.Write(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId);
|
||||
await connection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Unsubscribe an update subscription
|
||||
/// </summary>
|
||||
/// <param name="subscription">The subscription to unsubscribe</param>
|
||||
/// <returns></returns>
|
||||
public virtual async Task UnsubscribeAsync(UpdateSubscription subscription)
|
||||
{
|
||||
if (subscription == null)
|
||||
throw new ArgumentNullException(nameof(subscription));
|
||||
|
||||
_log.Write(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id);
|
||||
await subscription.CloseAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Unsubscribe all subscriptions
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public virtual async Task UnsubscribeAllAsync()
|
||||
{
|
||||
_log.Write(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions");
|
||||
var tasks = new List<Task>();
|
||||
{
|
||||
var socketList = socketConnections.Values;
|
||||
foreach (var sub in socketList)
|
||||
tasks.Add(sub.CloseAsync());
|
||||
}
|
||||
|
||||
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reconnect all connections
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public virtual async Task ReconnectAsync()
|
||||
{
|
||||
_log.Write(LogLevel.Information, $"Reconnecting all {socketConnections.Count} connections");
|
||||
var tasks = new List<Task>();
|
||||
{
|
||||
var socketList = socketConnections.Values;
|
||||
foreach (var sub in socketList)
|
||||
tasks.Add(sub.TriggerReconnectAsync());
|
||||
}
|
||||
|
||||
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Log the current state of connections and subscriptions
|
||||
/// </summary>
|
||||
public string GetSubscriptionsState()
|
||||
{
|
||||
var sb = new StringBuilder();
|
||||
sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}");
|
||||
foreach (var connection in socketConnections)
|
||||
{
|
||||
sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}");
|
||||
foreach (var subscription in connection.Value.Subscriptions)
|
||||
sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}");
|
||||
}
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Dispose the client
|
||||
/// </summary>
|
||||
public override void Dispose()
|
||||
{
|
||||
_disposing = true;
|
||||
periodicEvent?.Set();
|
||||
periodicEvent?.Dispose();
|
||||
if (socketConnections.Sum(s => s.Value.SubscriptionCount) > 0)
|
||||
{
|
||||
_log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions");
|
||||
_ = UnsubscribeAllAsync();
|
||||
}
|
||||
semaphoreSlim?.Dispose();
|
||||
base.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -9,10 +9,7 @@ using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace CryptoExchange.Net.Objects
|
||||
{
|
||||
/// <summary>
|
||||
/// Base options, applicable to everything
|
||||
/// </summary>
|
||||
public class BaseOptions
|
||||
public class ClientOptions
|
||||
{
|
||||
internal event Action? OnLoggingChanged;
|
||||
|
||||
@ -44,195 +41,27 @@ namespace CryptoExchange.Net.Objects
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// If true, the CallResult and DataEvent objects will also include the originally received json data in the OriginalData property
|
||||
/// </summary>
|
||||
public bool OutputOriginalData { get; set; } = false;
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public BaseOptions(): this(null)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="baseOptions">Copy options from these options to the new options</param>
|
||||
public BaseOptions(BaseOptions? baseOptions)
|
||||
{
|
||||
if (baseOptions == null)
|
||||
return;
|
||||
|
||||
LogLevel = baseOptions.LogLevel;
|
||||
LogWriters = baseOptions.LogWriters.ToList();
|
||||
OutputOriginalData = baseOptions.OutputOriginalData;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
return $"LogLevel: {LogLevel}, Writers: {LogWriters.Count}, OutputOriginalData: {OutputOriginalData}";
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Client options, for both the socket and rest clients
|
||||
/// </summary>
|
||||
public class BaseClientOptions : BaseOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Proxy to use when connecting
|
||||
/// </summary>
|
||||
public ApiProxy? Proxy { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Api credentials to be used for signing requests to private endpoints. These credentials will be used for each API in the client, unless overriden in the API options
|
||||
/// </summary>
|
||||
public ApiCredentials? ApiCredentials { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public BaseClientOptions() : this(null)
|
||||
/// <param name="baseOptions">Copy values for the provided options</param>
|
||||
/// <param name="newValues">Copy values for the provided options</param>
|
||||
public ClientOptions(ClientOptions baseOptions, ClientOptions? newValues)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="baseOptions">Copy options from these options to the new options</param>
|
||||
public BaseClientOptions(BaseClientOptions? baseOptions) : base(baseOptions)
|
||||
{
|
||||
if (baseOptions == null)
|
||||
return;
|
||||
|
||||
Proxy = baseOptions.Proxy;
|
||||
ApiCredentials = baseOptions.ApiCredentials?.Copy();
|
||||
Proxy = newValues?.Proxy ?? baseOptions.Proxy;
|
||||
LogLevel = baseOptions.LogLevel;
|
||||
LogWriters = baseOptions.LogWriters.ToList();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
return $"{base.ToString()}, Proxy: {(Proxy == null ? "-" : Proxy.Host)}, Base.ApiCredentials: {(ApiCredentials == null ? "-" : "set")}";
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Rest client options
|
||||
/// </summary>
|
||||
public class BaseRestClientOptions : BaseClientOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// The time the server has to respond to a request before timing out
|
||||
/// </summary>
|
||||
public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(30);
|
||||
|
||||
/// <summary>
|
||||
/// Http client to use. If a HttpClient is provided in this property the RequestTimeout and Proxy options provided in these options will be ignored in requests and should be set on the provided HttpClient instance
|
||||
/// </summary>
|
||||
public HttpClient? HttpClient { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public BaseRestClientOptions(): this(null)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="baseOptions">Copy options from these options to the new options</param>
|
||||
public BaseRestClientOptions(BaseRestClientOptions? baseOptions): base(baseOptions)
|
||||
{
|
||||
if (baseOptions == null)
|
||||
return;
|
||||
|
||||
HttpClient = baseOptions.HttpClient;
|
||||
RequestTimeout = baseOptions.RequestTimeout;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
return $"{base.ToString()}, RequestTimeout: {RequestTimeout:c}, HttpClient: {(HttpClient == null ? "-" : "set")}";
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Socket client options
|
||||
/// </summary>
|
||||
public class BaseSocketClientOptions : BaseClientOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Whether or not the socket should automatically reconnect when losing connection
|
||||
/// </summary>
|
||||
public bool AutoReconnect { get; set; } = true;
|
||||
|
||||
/// <summary>
|
||||
/// Time to wait between reconnect attempts
|
||||
/// </summary>
|
||||
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
|
||||
|
||||
/// <summary>
|
||||
/// Max number of concurrent resubscription tasks per socket after reconnecting a socket
|
||||
/// </summary>
|
||||
public int MaxConcurrentResubscriptionsPerSocket { get; set; } = 5;
|
||||
|
||||
/// <summary>
|
||||
/// The max time to wait for a response after sending a request on the socket before giving a timeout
|
||||
/// </summary>
|
||||
public TimeSpan SocketResponseTimeout { get; set; } = TimeSpan.FromSeconds(10);
|
||||
|
||||
/// <summary>
|
||||
/// The max time of not receiving any data after which the connection is assumed to be dropped. This can only be used for socket connections where a steady flow of data is expected,
|
||||
/// for example when the server sends intermittent ping requests
|
||||
/// </summary>
|
||||
public TimeSpan SocketNoDataTimeout { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket.
|
||||
/// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a
|
||||
/// single connection will also increase the amount of traffic on that single connection, potentially leading to issues.
|
||||
/// </summary>
|
||||
public int? SocketSubscriptionsCombineTarget { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The max amount of connections to make to the server. Can be used for API's which only allow a certain number of connections. Changing this to a high value might cause issues.
|
||||
/// </summary>
|
||||
public int? MaxSocketConnections { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public BaseSocketClientOptions(): this(null)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="baseOptions">Copy options from these options to the new options</param>
|
||||
public BaseSocketClientOptions(BaseSocketClientOptions? baseOptions): base(baseOptions)
|
||||
{
|
||||
if (baseOptions == null)
|
||||
return;
|
||||
|
||||
AutoReconnect = baseOptions.AutoReconnect;
|
||||
ReconnectInterval = baseOptions.ReconnectInterval;
|
||||
MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket;
|
||||
SocketResponseTimeout = baseOptions.SocketResponseTimeout;
|
||||
SocketNoDataTimeout = baseOptions.SocketNoDataTimeout;
|
||||
SocketSubscriptionsCombineTarget = baseOptions.SocketSubscriptionsCombineTarget;
|
||||
MaxSocketConnections = baseOptions.MaxSocketConnections;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}";
|
||||
return $"LogLevel: {LogLevel}, Writers: {LogWriters.Count}, Proxy: {(Proxy == null ? "-" : Proxy.Host)}";
|
||||
}
|
||||
}
|
||||
|
||||
@ -241,6 +70,11 @@ namespace CryptoExchange.Net.Objects
|
||||
/// </summary>
|
||||
public class ApiClientOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// If true, the CallResult and DataEvent objects will also include the originally received json data in the OriginalData property
|
||||
/// </summary>
|
||||
public bool OutputOriginalData { get; set; } = false;
|
||||
|
||||
/// <summary>
|
||||
/// The base address of the API
|
||||
/// </summary>
|
||||
@ -278,12 +112,13 @@ namespace CryptoExchange.Net.Objects
|
||||
{
|
||||
BaseAddress = newValues?.BaseAddress ?? baseOptions.BaseAddress;
|
||||
ApiCredentials = newValues?.ApiCredentials?.Copy() ?? baseOptions.ApiCredentials?.Copy();
|
||||
OutputOriginalData = baseOptions.OutputOriginalData;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
return $"Credentials: {(ApiCredentials == null ? "-" : "Set")}, BaseAddress: {BaseAddress}";
|
||||
return $"OutputOriginalData: {OutputOriginalData}, Credentials: {(ApiCredentials == null ? "-" : "Set")}, BaseAddress: {BaseAddress}";
|
||||
}
|
||||
}
|
||||
|
||||
@ -292,6 +127,16 @@ namespace CryptoExchange.Net.Objects
|
||||
/// </summary>
|
||||
public class RestApiClientOptions: ApiClientOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// The time the server has to respond to a request before timing out
|
||||
/// </summary>
|
||||
public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(30);
|
||||
|
||||
/// <summary>
|
||||
/// Http client to use. If a HttpClient is provided in this property the RequestTimeout and Proxy options provided in these options will be ignored in requests and should be set on the provided HttpClient instance
|
||||
/// </summary>
|
||||
public HttpClient? HttpClient { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// List of rate limiters to use
|
||||
/// </summary>
|
||||
@ -334,6 +179,8 @@ namespace CryptoExchange.Net.Objects
|
||||
/// <param name="newValues">Copy values for the provided options</param>
|
||||
public RestApiClientOptions(RestApiClientOptions baseOn, RestApiClientOptions? newValues): base(baseOn, newValues)
|
||||
{
|
||||
HttpClient = newValues?.HttpClient ?? baseOn.HttpClient;
|
||||
RequestTimeout = newValues == default ? baseOn.RequestTimeout : newValues.RequestTimeout;
|
||||
RateLimitingBehaviour = newValues?.RateLimitingBehaviour ?? baseOn.RateLimitingBehaviour;
|
||||
AutoTimestamp = newValues?.AutoTimestamp ?? baseOn.AutoTimestamp;
|
||||
TimestampRecalculationInterval = newValues?.TimestampRecalculationInterval ?? baseOn.TimestampRecalculationInterval;
|
||||
@ -343,14 +190,98 @@ namespace CryptoExchange.Net.Objects
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
return $"{base.ToString()}, RateLimiters: {RateLimiters?.Count}, RateLimitBehaviour: {RateLimitingBehaviour}, AutoTimestamp: {AutoTimestamp}, TimestampRecalculationInterval: {TimestampRecalculationInterval}";
|
||||
return $"{base.ToString()}, RequestTimeout: {RequestTimeout:c}, HttpClient: {(HttpClient == null ? "-" : "set")}, RateLimiters: {RateLimiters?.Count}, RateLimitBehaviour: {RateLimitingBehaviour}, AutoTimestamp: {AutoTimestamp}, TimestampRecalculationInterval: {TimestampRecalculationInterval}";
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Rest API client options
|
||||
/// </summary>
|
||||
public class SocketApiClientOptions : ApiClientOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Whether or not the socket should automatically reconnect when losing connection
|
||||
/// </summary>
|
||||
public bool AutoReconnect { get; set; } = true;
|
||||
|
||||
/// <summary>
|
||||
/// Time to wait between reconnect attempts
|
||||
/// </summary>
|
||||
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
|
||||
|
||||
/// <summary>
|
||||
/// Max number of concurrent resubscription tasks per socket after reconnecting a socket
|
||||
/// </summary>
|
||||
public int MaxConcurrentResubscriptionsPerSocket { get; set; } = 5;
|
||||
|
||||
/// <summary>
|
||||
/// The max time to wait for a response after sending a request on the socket before giving a timeout
|
||||
/// </summary>
|
||||
public TimeSpan SocketResponseTimeout { get; set; } = TimeSpan.FromSeconds(10);
|
||||
|
||||
/// <summary>
|
||||
/// The max time of not receiving any data after which the connection is assumed to be dropped. This can only be used for socket connections where a steady flow of data is expected,
|
||||
/// for example when the server sends intermittent ping requests
|
||||
/// </summary>
|
||||
public TimeSpan SocketNoDataTimeout { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The amount of subscriptions that should be made on a single socket connection. Not all API's support multiple subscriptions on a single socket.
|
||||
/// Setting this to a higher number increases subscription speed because not every subscription needs to connect to the server, but having more subscriptions on a
|
||||
/// single connection will also increase the amount of traffic on that single connection, potentially leading to issues.
|
||||
/// </summary>
|
||||
public int? SocketSubscriptionsCombineTarget { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The max amount of connections to make to the server. Can be used for API's which only allow a certain number of connections. Changing this to a high value might cause issues.
|
||||
/// </summary>
|
||||
public int? MaxSocketConnections { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public SocketApiClientOptions()
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="baseAddress">Base address for the API</param>
|
||||
public SocketApiClientOptions(string baseAddress) : base(baseAddress)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="baseOptions">Copy values for the provided options</param>
|
||||
/// <param name="newValues">Copy values for the provided options</param>
|
||||
public SocketApiClientOptions(SocketApiClientOptions baseOptions, SocketApiClientOptions? newValues) : base(baseOptions, newValues)
|
||||
{
|
||||
if (baseOptions == null)
|
||||
return;
|
||||
|
||||
AutoReconnect = baseOptions.AutoReconnect;
|
||||
ReconnectInterval = baseOptions.ReconnectInterval;
|
||||
MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket;
|
||||
SocketResponseTimeout = baseOptions.SocketResponseTimeout;
|
||||
SocketNoDataTimeout = baseOptions.SocketNoDataTimeout;
|
||||
SocketSubscriptionsCombineTarget = baseOptions.SocketSubscriptionsCombineTarget;
|
||||
MaxSocketConnections = baseOptions.MaxSocketConnections;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}";
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Base for order book options
|
||||
/// </summary>
|
||||
public class OrderBookOptions : BaseOptions
|
||||
public class OrderBookOptions : ApiClientOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Whether or not checksum validation is enabled. Default is true, disabling will ignore checksum messages.
|
||||
|
@ -149,7 +149,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
private readonly object subscriptionLock = new();
|
||||
|
||||
private readonly Log log;
|
||||
private readonly BaseSocketClient socketClient;
|
||||
|
||||
private readonly List<PendingRequest> pendingRequests;
|
||||
|
||||
@ -163,14 +162,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <summary>
|
||||
/// New socket connection
|
||||
/// </summary>
|
||||
/// <param name="client">The socket client</param>
|
||||
/// <param name="log">The logger</param>
|
||||
/// <param name="apiClient">The api client</param>
|
||||
/// <param name="socket">The socket</param>
|
||||
/// <param name="tag"></param>
|
||||
public SocketConnection(BaseSocketClient client, SocketApiClient apiClient, IWebsocket socket, string tag)
|
||||
public SocketConnection(Log log, SocketApiClient apiClient, IWebsocket socket, string tag)
|
||||
{
|
||||
log = client.log;
|
||||
socketClient = client;
|
||||
this.log = log;
|
||||
ApiClient = apiClient;
|
||||
Tag = tag;
|
||||
|
||||
@ -234,7 +232,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<Uri?> GetReconnectionUrlAsync()
|
||||
{
|
||||
return await socketClient.GetReconnectUriAsync(ApiClient, this).ConfigureAwait(false);
|
||||
return await ApiClient.GetReconnectUriAsync(ApiClient, this).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -315,7 +313,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
lock (pendingRequests)
|
||||
pendingRequests.Remove(pendingRequest);
|
||||
|
||||
if (!socketClient.ContinueOnQueryResponse)
|
||||
if (!ApiClient.ContinueOnQueryResponse)
|
||||
return;
|
||||
|
||||
handledResponse = true;
|
||||
@ -324,11 +322,11 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
// Message was not a request response, check data handlers
|
||||
var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data : null, timestamp);
|
||||
var messageEvent = new MessageEvent(this, tokenData, ApiClient.Options.OutputOriginalData ? data : null, timestamp);
|
||||
var (handled, userProcessTime, subscription) = HandleData(messageEvent);
|
||||
if (!handled && !handledResponse)
|
||||
{
|
||||
if (!socketClient.UnhandledMessageExpected)
|
||||
if (!ApiClient.UnhandledMessageExpected)
|
||||
log.Write(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + tokenData);
|
||||
UnhandledMessage?.Invoke(tokenData);
|
||||
}
|
||||
@ -368,8 +366,8 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
||||
return;
|
||||
|
||||
if (socketClient.socketConnections.ContainsKey(SocketId))
|
||||
socketClient.socketConnections.TryRemove(SocketId, out _);
|
||||
if (ApiClient.socketConnections.ContainsKey(SocketId))
|
||||
ApiClient.socketConnections.TryRemove(SocketId, out _);
|
||||
|
||||
lock (subscriptionLock)
|
||||
{
|
||||
@ -407,7 +405,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
|
||||
if (subscription.Confirmed && _socket.IsOpen)
|
||||
await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
|
||||
await ApiClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
|
||||
|
||||
bool shouldCloseConnection;
|
||||
lock (subscriptionLock)
|
||||
@ -504,7 +502,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
currentSubscription = subscription;
|
||||
if (subscription.Request == null)
|
||||
{
|
||||
if (socketClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!))
|
||||
if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!))
|
||||
{
|
||||
handled = true;
|
||||
var userSw = Stopwatch.StartNew();
|
||||
@ -515,10 +513,10 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
else
|
||||
{
|
||||
if (socketClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Request))
|
||||
if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Request))
|
||||
{
|
||||
handled = true;
|
||||
messageEvent.JsonData = socketClient.ProcessTokenData(messageEvent.JsonData);
|
||||
messageEvent.JsonData = ApiClient.ProcessTokenData(messageEvent.JsonData);
|
||||
var userSw = Stopwatch.StartNew();
|
||||
subscription.MessageHandler(messageEvent);
|
||||
userSw.Stop();
|
||||
@ -611,7 +609,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (subscriptions.Any(s => s.Authenticated))
|
||||
{
|
||||
// If we reconnected a authenticated connection we need to re-authenticate
|
||||
var authResult = await socketClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
|
||||
var authResult = await ApiClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
|
||||
if (!authResult)
|
||||
{
|
||||
log.Write(LogLevel.Warning, $"Socket {SocketId} authentication failed on reconnected socket. Disconnecting and reconnecting.");
|
||||
@ -636,14 +634,14 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
|
||||
for (var i = 0; i < subscriptionList.Count; i += socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)
|
||||
for (var i = 0; i < subscriptionList.Count; i += ApiClient.Options.MaxConcurrentResubscriptionsPerSocket)
|
||||
{
|
||||
if (!_socket.IsOpen)
|
||||
return new CallResult<bool>(new WebError("Socket not connected"));
|
||||
|
||||
var taskList = new List<Task<CallResult<bool>>>();
|
||||
foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
|
||||
taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription));
|
||||
foreach (var subscription in subscriptionList.Skip(i).Take(ApiClient.Options.MaxConcurrentResubscriptionsPerSocket))
|
||||
taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription));
|
||||
|
||||
await Task.WhenAll(taskList).ConfigureAwait(false);
|
||||
if (taskList.Any(t => !t.Result.Success))
|
||||
@ -662,7 +660,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
internal async Task UnsubscribeAsync(SocketSubscription socketSubscription)
|
||||
{
|
||||
await socketClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false);
|
||||
await ApiClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
internal async Task<CallResult<bool>> ResubscribeAsync(SocketSubscription socketSubscription)
|
||||
@ -670,7 +668,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (!_socket.IsOpen)
|
||||
return new CallResult<bool>(new UnknownError("Socket is not connected"));
|
||||
|
||||
return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
|
||||
return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
Loading…
x
Reference in New Issue
Block a user