From d41ca3459e919e15f0233d6c22a231d056e5ed31 Mon Sep 17 00:00:00 2001 From: Jkorf Date: Tue, 24 Feb 2026 11:32:35 +0100 Subject: [PATCH] Updated internal lock for subscription to ReaderWriterLockSlim on SocketConnection --- .../Sockets/Default/SocketConnection.cs | 202 +++++++++++++++--- 1 file changed, 170 insertions(+), 32 deletions(-) diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index 829724e..6a50bd9 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -123,8 +123,15 @@ namespace CryptoExchange.Net.Sockets.Default { get { - lock (_listenersLock) + try + { + _listenersLock.EnterReadLock(); return _listeners.OfType().Count(h => h.UserSubscription); + } + finally + { + _listenersLock.ExitReadLock(); + } } } @@ -135,8 +142,16 @@ namespace CryptoExchange.Net.Sockets.Default { get { - lock (_listenersLock) + try + { + _listenersLock.EnterReadLock(); return _listeners.OfType().Where(h => h.UserSubscription).ToArray(); + + } + finally + { + _listenersLock.ExitReadLock(); + } } } @@ -240,8 +255,15 @@ namespace CryptoExchange.Net.Sockets.Default { get { - lock (_listenersLock) + try + { + _listenersLock.EnterReadLock(); return _listeners.OfType().Select(x => x.Topic).Where(t => t != null).ToArray()!; + } + finally + { + _listenersLock.ExitReadLock(); + } } } @@ -252,18 +274,21 @@ namespace CryptoExchange.Net.Sockets.Default { get { - lock (_listenersLock) + try + { + _listenersLock.EnterReadLock(); return _listeners.OfType().Where(x => !x.Completed).Count(); + } + finally + { + _listenersLock.ExitReadLock(); + } } } private bool _pausedActivity; -#if NET9_0_OR_GREATER - private readonly Lock _listenersLock = new Lock(); -#else - private readonly object _listenersLock = new object(); -#endif + private readonly ReaderWriterLockSlim _listenersLock = new ReaderWriterLockSlim(); private readonly List _listeners; private readonly ILogger _logger; private SocketStatus _status; @@ -340,8 +365,9 @@ namespace CryptoExchange.Net.Sockets.Default if (ApiClient._socketConnections.ContainsKey(SocketId)) ApiClient._socketConnections.TryRemove(SocketId, out _); - lock (_listenersLock) + try { + _listenersLock.EnterWriteLock(); foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription && !l.IsClosingConnection)) { subscription.IsClosingConnection = true; @@ -354,6 +380,10 @@ namespace CryptoExchange.Net.Sockets.Default _listeners.Remove(query); } } + finally + { + _listenersLock.ExitWriteLock(); + } _ = Task.Run(() => ConnectionClosed?.Invoke()); return Task.CompletedTask; @@ -369,8 +399,9 @@ namespace CryptoExchange.Net.Sockets.Default Authenticated = false; _lastSequenceNumber = 0; - lock (_listenersLock) + try { + _listenersLock.EnterWriteLock(); foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription)) subscription.Reset(); @@ -380,6 +411,10 @@ namespace CryptoExchange.Net.Sockets.Default _listeners.Remove(query); } } + finally + { + _listenersLock.ExitWriteLock(); + } _ = Task.Run(() => ConnectionLost?.Invoke()); return Task.CompletedTask; @@ -401,14 +436,19 @@ namespace CryptoExchange.Net.Sockets.Default { Status = SocketStatus.Resubscribing; - lock (_listenersLock) + try { + _listenersLock.EnterWriteLock(); foreach (var query in _listeners.OfType().ToList()) { query.Fail(new WebError("Connection interrupted")); _listeners.Remove(query); } } + finally + { + _listenersLock.ExitWriteLock(); + } // Can't wait for this as it would cause a deadlock _ = Task.Run(async () => @@ -464,10 +504,15 @@ namespace CryptoExchange.Net.Sockets.Default protected virtual Task HandleRequestRateLimitedAsync(int requestId) { Query? query; - lock (_listenersLock) + try { + _listenersLock.EnterReadLock(); query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId); } + finally + { + _listenersLock.ExitReadLock(); + } if (query == null) return Task.CompletedTask; @@ -493,10 +538,15 @@ namespace CryptoExchange.Net.Sockets.Default protected virtual Task HandleRequestSentAsync(int requestId) { Query? query; - lock (_listenersLock) + try { + _listenersLock.EnterReadLock(); query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId); } + finally + { + _listenersLock.ExitReadLock(); + } if (query == null) return Task.CompletedTask; @@ -543,8 +593,9 @@ namespace CryptoExchange.Net.Sockets.Default } Type? deserializationType = null; - lock (_listenersLock) + try { + _listenersLock.EnterReadLock(); foreach (var subscription in _listeners) { foreach (var route in subscription.MessageRouter.Routes) @@ -560,6 +611,10 @@ namespace CryptoExchange.Net.Sockets.Default break; } } + finally + { + _listenersLock.ExitReadLock(); + } if (deserializationType == null) { @@ -605,8 +660,9 @@ namespace CryptoExchange.Net.Sockets.Default var topicFilter = messageConverter.GetTopicFilter(result); bool processed = false; - lock (_listenersLock) + try { + _listenersLock.EnterReadLock(); var currentCount = _listeners.Count; for(var i = 0; i < _listeners.Count; i++) { @@ -670,6 +726,10 @@ namespace CryptoExchange.Net.Sockets.Default break; } } + finally + { + _listenersLock.ExitReadLock(); + } if (!processed) { @@ -727,14 +787,19 @@ namespace CryptoExchange.Net.Sockets.Default if (ApiClient._socketConnections.ContainsKey(SocketId)) ApiClient._socketConnections.TryRemove(SocketId, out _); - lock (_listenersLock) + try { + _listenersLock.EnterReadLock(); foreach (var subscription in _listeners.OfType()) { if (subscription.CancellationTokenRegistration.HasValue) subscription.CancellationTokenRegistration.Value.Dispose(); } } + finally + { + _listenersLock.ExitReadLock(); + } await _socket.CloseAsync().ConfigureAwait(false); _socket.Dispose(); @@ -764,18 +829,30 @@ namespace CryptoExchange.Net.Sockets.Default subscription.CancellationTokenRegistration.Value.Dispose(); bool anyDuplicateSubscription; - lock (_listenersLock) - anyDuplicateSubscription = _listeners.OfType().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l))); - bool shouldCloseConnection; - lock (_listenersLock) + try + { + _listenersLock.EnterReadLock(); + anyDuplicateSubscription = _listeners.OfType().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l))); shouldCloseConnection = _listeners.OfType().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection; + } + finally + { + _listenersLock.ExitReadLock(); + } if (!anyDuplicateSubscription) { bool needUnsub; - lock (_listenersLock) + try + { + _listenersLock.EnterReadLock(); needUnsub = _listeners.Contains(subscription) && !shouldCloseConnection; + } + finally + { + _listenersLock.ExitReadLock(); + } if (needUnsub && _socket.IsOpen) await UnsubscribeAsync(subscription).ConfigureAwait(false); @@ -800,8 +877,15 @@ namespace CryptoExchange.Net.Sockets.Default await CloseAsync().ConfigureAwait(false); } - lock (_listenersLock) + try + { + _listenersLock.EnterWriteLock(); _listeners.Remove(subscription); + } + finally + { + _listenersLock.ExitWriteLock(); + } subscription.Status = SubscriptionStatus.Closed; } @@ -824,9 +908,15 @@ namespace CryptoExchange.Net.Sockets.Default { if (Status != SocketStatus.None && Status != SocketStatus.Connected) return false; - - lock (_listenersLock) + try + { + _listenersLock.EnterWriteLock(); _listeners.Add(subscription); + } + finally + { + _listenersLock.ExitWriteLock(); + } if (subscription.UserSubscription) _logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount); @@ -839,8 +929,15 @@ namespace CryptoExchange.Net.Sockets.Default /// public Subscription? GetSubscription(int id) { - lock (_listenersLock) + try + { + _listenersLock.EnterReadLock(); return _listeners.OfType().SingleOrDefault(s => s.Id == id); + } + finally + { + _listenersLock.ExitReadLock(); + } } /// @@ -888,15 +985,29 @@ namespace CryptoExchange.Net.Sockets.Default private async Task SendAndWaitIntAsync(Query query, CancellationToken ct = default) { - lock (_listenersLock) + try + { + _listenersLock.EnterWriteLock(); _listeners.Add(query); + } + finally + { + _listenersLock.ExitWriteLock(); + } var sendResult = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false); if (!sendResult) { query.Fail(sendResult.Error!); - lock (_listenersLock) + try + { + _listenersLock.EnterWriteLock(); _listeners.Remove(query); + } + finally + { + _listenersLock.ExitWriteLock(); + } return; } @@ -927,8 +1038,15 @@ namespace CryptoExchange.Net.Sockets.Default } finally { - lock (_listenersLock) + try + { + _listenersLock.EnterWriteLock(); _listeners.Remove(query); + } + finally + { + _listenersLock.ExitWriteLock(); + } } } @@ -1035,8 +1153,16 @@ namespace CryptoExchange.Net.Sockets.Default if (!DedicatedRequestConnection.IsDedicatedRequestConnection) { bool anySubscriptions; - lock (_listenersLock) + try + { + _listenersLock.EnterReadLock(); anySubscriptions = _listeners.OfType().Any(s => s.UserSubscription); + } + finally + { + _listenersLock.ExitReadLock(); + } + if (!anySubscriptions) { // No need to resubscribe anything @@ -1047,11 +1173,16 @@ namespace CryptoExchange.Net.Sockets.Default } bool anyAuthenticated; - lock (_listenersLock) + try { + _listenersLock.EnterReadLock(); anyAuthenticated = _listeners.OfType().Any(s => s.Authenticated) || DedicatedRequestConnection.IsDedicatedRequestConnection && DedicatedRequestConnection.Authenticated; } + finally + { + _listenersLock.ExitReadLock(); + } if (anyAuthenticated) { @@ -1076,8 +1207,15 @@ namespace CryptoExchange.Net.Sockets.Default return new CallResult(new WebError("Socket not connected")); List subList; - lock (_listenersLock) + try + { + _listenersLock.EnterReadLock(); subList = _listeners.OfType().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList(); + } + finally + { + _listenersLock.ExitReadLock(); + } if (subList.Count == 0) break;