From 68f772a13a50d52c65f475fe54daa1be681800fe Mon Sep 17 00:00:00 2001 From: JKorf Date: Sat, 15 Nov 2025 17:46:22 +0100 Subject: [PATCH] wip --- .../SocketClientTests.cs | 33 +++--- .../TestImplementations/TestSocketClient.cs | 3 + CryptoExchange.Net/Caching/MemoryCache.cs | 5 + CryptoExchange.Net/Clients/BaseClient.cs | 5 + CryptoExchange.Net/CryptoExchange.Net.csproj | 2 +- CryptoExchange.Net/ExchangeHelpers.cs | 3 +- .../Interfaces/IWebsocketFactory.cs | 5 +- CryptoExchange.Net/LibraryHelpers.cs | 6 ++ .../Objects/AsyncAutoResetEvent.cs | 13 ++- .../Sockets/HighPerfUpdateSubscription.cs | 15 +-- .../Objects/Sockets/UpdateSubscription.cs | 8 +- .../Objects/Sockets/WebSocketParameters.cs | 2 - .../OrderBook/SymbolOrderBook.cs | 4 + .../Sockets/CryptoExchangeWebSocketClient.cs | 20 ++-- .../HighPerfPeriodicTaskRegistration.cs | 5 +- .../HighPerf/HighPerfSocketConnection.cs | 95 +++++++++++------ .../Sockets/HighPerf/HighPerfSubscription.cs | 18 ++-- .../HighPerf/HighPerfWebSocketClient.cs | 100 +++++++----------- .../Sockets/ISocketConnection.cs | 41 +++++-- .../Sockets/SocketConnection.cs | 7 +- .../Testing/Implementations/TestSocket.cs | 6 +- .../Trackers/Klines/KlineTracker.cs | 7 +- .../Trackers/Trades/TradeTracker.cs | 7 +- 23 files changed, 256 insertions(+), 154 deletions(-) diff --git a/CryptoExchange.Net.UnitTests/SocketClientTests.cs b/CryptoExchange.Net.UnitTests/SocketClientTests.cs index 1273221..2112aae 100644 --- a/CryptoExchange.Net.UnitTests/SocketClientTests.cs +++ b/CryptoExchange.Net.UnitTests/SocketClientTests.cs @@ -1,17 +1,19 @@ -using System; -using System.Collections.Generic; -using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; -using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Testing.Implementations; using CryptoExchange.Net.UnitTests.TestImplementations; using CryptoExchange.Net.UnitTests.TestImplementations.Sockets; using Microsoft.Extensions.Logging; using Moq; using NUnit.Framework; using NUnit.Framework.Legacy; +using System; +using System.Collections.Generic; +using System.Net.Sockets; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; namespace CryptoExchange.Net.UnitTests { @@ -44,7 +46,8 @@ namespace CryptoExchange.Net.UnitTests socket.CanConnect = canConnect; //act - var connectResult = client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), client.SubClient, socket, null)); + var connectResult = client.SubClient.ConnectSocketSub( + new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); //assert Assert.That(connectResult.Success == canConnect); @@ -59,7 +62,7 @@ namespace CryptoExchange.Net.UnitTests }); var socket = client.CreateSocket(); socket.CanConnect = true; - var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); + var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); var rstEvent = new ManualResetEvent(false); Dictionary result = null; @@ -92,7 +95,7 @@ namespace CryptoExchange.Net.UnitTests }); var socket = client.CreateSocket(); socket.CanConnect = true; - var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); + var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); var rstEvent = new ManualResetEvent(false); string original = null; @@ -123,7 +126,7 @@ namespace CryptoExchange.Net.UnitTests }); var socket = client.CreateSocket(); socket.CanConnect = true; - var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); + var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); client.SubClient.ConnectSocketSub(sub); var subscription = new TestSubscription>(Mock.Of(), (messageEvent) => { }); @@ -146,8 +149,8 @@ namespace CryptoExchange.Net.UnitTests var socket2 = client.CreateSocket(); socket1.CanConnect = true; socket2.CanConnect = true; - var sub1 = new SocketConnection(new TraceLogger(), client.SubClient, socket1, null); - var sub2 = new SocketConnection(new TraceLogger(), client.SubClient, socket2, null); + var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket1), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); + var sub2 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket2), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); client.SubClient.ConnectSocketSub(sub1); client.SubClient.ConnectSocketSub(sub2); var subscription1 = new TestSubscription>(Mock.Of(), (messageEvent) => { }); @@ -173,7 +176,7 @@ namespace CryptoExchange.Net.UnitTests var client = new TestSocketClient(options => { options.ReconnectInterval = TimeSpan.Zero; }); var socket = client.CreateSocket(); socket.CanConnect = false; - var sub1 = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); + var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); // act var connectResult = client.SubClient.ConnectSocketSub(sub1); @@ -194,7 +197,7 @@ namespace CryptoExchange.Net.UnitTests }); var socket = client.CreateSocket(); socket.CanConnect = true; - client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), client.SubClient, socket, "https://test.test")); + client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); // act var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); @@ -217,7 +220,7 @@ namespace CryptoExchange.Net.UnitTests }); var socket = client.CreateSocket(); socket.CanConnect = true; - client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), client.SubClient, socket, "https://test.test")); + client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); // act var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs index 9df7288..094fc62 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs @@ -18,6 +18,7 @@ using CryptoExchange.Net.SharedApis; using Microsoft.Extensions.Options; using CryptoExchange.Net.Converters.SystemTextJson; using System.Net.WebSockets; +using System.Text.Json; namespace CryptoExchange.Net.UnitTests.TestImplementations { @@ -94,6 +95,8 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations public Subscription TestSubscription { get; private set; } = null; + public override JsonSerializerOptions JsonSerializerOptions => new JsonSerializerOptions(); + public TestSubSocketClient(TestSocketOptions options, SocketApiOptions apiOptions) : base(new TraceLogger(), options.Environment.TestAddress, options, apiOptions) { diff --git a/CryptoExchange.Net/Caching/MemoryCache.cs b/CryptoExchange.Net/Caching/MemoryCache.cs index ca2c3c4..4507ac6 100644 --- a/CryptoExchange.Net/Caching/MemoryCache.cs +++ b/CryptoExchange.Net/Caching/MemoryCache.cs @@ -1,13 +1,18 @@ using System; using System.Collections.Concurrent; using System.Linq; +using System.Threading; namespace CryptoExchange.Net.Caching { internal class MemoryCache { private readonly ConcurrentDictionary _cache = new ConcurrentDictionary(); +#if NET9_0_OR_GREATER + private readonly Lock _lock = new Lock(); +#else private readonly object _lock = new object(); +#endif /// /// Add a new cache entry. Will override an existing entry if it already exists diff --git a/CryptoExchange.Net/Clients/BaseClient.cs b/CryptoExchange.Net/Clients/BaseClient.cs index 5d3fc5f..bd91d53 100644 --- a/CryptoExchange.Net/Clients/BaseClient.cs +++ b/CryptoExchange.Net/Clients/BaseClient.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using System; using System.Collections.Generic; +using System.Threading; namespace CryptoExchange.Net.Clients { @@ -49,7 +50,11 @@ namespace CryptoExchange.Net.Clients /// protected internal ILogger _logger; +#if NET9_0_OR_GREATER + private readonly Lock _versionLock = new Lock(); +#else private readonly object _versionLock = new object(); +#endif private Version _exchangeVersion; /// diff --git a/CryptoExchange.Net/CryptoExchange.Net.csproj b/CryptoExchange.Net/CryptoExchange.Net.csproj index fa81f7d..864a3bf 100644 --- a/CryptoExchange.Net/CryptoExchange.Net.csproj +++ b/CryptoExchange.Net/CryptoExchange.Net.csproj @@ -20,7 +20,7 @@ true https://github.com/JKorf/CryptoExchange.Net?tab=readme-ov-file#release-notes enable - 12.0 + latest MIT diff --git a/CryptoExchange.Net/ExchangeHelpers.cs b/CryptoExchange.Net/ExchangeHelpers.cs index 7d425d0..785ed0d 100644 --- a/CryptoExchange.Net/ExchangeHelpers.cs +++ b/CryptoExchange.Net/ExchangeHelpers.cs @@ -381,13 +381,14 @@ namespace CryptoExchange.Net } /// - /// Queue updates received from a websocket subscriptions and process them async + /// Queue updates and process them async /// /// The queued update type /// The subscribe call /// The async update handler /// The max number of updates to be queued up. When happens when the queue is full and a new write is attempted can be specified with fullMode /// What should happen if the queue contains maxQueuedItems pending updates. If no max is set this setting is ignored + /// Cancellation token to stop the processing public static async Task ProcessQueuedAsync( Func, Task> subscribeCall, Func asyncHandler, diff --git a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs index a5870da..5caa97e 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs @@ -1,8 +1,6 @@ using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; -using System; using System.IO.Pipelines; -using System.Threading.Tasks; namespace CryptoExchange.Net.Interfaces { @@ -19,6 +17,9 @@ namespace CryptoExchange.Net.Interfaces /// IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters); + /// + /// Create high performance websocket + /// IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter); } } diff --git a/CryptoExchange.Net/LibraryHelpers.cs b/CryptoExchange.Net/LibraryHelpers.cs index a3b3700..9201ce4 100644 --- a/CryptoExchange.Net/LibraryHelpers.cs +++ b/CryptoExchange.Net/LibraryHelpers.cs @@ -158,6 +158,9 @@ namespace CryptoExchange.Net #endif } + /// + /// Waits for all of the ValueTasks to complete + /// public static async ValueTask WhenAll(IReadOnlyList tasks) { if (tasks.Count == 0) @@ -184,6 +187,9 @@ namespace CryptoExchange.Net await Task.WhenAll(toAwait!).ConfigureAwait(false); } + /// + /// Waits for all of the ValueTasks to complete + /// public static ValueTask WhenAll(IEnumerable tasks) { return WhenAll(tasks.ToList()); diff --git a/CryptoExchange.Net/Objects/AsyncAutoResetEvent.cs b/CryptoExchange.Net/Objects/AsyncAutoResetEvent.cs index ff4895d..e34a1a2 100644 --- a/CryptoExchange.Net/Objects/AsyncAutoResetEvent.cs +++ b/CryptoExchange.Net/Objects/AsyncAutoResetEvent.cs @@ -14,6 +14,11 @@ namespace CryptoExchange.Net.Objects { private static readonly Task _completed = Task.FromResult(true); private Queue> _waits = new Queue>(); +#if NET9_0_OR_GREATER + private readonly Lock _waitsLock = new Lock(); +#else + private readonly object _waitsLock = new object(); +#endif private bool _signaled; private readonly bool _reset; @@ -38,7 +43,7 @@ namespace CryptoExchange.Net.Objects try { Task waiter = _completed; - lock (_waits) + lock (_waitsLock) { if (_signaled) { @@ -57,7 +62,7 @@ namespace CryptoExchange.Net.Objects registration = ct.Register(() => { - lock (_waits) + lock (_waitsLock) { tcs.TrySetResult(false); @@ -85,7 +90,7 @@ namespace CryptoExchange.Net.Objects /// public void Set() { - lock (_waits) + lock (_waitsLock) { if (!_reset) { @@ -106,7 +111,9 @@ namespace CryptoExchange.Net.Objects toRelease.TrySetResult(true); } else if (!_signaled) + { _signaled = true; + } } } } diff --git a/CryptoExchange.Net/Objects/Sockets/HighPerfUpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/HighPerfUpdateSubscription.cs index d3f3e40..8bf9b76 100644 --- a/CryptoExchange.Net/Objects/Sockets/HighPerfUpdateSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/HighPerfUpdateSubscription.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Objects.Sockets @@ -14,14 +15,14 @@ namespace CryptoExchange.Net.Objects.Sockets private readonly HighPerfSocketConnection _connection; internal readonly HighPerfSubscription _subscription; - private object _eventLock = new object(); - private bool _connectionEventsSubscribed = true; - private List _connectionClosedEventHandlers = new List(); +#if NET9_0_OR_GREATER + private readonly Lock _eventLock = new Lock(); +#else + private readonly object _eventLock = new object(); +#endif - /// - /// Event when the status of the subscription changes - /// - public event Action? SubscriptionStatusChanged; + private bool _connectionEventsSubscribed = true; + private readonly List _connectionClosedEventHandlers = new List(); /// /// Event when the connection is closed and will not be reconnected diff --git a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs index a86af2d..b871663 100644 --- a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Objects.Sockets @@ -14,7 +15,12 @@ namespace CryptoExchange.Net.Objects.Sockets private readonly SocketConnection _connection; internal readonly Subscription _subscription; - private object _eventLock = new object(); +#if NET9_0_OR_GREATER + private readonly Lock _eventLock = new Lock(); +#else + private readonly object _eventLock = new object(); +#endif + private bool _connectionEventsSubscribed = true; private List _connectionClosedEventHandlers = new List(); private List _connectionLostEventHandlers = new List(); diff --git a/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs index 460e2ea..a72da68 100644 --- a/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs +++ b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs @@ -75,8 +75,6 @@ namespace CryptoExchange.Net.Objects.Sockets /// public int? ReceiveBufferSize { get; set; } = null; - public PipeWriter? PipeWriter { get; set; } = null; - /// /// ctor /// diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index cedb443..b65938e 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -22,7 +22,11 @@ namespace CryptoExchange.Net.OrderBook /// public abstract class SymbolOrderBook : ISymbolOrderBook, IDisposable { +#if NET9_0_OR_GREATER + private readonly Lock _bookLock = new Lock(); +#else private readonly object _bookLock = new object(); +#endif private OrderBookStatus _status; private UpdateSubscription? _subscription; diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 817c90e..d29e697 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -33,7 +33,11 @@ namespace CryptoExchange.Net.Sockets } internal static int _lastStreamId; - private static readonly object _streamIdLock = new(); +#if NET9_0_OR_GREATER + private static readonly Lock _streamIdLock = new Lock(); +#else + private static readonly object _streamIdLock = new object(); +#endif private static readonly ArrayPool _receiveBufferPool = ArrayPool.Shared; private readonly AsyncResetEvent _sendEvent; @@ -64,7 +68,11 @@ namespace CryptoExchange.Net.Sockets /// /// Received messages lock /// - protected readonly object _receivedMessagesLock; +#if NET9_0_OR_GREATER + private readonly Lock _receivedMessagesLock = new Lock(); +#else + private readonly object _receivedMessagesLock = new object(); +#endif /// /// Log @@ -152,7 +160,6 @@ namespace CryptoExchange.Net.Sockets _sendEvent = new AsyncResetEvent(); _sendBuffer = new ConcurrentQueue(); _ctsSource = new CancellationTokenSource(); - _receivedMessagesLock = new object(); _receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize; _closeSem = new SemaphoreSlim(1, 1); @@ -460,11 +467,11 @@ namespace CryptoExchange.Net.Sockets { if (_socket.State == WebSocketState.CloseReceived) { - await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); + await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); } else if (_socket.State == WebSocketState.Open) { - await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); + await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); var startWait = DateTime.UtcNow; while (_socket.State != WebSocketState.Closed && _socket.State != WebSocketState.Aborted) { @@ -482,7 +489,8 @@ namespace CryptoExchange.Net.Sockets // So socket might go to aborted state, might still be open } - _ctsSource.Cancel(); + if (!_disposed) + _ctsSource.Cancel(); } /// diff --git a/CryptoExchange.Net/Sockets/HighPerf/HighPerfPeriodicTaskRegistration.cs b/CryptoExchange.Net/Sockets/HighPerf/HighPerfPeriodicTaskRegistration.cs index f152374..b6618a0 100644 --- a/CryptoExchange.Net/Sockets/HighPerf/HighPerfPeriodicTaskRegistration.cs +++ b/CryptoExchange.Net/Sockets/HighPerf/HighPerfPeriodicTaskRegistration.cs @@ -1,5 +1,4 @@ -using CryptoExchange.Net.Objects; -using System; +using System; namespace CryptoExchange.Net.Sockets.HighPerf { @@ -17,7 +16,7 @@ namespace CryptoExchange.Net.Sockets.HighPerf /// public TimeSpan Interval { get; set; } /// - /// Delegate for getting the query + /// Delegate for getting the request which should be send /// public Func GetRequestDelegate { get; set; } = null!; } diff --git a/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs b/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs index fbd84e7..4f424a6 100644 --- a/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs +++ b/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs @@ -7,7 +7,6 @@ using Microsoft.Extensions.Logging; using CryptoExchange.Net.Objects; using System.Net.WebSockets; using CryptoExchange.Net.Objects.Sockets; -using System.Diagnostics; using CryptoExchange.Net.Clients; using CryptoExchange.Net.Logging.Extensions; using System.Threading; @@ -32,12 +31,12 @@ namespace CryptoExchange.Net.Sockets /// /// The amount of subscriptions on this connection /// - public int UserSubscriptionCount => _subscriptions.Length; + public int UserSubscriptionCount => Subscriptions.Length; /// /// Get a copy of the current message subscriptions /// - public HighPerfSubscription[] Subscriptions => _subscriptions; + public abstract HighPerfSubscription[] Subscriptions { get; } /// /// If connection is made @@ -86,16 +85,27 @@ namespace CryptoExchange.Net.Sockets } } - private readonly ILogger _logger; - private SocketStatus _status; + /// + /// Logger + /// + protected readonly ILogger _logger; + private readonly IMessageSerializer _serializer; - protected readonly JsonSerializerOptions _serializerOptions; - protected readonly Pipe _pipe; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); + private SocketStatus _status; private Task? _processTask; - private CancellationTokenSource _cts = new CancellationTokenSource(); - - protected abstract HighPerfSubscription[] _subscriptions { get; } + /// + /// Serializer options + /// + protected readonly JsonSerializerOptions _serializerOptions; + /// + /// The pipe the websocket will write to + /// + protected readonly Pipe _pipe; + /// + /// Update type + /// public abstract Type UpdateType { get; } /// @@ -119,10 +129,7 @@ namespace CryptoExchange.Net.Sockets public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag) { _logger = logger; - _pipe = new Pipe(new PipeOptions - { - //ReaderScheduler - }); + _pipe = new Pipe(); _serializerOptions = serializerOptions; ApiClient = apiClient; Tag = tag; @@ -139,6 +146,9 @@ namespace CryptoExchange.Net.Sockets _serializer = apiClient.CreateSerializer(); } + /// + /// Process message from the pipe + /// protected abstract Task ProcessAsync(CancellationToken ct); /// @@ -156,7 +166,7 @@ namespace CryptoExchange.Net.Sockets protected virtual async Task HandleCloseAsync() { Status = SocketStatus.Closed; - _cts.Cancel(); + _cts.CancelAfter(TimeSpan.FromSeconds(1)); // Cancel after 1 second to make sure we process pending messages from the pipe if (ApiClient.highPerfSocketConnections.ContainsKey(SocketId)) ApiClient.highPerfSocketConnections.TryRemove(SocketId, out _); @@ -193,12 +203,6 @@ namespace CryptoExchange.Net.Sockets return result; } - /// - /// Retrieve the underlying socket - /// - /// - public IHighPerfWebsocket GetSocket() => _socket; - /// /// Close the connection /// @@ -211,7 +215,7 @@ namespace CryptoExchange.Net.Sockets if (ApiClient.socketConnections.ContainsKey(SocketId)) ApiClient.socketConnections.TryRemove(SocketId, out _); - foreach (var subscription in _subscriptions) + foreach (var subscription in Subscriptions) { if (subscription.CancellationTokenRegistration.HasValue) subscription.CancellationTokenRegistration.Value.Dispose(); @@ -235,7 +239,7 @@ namespace CryptoExchange.Net.Sockets if (subscription.CancellationTokenRegistration.HasValue) subscription.CancellationTokenRegistration.Value.Dispose(); - var anyOtherSubscriptions = _subscriptions.Any(x => x != subscription); + var anyOtherSubscriptions = Subscriptions.Any(x => x != subscription); if (anyOtherSubscriptions) await UnsubscribeAsync(subscription).ConfigureAwait(false); @@ -413,11 +417,19 @@ namespace CryptoExchange.Net.Sockets } } + /// public class HighPerfSocketConnection : HighPerfSocketConnection { +#if NET9_0_OR_GREATER + private readonly Lock _listenersLock = new Lock(); +#else private readonly object _listenersLock = new object(); - private List> _typedSubscriptions; - protected override HighPerfSubscription[] _subscriptions +#endif + + private readonly List> _typedSubscriptions; + + /// + public override HighPerfSubscription[] Subscriptions { get { @@ -426,42 +438,65 @@ namespace CryptoExchange.Net.Sockets } } + /// public override Type UpdateType => typeof(T); + /// + /// ctor + /// public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag) : base(logger, socketFactory, parameters, apiClient, serializerOptions, tag) { _typedSubscriptions = new List>(); } /// - /// Add a subscription to this connection + /// Add a new subscription /// - /// public bool AddSubscription(HighPerfSubscription subscription) { if (Status != SocketStatus.None && Status != SocketStatus.Connected) return false; - //lock (_listenersLock) _typedSubscriptions.Add(subscription); - //_logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount); + _logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount); return true; } + /// + /// Remove a subscription + /// + /// public void RemoveSubscription(HighPerfSubscription subscription) { lock (_listenersLock) _typedSubscriptions.Remove(subscription); } + /// protected override async Task ProcessAsync(CancellationToken ct) { try { +#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code +#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable(_pipe.Reader, true, _serializerOptions, ct).ConfigureAwait(false)) +#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. +#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code { - var tasks = _typedSubscriptions.Select(sub => sub.HandleAsync(update!)); + var tasks = _typedSubscriptions.Select(sub => + { + try + { + return sub.HandleAsync(update!); + } + catch (Exception ex) + { + sub.InvokeExceptionHandler(ex); + _logger.UserMessageProcessingFailed(SocketId, ex.Message, ex); + return new ValueTask(); + } + }); await LibraryHelpers.WhenAll(tasks).ConfigureAwait(false); } } diff --git a/CryptoExchange.Net/Sockets/HighPerf/HighPerfSubscription.cs b/CryptoExchange.Net/Sockets/HighPerf/HighPerfSubscription.cs index 046dc66..895887d 100644 --- a/CryptoExchange.Net/Sockets/HighPerf/HighPerfSubscription.cs +++ b/CryptoExchange.Net/Sockets/HighPerf/HighPerfSubscription.cs @@ -1,5 +1,4 @@ -using Microsoft.Extensions.Logging; -using System; +using System; using System.Threading; using System.Threading.Tasks; @@ -20,11 +19,6 @@ namespace CryptoExchange.Net.Sockets /// public int TotalInvocations { get; set; } - /// - /// Logger - /// - protected readonly ILogger _logger; - /// /// Cancellation token registration /// @@ -48,9 +42,8 @@ namespace CryptoExchange.Net.Sockets /// /// ctor /// - public HighPerfSubscription(ILogger logger) + public HighPerfSubscription() { - _logger = logger; Id = ExchangeHelpers.NextId(); } @@ -94,7 +87,6 @@ namespace CryptoExchange.Net.Sockets { Exception?.Invoke(e); } - } /// @@ -105,13 +97,17 @@ namespace CryptoExchange.Net.Sockets /// /// ctor /// - protected HighPerfSubscription(ILogger logger, Func handler) : base(logger) + protected HighPerfSubscription(Func handler) : base() { _handler = handler; } + /// + /// Handle an update + /// public ValueTask HandleAsync(TUpdateType update) { + TotalInvocations++; return _handler.Invoke(update); } } diff --git a/CryptoExchange.Net/Sockets/HighPerf/HighPerfWebSocketClient.cs b/CryptoExchange.Net/Sockets/HighPerf/HighPerfWebSocketClient.cs index 9ee89ed..6d8050a 100644 --- a/CryptoExchange.Net/Sockets/HighPerf/HighPerfWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/HighPerf/HighPerfWebSocketClient.cs @@ -6,20 +6,16 @@ using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; using System; using System.Buffers; -using System.Collections.Generic; using System.IO.Pipelines; using System.Net; -using System.Net.Http; using System.Net.WebSockets; -using System.Text; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets { /// - /// A wrapper around the ClientWebSocket + /// A high performance websocket client implementation /// public class HighPerfWebSocketClient : IHighPerfWebsocket { @@ -43,8 +39,6 @@ namespace CryptoExchange.Net.Sockets private const int _defaultReceiveBufferSize = 4096; private const int _sendBufferSize = 4096; - private byte[] _commaBytes = new byte[] { 44 }; - /// /// Log /// @@ -90,12 +84,6 @@ namespace CryptoExchange.Net.Sockets _closeSem = new SemaphoreSlim(1, 1); } - /// - public void UpdateProxy(ApiProxy? proxy) - { - Parameters.Proxy = proxy; - } - /// public virtual async Task ConnectAsync(CancellationToken ct) { @@ -169,7 +157,7 @@ namespace CryptoExchange.Net.Sockets if (e is WebSocketException we) { #if (NET6_0_OR_GREATER) - if (_socket.HttpStatusCode == HttpStatusCode.TooManyRequests) + if (_socket!.HttpStatusCode == HttpStatusCode.TooManyRequests) { return new CallResult(new ServerRateLimitError(we.Message, we)); } @@ -296,19 +284,20 @@ namespace CryptoExchange.Net.Sockets { if (_socket!.State == WebSocketState.CloseReceived) { - await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); + await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); } else if (_socket.State == WebSocketState.Open) { - await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); + await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false); + var startWait = DateTime.UtcNow; - while (_socket.State != WebSocketState.Closed && _socket.State != WebSocketState.Aborted) + while (_processing && _socket.State != WebSocketState.Closed && _socket.State != WebSocketState.Aborted) { // Wait until we receive close confirmation await Task.Delay(10).ConfigureAwait(false); if (DateTime.UtcNow - startWait > TimeSpan.FromSeconds(1)) break; // Wait for max 1 second, then just abort the connection - } + } } } catch (Exception) @@ -318,7 +307,8 @@ namespace CryptoExchange.Net.Sockets // So socket might go to aborted state, might still be open } - _ctsSource.Cancel(); + if (!_disposed) + _ctsSource.Cancel(); } /// @@ -342,9 +332,9 @@ namespace CryptoExchange.Net.Sockets #if NETSTANDARD2_1 || NET8_0_OR_GREATER private async Task ReceiveLoopAsync() { + Exception? exitException = null; try { - Exception? exitException = null; while (true) { if (_ctsSource.IsCancellationRequested) @@ -356,7 +346,7 @@ namespace CryptoExchange.Net.Sockets { receiveResult = await _socket!.ReceiveAsync(_pipeWriter.GetMemory(_receiveBufferSize), _ctsSource.Token).ConfigureAwait(false); - // Advance the writer to communicate which part the memory was written + // Advance the writer to communicate which part of the memory was written _pipeWriter.Advance(receiveResult.Count); } catch (OperationCanceledException ex) @@ -371,7 +361,6 @@ namespace CryptoExchange.Net.Sockets if (_closeTask?.IsCompleted != false) _closeTask = CloseInternalAsync(); - // canceled exitException = ex; break; } @@ -388,42 +377,39 @@ namespace CryptoExchange.Net.Sockets break; } - if (receiveResult.MessageType == WebSocketMessageType.Close) - { - // Connection closed - if (_socket.State == WebSocketState.CloseReceived) - { - // Close received means it server initiated, we should send a confirmation and close the socket - //_logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty); - if (_closeTask?.IsCompleted != false) - _closeTask = CloseInternalAsync(); - } - else - { - // Means the socket is now closed and we were the one initiating it - //_logger.SocketReceivedCloseConfirmation(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty); - } - - break; - } - if (receiveResult.EndOfMessage) { - // Write a comma to split the json data for the reader - // This will also flush the written bytes + // Flush the full message var flushResult = await _pipeWriter.FlushAsync().ConfigureAwait(false); if (flushResult.IsCompleted) { - // Flush indicated that the reader is no longer listening + // Flush indicated that the reader is no longer listening, so we should stop writing if (_closeTask?.IsCompleted != false) _closeTask = CloseInternalAsync(); break; } } - } - await _pipeWriter.CompleteAsync(exitException).ConfigureAwait(false); + if (receiveResult.MessageType == WebSocketMessageType.Close) + { + // Connection closed + if (_socket.State == WebSocketState.CloseReceived) + { + // Close received means it's server initiated, we should send a confirmation and close the socket + _logger.SocketReceivedCloseMessage(Id, _socket.CloseStatus?.ToString() ?? string.Empty, _socket.CloseStatusDescription ?? string.Empty); + if (_closeTask?.IsCompleted != false) + _closeTask = CloseInternalAsync(); + } + else + { + // Means the socket is now closed and we were the one initiating it + _logger.SocketReceivedCloseConfirmation(Id, _socket.CloseStatus?.ToString() ?? string.Empty, _socket.CloseStatusDescription ?? string.Empty); + } + + break; + } + } } catch (Exception e) { @@ -432,32 +418,27 @@ namespace CryptoExchange.Net.Sockets // Make sure we at least let the owner know there was an error _logger.SocketReceiveLoopStoppedWithException(Id, e); - await _pipeWriter.CompleteAsync(e).ConfigureAwait(false); - + exitException = e; await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); if (_closeTask?.IsCompleted != false) _closeTask = CloseInternalAsync(); } finally { + await _pipeWriter.CompleteAsync(exitException).ConfigureAwait(false); _logger.SocketReceiveLoopFinished(Id); } } #else - /// - /// Loop for receiving and reassembling data - /// - /// private async Task ReceiveLoopAsync() { byte[] rentedBuffer = _receiveBufferPool.Rent(_receiveBufferSize); var buffer = new ArraySegment(rentedBuffer); + Exception? exitException = null; try { - Exception? exitException = null; - while (true) { if (_ctsSource.IsCancellationRequested) @@ -480,7 +461,6 @@ namespace CryptoExchange.Net.Sockets if (_closeTask?.IsCompleted != false) _closeTask = CloseInternalAsync(); - // canceled exitException = ex; break; } @@ -497,6 +477,9 @@ namespace CryptoExchange.Net.Sockets break; } + if (receiveResult.Count > 0) + await _pipeWriter.WriteAsync(buffer.AsMemory(0, receiveResult.Count)).ConfigureAwait(false); + if (receiveResult.MessageType == WebSocketMessageType.Close) { // Connection closed @@ -515,12 +498,8 @@ namespace CryptoExchange.Net.Sockets break; } - - - await _pipeWriter.WriteAsync(buffer.AsMemory(0, receiveResult.Count)).ConfigureAwait(false); } - await _pipeWriter.CompleteAsync(exitException).ConfigureAwait(false); } catch (Exception e) { @@ -529,13 +508,14 @@ namespace CryptoExchange.Net.Sockets // Make sure we at least let the owner know there was an error _logger.SocketReceiveLoopStoppedWithException(Id, e); - await _pipeWriter.CompleteAsync(e).ConfigureAwait(false); + exitException = e; await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); if (_closeTask?.IsCompleted != false) _closeTask = CloseInternalAsync(); } finally { + await _pipeWriter.CompleteAsync(exitException).ConfigureAwait(false); _receiveBufferPool.Return(rentedBuffer, true); _logger.SocketReceiveLoopFinished(Id); diff --git a/CryptoExchange.Net/Sockets/ISocketConnection.cs b/CryptoExchange.Net/Sockets/ISocketConnection.cs index 455974c..e4568f5 100644 --- a/CryptoExchange.Net/Sockets/ISocketConnection.cs +++ b/CryptoExchange.Net/Sockets/ISocketConnection.cs @@ -1,30 +1,57 @@ using CryptoExchange.Net.Clients; -using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using System; -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets { + /// + /// Socket connection + /// public interface ISocketConnection { + /// + /// The API client the connection belongs to + /// SocketApiClient ApiClient { get; set; } + /// + /// Whether the connection has been authenticated + /// bool Authenticated { get; set; } + /// + /// Whether the connection is established + /// bool Connected { get; } + /// + /// Connection URI + /// Uri ConnectionUri { get; } + /// + /// Id + /// int SocketId { get; } + /// + /// Tag + /// string Tag { get; set; } + /// + /// Closed event + /// event Action? ConnectionClosed; - + /// + /// Connect the websocket + /// Task ConnectAsync(CancellationToken ct); + /// + /// Close the connection + /// + /// Task CloseAsync(); + /// + /// Dispose + /// void Dispose(); - - //ValueTask SendStringAsync(int requestId, string data, int weight); - //ValueTask SendAsync(int requestId, T obj, int weight); - //ValueTask SendBytesAsync(int requestId, byte[] data, int weight); } } \ No newline at end of file diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index a402083..00ee53f 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -253,7 +253,11 @@ namespace CryptoExchange.Net.Sockets } private bool _pausedActivity; - private readonly object _listenersLock; +#if NET9_0_OR_GREATER + private readonly Lock _listenersLock = new Lock(); +#else + private readonly object _listenersLock = new object(); +#endif private readonly List _listeners; private readonly ILogger _logger; private SocketStatus _status; @@ -306,7 +310,6 @@ namespace CryptoExchange.Net.Sockets _socket.OnError += HandleErrorAsync; _socket.GetReconnectionUrl = GetReconnectionUrlAsync; - _listenersLock = new object(); _listeners = new List(); _serializer = apiClient.CreateSerializer(); diff --git a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs index 5df0dae..49eb2e5 100644 --- a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs +++ b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs @@ -39,7 +39,11 @@ namespace CryptoExchange.Net.Testing.Implementations public Func>? GetReconnectionUrl { get; set; } public static int lastId = 0; - public static object lastIdLock = new object(); +#if NET9_0_OR_GREATER + public static readonly Lock lastIdLock = new Lock(); +#else + public static readonly object lastIdLock = new object(); +#endif public TestSocket(string address) { diff --git a/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs b/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs index cfc4c33..13b2ac0 100644 --- a/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs +++ b/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Trackers.Klines @@ -31,7 +32,11 @@ namespace CryptoExchange.Net.Trackers.Klines /// /// Lock for accessing _data /// - protected readonly object _lock = new object(); +#if NET9_0_OR_GREATER + private readonly Lock _lock = new Lock(); +#else + private readonly object _lock = new object(); +#endif /// /// The last time the window was applied /// diff --git a/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs b/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs index 460f779..6a39aa1 100644 --- a/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs +++ b/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Trackers.Trades @@ -42,7 +43,11 @@ namespace CryptoExchange.Net.Trackers.Trades /// /// Lock for accessing _data /// - protected readonly object _lock = new object(); +#if NET9_0_OR_GREATER + private readonly Lock _lock = new Lock(); +#else + private readonly object _lock = new object(); +#endif /// /// Whether the snapshot has been set ///