1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 07:56:12 +00:00

Added dedicated request websocket connection support

This commit is contained in:
JKorf 2024-06-16 16:55:44 +02:00
parent 64ee50d98c
commit 3e5a34fb56
7 changed files with 128 additions and 32 deletions

View File

@ -8,6 +8,7 @@ using CryptoExchange.Net.RateLimiting.Interfaces;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
@ -67,6 +68,11 @@ namespace CryptoExchange.Net.Clients
/// </summary>
protected List<PeriodicTaskRegistration> PeriodicTaskRegistrations { get; set; } = new List<PeriodicTaskRegistration>();
/// <summary>
/// List of address to keep an alive connection to
/// </summary>
protected List<DedicatedConnectionConfig> DedicatedConnectionConfigs { get; set; } = new List<DedicatedConnectionConfig>();
/// <inheritdoc />
public double IncomingKbps
{
@ -131,6 +137,16 @@ namespace CryptoExchange.Net.Clients
/// <returns></returns>
protected internal virtual IMessageSerializer CreateSerializer() => new JsonNetMessageSerializer();
/// <summary>
/// Keep an open connection to this url
/// </summary>
/// <param name="url"></param>
/// <param name="auth"></param>
protected virtual void SetDedicatedConnection(string url, bool auth)
{
DedicatedConnectionConfigs.Add(new DedicatedConnectionConfig() { SocketAddress = url, Authenticated = auth });
}
/// <summary>
/// Add a query to periodically send on each connection
/// </summary>
@ -193,7 +209,7 @@ namespace CryptoExchange.Net.Clients
while (true)
{
// Get a new or existing socket connection
var socketResult = await GetSocketConnection(url, subscription.Authenticated).ConfigureAwait(false);
var socketResult = await GetSocketConnection(url, subscription.Authenticated, false).ConfigureAwait(false);
if (!socketResult)
return socketResult.As<UpdateSubscription>(null);
@ -311,7 +327,7 @@ namespace CryptoExchange.Net.Clients
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
var socketResult = await GetSocketConnection(url, query.Authenticated).ConfigureAwait(false);
var socketResult = await GetSocketConnection(url, query.Authenticated, true).ConfigureAwait(false);
if (!socketResult)
return socketResult.As<THandlerResponse>(default);
@ -455,19 +471,31 @@ namespace CryptoExchange.Net.Clients
/// </summary>
/// <param name="address">The address the socket is for</param>
/// <param name="authenticated">Whether the socket should be authenticated</param>
/// <param name="dedicatedRequestConnection">Whether a dedicated request connection should be returned</param>
/// <returns></returns>
protected virtual async Task<CallResult<SocketConnection>> GetSocketConnection(string address, bool authenticated)
protected virtual async Task<CallResult<SocketConnection>> GetSocketConnection(string address, bool authenticated, bool dedicatedRequestConnection)
{
var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected)
&& s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
&& s.Value.ApiClient.GetType() == GetType()
&& (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.UserSubscriptionCount).FirstOrDefault();
var result = socketResult.Equals(default(KeyValuePair<int, SocketConnection>)) ? null : socketResult.Value;
if (result != null)
var socketQuery = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected)
&& s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
&& s.Value.ApiClient.GetType() == GetType()
&& (s.Value.Authenticated == authenticated || !authenticated)
&& s.Value.Connected);
SocketConnection connection;
if (!dedicatedRequestConnection)
{
if (result.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))
connection = socketQuery.Where(s => !s.Value.DedicatedRequestConnection).OrderBy(s => s.Value.UserSubscriptionCount).FirstOrDefault().Value;
}
else
{
connection = socketQuery.Where(s => s.Value.DedicatedRequestConnection).FirstOrDefault().Value;
}
if (connection != null)
{
if (connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
return new CallResult<SocketConnection>(result);
return new CallResult<SocketConnection>(connection);
}
var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false);
@ -484,6 +512,7 @@ namespace CryptoExchange.Net.Clients
var socket = CreateSocket(connectionAddress.Data!);
var socketConnection = new SocketConnection(_logger, this, socket, address);
socketConnection.UnhandledMessage += HandleUnhandledMessage;
socketConnection.DedicatedRequestConnection = dedicatedRequestConnection;
foreach (var ptg in PeriodicTaskRegistrations)
socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.QueryDelegate, ptg.Callback);
@ -603,8 +632,8 @@ namespace CryptoExchange.Net.Clients
var tasks = new List<Task>();
{
var socketList = socketConnections.Values;
foreach (var sub in socketList)
tasks.Add(sub.CloseAsync());
foreach (var connection in socketList.Where(s => !s.DedicatedRequestConnection))
tasks.Add(connection.CloseAsync());
}
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
@ -627,6 +656,23 @@ namespace CryptoExchange.Net.Clients
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
}
/// <inheritdoc />
public virtual async Task<CallResult> PrepareConnectionsAsync()
{
foreach (var item in DedicatedConnectionConfigs)
{
var socketResult = await GetSocketConnection(item.SocketAddress, item.Authenticated, true).ConfigureAwait(false);
if (!socketResult)
return socketResult.AsDataless();
var connectResult = await ConnectIfNeededAsync(socketResult.Data, item.Authenticated).ConfigureAwait(false);
if (!connectResult)
return new CallResult(connectResult.Error!);
}
return new CallResult(null);
}
/// <summary>
/// Log the current state of connections and subscriptions
/// </summary>
@ -710,11 +756,18 @@ namespace CryptoExchange.Net.Clients
public override void Dispose()
{
_disposing = true;
if (socketConnections.Sum(s => s.Value.UserSubscriptionCount) > 0)
var tasks = new List<Task>();
{
_logger.DisposingSocketClient();
_ = UnsubscribeAllAsync();
var socketList = socketConnections.Values.Where(x => x.UserSubscriptionCount > 0 || x.Connected);
if (socketList.Any())
_logger.DisposingSocketClient();
foreach (var connection in socketList)
{
tasks.Add(connection.CloseAsync());
}
}
semaphoreSlim?.Dispose();
base.Dispose();
}

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
using System.Threading.Tasks;
@ -59,5 +60,11 @@ namespace CryptoExchange.Net.Interfaces
/// <param name="subscription">The subscription to unsubscribe</param>
/// <returns></returns>
Task UnsubscribeAsync(UpdateSubscription subscription);
/// <summary>
/// Prepare connections which can subsequently be used for sending websocket requests.
/// </summary>
/// <returns></returns>
Task<CallResult> PrepareConnectionsAsync();
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Dedicated connection configuration
/// </summary>
public class DedicatedConnectionConfig
{
/// <summary>
/// Socket address
/// </summary>
public string SocketAddress { get; set; } = string.Empty;
/// <summary>
/// authenticated
/// </summary>
public bool Authenticated { get; set; }
}
}

View File

@ -175,6 +175,11 @@ namespace CryptoExchange.Net.Sockets
}
}
/// <summary>
/// Whether this connection should be kept alive even when there is no subscription
/// </summary>
public bool DedicatedRequestConnection { get; internal set; }
private bool _pausedActivity;
private readonly object _listenersLock;
private readonly List<IMessageProcessor> _listeners;
@ -608,7 +613,7 @@ namespace CryptoExchange.Net.Sockets
bool shouldCloseConnection;
lock (_listenersLock)
{
shouldCloseConnection = _listeners.OfType<Subscription>().All(r => !r.UserSubscription || r.Closed);
shouldCloseConnection = _listeners.OfType<Subscription>().All(r => !r.UserSubscription || r.Closed) && !DedicatedRequestConnection;
if (shouldCloseConnection)
Status = SocketStatus.Closing;
}
@ -811,20 +816,23 @@ namespace CryptoExchange.Net.Sockets
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket not connected"));
bool anySubscriptions;
lock (_listenersLock)
anySubscriptions = _listeners.OfType<Subscription>().Any(s => s.UserSubscription);
if (!anySubscriptions)
if (!DedicatedRequestConnection)
{
// No need to resubscribe anything
_logger.NothingToResubscribeCloseConnection(SocketId);
_ = _socket.CloseAsync();
return new CallResult(null);
bool anySubscriptions;
lock (_listenersLock)
anySubscriptions = _listeners.OfType<Subscription>().Any(s => s.UserSubscription);
if (!anySubscriptions)
{
// No need to resubscribe anything
_logger.NothingToResubscribeCloseConnection(SocketId);
_ = _socket.CloseAsync();
return new CallResult(null);
}
}
bool anyAuthenticated;
lock (_listenersLock)
anyAuthenticated = _listeners.OfType<Subscription>().Any(s => s.Authenticated);
anyAuthenticated = _listeners.OfType<Subscription>().Any(s => s.Authenticated) || DedicatedRequestConnection;
if (anyAuthenticated)
{
// If we reconnected a authenticated connection we need to re-authenticate

View File

@ -30,9 +30,14 @@ namespace CryptoExchange.Net.Testing.Implementations
public bool IsClosed => !Connected;
public bool IsOpen => Connected;
public double IncomingKbps => 0;
public Uri Uri => new("wss://test.com/ws");
public Uri Uri { get; set; }
public Func<Task<Uri?>>? GetReconnectionUrl { get; set; }
public TestSocket(string address)
{
Uri = new Uri(address);
}
public Task<CallResult> ConnectAsync()
{
Connected = CanConnect;

View File

@ -50,13 +50,15 @@ namespace CryptoExchange.Net.Testing
/// <param name="name">Method name for looking up json test values</param>
/// <param name="nestedJsonProperty">Use nested json property for compare</param>
/// <param name="ignoreProperties">Ignore certain properties</param>
/// <param name="addressPath">Path</param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public async Task ValidateAsync<TUpdate>(
Func<TClient, Action<DataEvent<TUpdate>>, Task<CallResult<UpdateSubscription>>> methodInvoke,
string name,
string? nestedJsonProperty = null,
List<string>? ignoreProperties = null)
List<string>? ignoreProperties = null,
string? addressPath = null)
{
var listener = new EnumValueTraceListener();
Trace.Listeners.Add(listener);
@ -79,7 +81,7 @@ namespace CryptoExchange.Net.Testing
var data = Encoding.UTF8.GetString(buffer);
using var reader = new StringReader(data);
var socket = TestHelpers.ConfigureSocketClient(_client);
var socket = TestHelpers.ConfigureSocketClient(_client, addressPath == null ? _baseAddress : _baseAddress.AppendPath(addressPath));
var waiter = new AutoResetEvent(false);
string? lastMessage = null;

View File

@ -57,9 +57,9 @@ namespace CryptoExchange.Net.Testing
return self == to;
}
internal static TestSocket ConfigureSocketClient<T>(T client) where T : BaseSocketClient
internal static TestSocket ConfigureSocketClient<T>(T client, string address) where T : BaseSocketClient
{
var socket = new TestSocket();
var socket = new TestSocket(address);
foreach (var apiClient in client.ApiClients.OfType<SocketApiClient>())
{
apiClient.SocketFactory = new TestWebsocketFactory(socket);