From 3e5a34fb56ad68c02633860d7f6c10f27a4e9ac8 Mon Sep 17 00:00:00 2001 From: JKorf Date: Sun, 16 Jun 2024 16:55:44 +0200 Subject: [PATCH] Added dedicated request websocket connection support --- CryptoExchange.Net/Clients/SocketApiClient.cs | 85 +++++++++++++++---- .../Interfaces/ISocketApiClient.cs | 9 +- .../Sockets/DedicatedConnectionConfig.cs | 21 +++++ .../Sockets/SocketConnection.cs | 28 +++--- .../Testing/Implementations/TestSocket.cs | 7 +- .../Testing/SocketSubscriptionValidator.cs | 6 +- CryptoExchange.Net/Testing/TestHelpers.cs | 4 +- 7 files changed, 128 insertions(+), 32 deletions(-) create mode 100644 CryptoExchange.Net/Sockets/DedicatedConnectionConfig.cs diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index f14682b..0d981a4 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -8,6 +8,7 @@ using CryptoExchange.Net.RateLimiting.Interfaces; using CryptoExchange.Net.Sockets; using Microsoft.Extensions.Logging; using System; +using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -67,6 +68,11 @@ namespace CryptoExchange.Net.Clients /// protected List PeriodicTaskRegistrations { get; set; } = new List(); + /// + /// List of address to keep an alive connection to + /// + protected List DedicatedConnectionConfigs { get; set; } = new List(); + /// public double IncomingKbps { @@ -131,6 +137,16 @@ namespace CryptoExchange.Net.Clients /// protected internal virtual IMessageSerializer CreateSerializer() => new JsonNetMessageSerializer(); + /// + /// Keep an open connection to this url + /// + /// + /// + protected virtual void SetDedicatedConnection(string url, bool auth) + { + DedicatedConnectionConfigs.Add(new DedicatedConnectionConfig() { SocketAddress = url, Authenticated = auth }); + } + /// /// Add a query to periodically send on each connection /// @@ -193,7 +209,7 @@ namespace CryptoExchange.Net.Clients while (true) { // Get a new or existing socket connection - var socketResult = await GetSocketConnection(url, subscription.Authenticated).ConfigureAwait(false); + var socketResult = await GetSocketConnection(url, subscription.Authenticated, false).ConfigureAwait(false); if (!socketResult) return socketResult.As(null); @@ -311,7 +327,7 @@ namespace CryptoExchange.Net.Clients await semaphoreSlim.WaitAsync().ConfigureAwait(false); try { - var socketResult = await GetSocketConnection(url, query.Authenticated).ConfigureAwait(false); + var socketResult = await GetSocketConnection(url, query.Authenticated, true).ConfigureAwait(false); if (!socketResult) return socketResult.As(default); @@ -455,19 +471,31 @@ namespace CryptoExchange.Net.Clients /// /// The address the socket is for /// Whether the socket should be authenticated + /// Whether a dedicated request connection should be returned /// - protected virtual async Task> GetSocketConnection(string address, bool authenticated) + protected virtual async Task> GetSocketConnection(string address, bool authenticated, bool dedicatedRequestConnection) { - var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected) - && s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') - && s.Value.ApiClient.GetType() == GetType() - && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.UserSubscriptionCount).FirstOrDefault(); - var result = socketResult.Equals(default(KeyValuePair)) ? null : socketResult.Value; - if (result != null) + var socketQuery = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected) + && s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') + && s.Value.ApiClient.GetType() == GetType() + && (s.Value.Authenticated == authenticated || !authenticated) + && s.Value.Connected); + + SocketConnection connection; + if (!dedicatedRequestConnection) { - if (result.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget)) + connection = socketQuery.Where(s => !s.Value.DedicatedRequestConnection).OrderBy(s => s.Value.UserSubscriptionCount).FirstOrDefault().Value; + } + else + { + connection = socketQuery.Where(s => s.Value.DedicatedRequestConnection).FirstOrDefault().Value; + } + + if (connection != null) + { + if (connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget)) // Use existing socket if it has less than target connections OR it has the least connections and we can't make new - return new CallResult(result); + return new CallResult(connection); } var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false); @@ -484,6 +512,7 @@ namespace CryptoExchange.Net.Clients var socket = CreateSocket(connectionAddress.Data!); var socketConnection = new SocketConnection(_logger, this, socket, address); socketConnection.UnhandledMessage += HandleUnhandledMessage; + socketConnection.DedicatedRequestConnection = dedicatedRequestConnection; foreach (var ptg in PeriodicTaskRegistrations) socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.QueryDelegate, ptg.Callback); @@ -603,8 +632,8 @@ namespace CryptoExchange.Net.Clients var tasks = new List(); { var socketList = socketConnections.Values; - foreach (var sub in socketList) - tasks.Add(sub.CloseAsync()); + foreach (var connection in socketList.Where(s => !s.DedicatedRequestConnection)) + tasks.Add(connection.CloseAsync()); } await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); @@ -627,6 +656,23 @@ namespace CryptoExchange.Net.Clients await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); } + /// + public virtual async Task PrepareConnectionsAsync() + { + foreach (var item in DedicatedConnectionConfigs) + { + var socketResult = await GetSocketConnection(item.SocketAddress, item.Authenticated, true).ConfigureAwait(false); + if (!socketResult) + return socketResult.AsDataless(); + + var connectResult = await ConnectIfNeededAsync(socketResult.Data, item.Authenticated).ConfigureAwait(false); + if (!connectResult) + return new CallResult(connectResult.Error!); + } + + return new CallResult(null); + } + /// /// Log the current state of connections and subscriptions /// @@ -710,11 +756,18 @@ namespace CryptoExchange.Net.Clients public override void Dispose() { _disposing = true; - if (socketConnections.Sum(s => s.Value.UserSubscriptionCount) > 0) + var tasks = new List(); { - _logger.DisposingSocketClient(); - _ = UnsubscribeAllAsync(); + var socketList = socketConnections.Values.Where(x => x.UserSubscriptionCount > 0 || x.Connected); + if (socketList.Any()) + _logger.DisposingSocketClient(); + + foreach (var connection in socketList) + { + tasks.Add(connection.CloseAsync()); + } } + semaphoreSlim?.Dispose(); base.Dispose(); } diff --git a/CryptoExchange.Net/Interfaces/ISocketApiClient.cs b/CryptoExchange.Net/Interfaces/ISocketApiClient.cs index 7c7a430..0a0a751 100644 --- a/CryptoExchange.Net/Interfaces/ISocketApiClient.cs +++ b/CryptoExchange.Net/Interfaces/ISocketApiClient.cs @@ -1,4 +1,5 @@ -using CryptoExchange.Net.Objects.Options; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Options; using CryptoExchange.Net.Objects.Sockets; using System.Threading.Tasks; @@ -59,5 +60,11 @@ namespace CryptoExchange.Net.Interfaces /// The subscription to unsubscribe /// Task UnsubscribeAsync(UpdateSubscription subscription); + + /// + /// Prepare connections which can subsequently be used for sending websocket requests. + /// + /// + Task PrepareConnectionsAsync(); } } \ No newline at end of file diff --git a/CryptoExchange.Net/Sockets/DedicatedConnectionConfig.cs b/CryptoExchange.Net/Sockets/DedicatedConnectionConfig.cs new file mode 100644 index 0000000..15840e4 --- /dev/null +++ b/CryptoExchange.Net/Sockets/DedicatedConnectionConfig.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Sockets +{ + /// + /// Dedicated connection configuration + /// + public class DedicatedConnectionConfig + { + /// + /// Socket address + /// + public string SocketAddress { get; set; } = string.Empty; + /// + /// authenticated + /// + public bool Authenticated { get; set; } + } +} diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 912909b..dad3ad1 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -175,6 +175,11 @@ namespace CryptoExchange.Net.Sockets } } + /// + /// Whether this connection should be kept alive even when there is no subscription + /// + public bool DedicatedRequestConnection { get; internal set; } + private bool _pausedActivity; private readonly object _listenersLock; private readonly List _listeners; @@ -608,7 +613,7 @@ namespace CryptoExchange.Net.Sockets bool shouldCloseConnection; lock (_listenersLock) { - shouldCloseConnection = _listeners.OfType().All(r => !r.UserSubscription || r.Closed); + shouldCloseConnection = _listeners.OfType().All(r => !r.UserSubscription || r.Closed) && !DedicatedRequestConnection; if (shouldCloseConnection) Status = SocketStatus.Closing; } @@ -811,20 +816,23 @@ namespace CryptoExchange.Net.Sockets if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); - bool anySubscriptions; - lock (_listenersLock) - anySubscriptions = _listeners.OfType().Any(s => s.UserSubscription); - if (!anySubscriptions) + if (!DedicatedRequestConnection) { - // No need to resubscribe anything - _logger.NothingToResubscribeCloseConnection(SocketId); - _ = _socket.CloseAsync(); - return new CallResult(null); + bool anySubscriptions; + lock (_listenersLock) + anySubscriptions = _listeners.OfType().Any(s => s.UserSubscription); + if (!anySubscriptions) + { + // No need to resubscribe anything + _logger.NothingToResubscribeCloseConnection(SocketId); + _ = _socket.CloseAsync(); + return new CallResult(null); + } } bool anyAuthenticated; lock (_listenersLock) - anyAuthenticated = _listeners.OfType().Any(s => s.Authenticated); + anyAuthenticated = _listeners.OfType().Any(s => s.Authenticated) || DedicatedRequestConnection; if (anyAuthenticated) { // If we reconnected a authenticated connection we need to re-authenticate diff --git a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs index 06195b1..61b1c5a 100644 --- a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs +++ b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs @@ -30,9 +30,14 @@ namespace CryptoExchange.Net.Testing.Implementations public bool IsClosed => !Connected; public bool IsOpen => Connected; public double IncomingKbps => 0; - public Uri Uri => new("wss://test.com/ws"); + public Uri Uri { get; set; } public Func>? GetReconnectionUrl { get; set; } + public TestSocket(string address) + { + Uri = new Uri(address); + } + public Task ConnectAsync() { Connected = CanConnect; diff --git a/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs b/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs index 8a7ae06..d273a65 100644 --- a/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs +++ b/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs @@ -50,13 +50,15 @@ namespace CryptoExchange.Net.Testing /// Method name for looking up json test values /// Use nested json property for compare /// Ignore certain properties + /// Path /// /// public async Task ValidateAsync( Func>, Task>> methodInvoke, string name, string? nestedJsonProperty = null, - List? ignoreProperties = null) + List? ignoreProperties = null, + string? addressPath = null) { var listener = new EnumValueTraceListener(); Trace.Listeners.Add(listener); @@ -79,7 +81,7 @@ namespace CryptoExchange.Net.Testing var data = Encoding.UTF8.GetString(buffer); using var reader = new StringReader(data); - var socket = TestHelpers.ConfigureSocketClient(_client); + var socket = TestHelpers.ConfigureSocketClient(_client, addressPath == null ? _baseAddress : _baseAddress.AppendPath(addressPath)); var waiter = new AutoResetEvent(false); string? lastMessage = null; diff --git a/CryptoExchange.Net/Testing/TestHelpers.cs b/CryptoExchange.Net/Testing/TestHelpers.cs index c32a75a..74df021 100644 --- a/CryptoExchange.Net/Testing/TestHelpers.cs +++ b/CryptoExchange.Net/Testing/TestHelpers.cs @@ -57,9 +57,9 @@ namespace CryptoExchange.Net.Testing return self == to; } - internal static TestSocket ConfigureSocketClient(T client) where T : BaseSocketClient + internal static TestSocket ConfigureSocketClient(T client, string address) where T : BaseSocketClient { - var socket = new TestSocket(); + var socket = new TestSocket(address); foreach (var apiClient in client.ApiClients.OfType()) { apiClient.SocketFactory = new TestWebsocketFactory(socket);