diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index 3930194..1abe24a 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -8,7 +8,9 @@ using CryptoExchange.Net.Sockets.Default.Interfaces; using CryptoExchange.Net.Sockets.Interfaces; using Microsoft.Extensions.Logging; using System; +using System.Collections; using System.Collections.Generic; +using System.Collections.ObjectModel; using System.Diagnostics; using System.Linq; using System.Net.WebSockets; @@ -123,15 +125,7 @@ namespace CryptoExchange.Net.Sockets.Default { get { - _listenersLock.EnterReadLock(); - try - { - return _listeners.OfType().Count(h => h.UserSubscription); - } - finally - { - _listenersLock.ExitReadLock(); - } + return _listeners.OfType().Count(h => h.UserSubscription); } } @@ -142,16 +136,7 @@ namespace CryptoExchange.Net.Sockets.Default { get { - _listenersLock.EnterReadLock(); - try - { - return _listeners.OfType().Where(h => h.UserSubscription).ToArray(); - - } - finally - { - _listenersLock.ExitReadLock(); - } + return _listeners.OfType().Where(h => h.UserSubscription).ToArray(); } } @@ -255,15 +240,7 @@ namespace CryptoExchange.Net.Sockets.Default { get { - _listenersLock.EnterReadLock(); - try - { - return _listeners.OfType().Select(x => x.Topic).Where(t => t != null).ToArray()!; - } - finally - { - _listenersLock.ExitReadLock(); - } + return _listeners.OfType().Select(x => x.Topic).Where(t => t != null).ToArray()!; } } @@ -274,22 +251,18 @@ namespace CryptoExchange.Net.Sockets.Default { get { - _listenersLock.EnterReadLock(); - try - { - return _listeners.OfType().Where(x => !x.Completed).Count(); - } - finally - { - _listenersLock.ExitReadLock(); - } + return _listeners.OfType().Where(x => !x.Completed).Count(); } } private bool _pausedActivity; - private readonly ReaderWriterLockSlim _listenersLock = new ReaderWriterLockSlim(); - private readonly List _listeners; +#if NET9_0_OR_GREATER + private readonly Lock _listenersLock = new Lock(); +#else + private readonly object _listenersLock = new object(); +#endif + private ReadOnlyCollection _listeners; private readonly ILogger _logger; private SocketStatus _status; @@ -338,7 +311,7 @@ namespace CryptoExchange.Net.Sockets.Default _socket.OnError += HandleErrorAsync; _socket.GetReconnectionUrl = GetReconnectionUrlAsync; - _listeners = new List(); + _listeners = new ReadOnlyCollection([]); _serializer = apiClient.CreateSerializer(); } @@ -365,25 +338,17 @@ namespace CryptoExchange.Net.Sockets.Default if (ApiClient._socketConnections.ContainsKey(SocketId)) ApiClient._socketConnections.TryRemove(SocketId, out _); - _listenersLock.EnterWriteLock(); - try + foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription && !l.IsClosingConnection)) { - foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription && !l.IsClosingConnection)) - { - subscription.IsClosingConnection = true; - subscription.Reset(); - } + subscription.IsClosingConnection = true; + subscription.Reset(); + } - foreach (var query in _listeners.OfType().ToList()) - { - query.Fail(new WebError("Connection interrupted")); - _listeners.Remove(query); - } - } - finally - { - _listenersLock.ExitWriteLock(); - } + var queryList = _listeners.OfType().ToList(); + foreach (var query in queryList) + query.Fail(new WebError("Connection interrupted")); + + RemoveMessageProcessors(queryList); _ = Task.Run(() => ConnectionClosed?.Invoke()); return Task.CompletedTask; @@ -399,22 +364,14 @@ namespace CryptoExchange.Net.Sockets.Default Authenticated = false; _lastSequenceNumber = 0; - _listenersLock.EnterWriteLock(); - try - { - foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription)) - subscription.Reset(); + foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription)) + subscription.Reset(); - foreach (var query in _listeners.OfType().ToList()) - { - query.Fail(new WebError("Connection interrupted")); - _listeners.Remove(query); - } - } - finally - { - _listenersLock.ExitWriteLock(); - } + var queryList = _listeners.OfType().ToList(); + foreach (var query in queryList) + query.Fail(new WebError("Connection interrupted")); + + RemoveMessageProcessors(queryList); _ = Task.Run(() => ConnectionLost?.Invoke()); return Task.CompletedTask; @@ -436,19 +393,11 @@ namespace CryptoExchange.Net.Sockets.Default { Status = SocketStatus.Resubscribing; - _listenersLock.EnterWriteLock(); - try - { - foreach (var query in _listeners.OfType().ToList()) - { - query.Fail(new WebError("Connection interrupted")); - _listeners.Remove(query); - } - } - finally - { - _listenersLock.ExitWriteLock(); - } + var queryList = _listeners.OfType().ToList(); + foreach (var query in queryList) + query.Fail(new WebError("Connection interrupted")); + + RemoveMessageProcessors(queryList); // Can't wait for this as it would cause a deadlock _ = Task.Run(async () => @@ -503,17 +452,7 @@ namespace CryptoExchange.Net.Sockets.Default /// protected virtual Task HandleRequestRateLimitedAsync(int requestId) { - Query? query; - _listenersLock.EnterReadLock(); - try - { - query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId); - } - finally - { - _listenersLock.ExitReadLock(); - } - + var query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId); if (query == null) return Task.CompletedTask; @@ -537,17 +476,7 @@ namespace CryptoExchange.Net.Sockets.Default /// Id of the request sent protected virtual Task HandleRequestSentAsync(int requestId) { - Query? query; - _listenersLock.EnterReadLock(); - try - { - query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId); - } - finally - { - _listenersLock.ExitReadLock(); - } - + var query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId); if (query == null) return Task.CompletedTask; @@ -593,27 +522,19 @@ namespace CryptoExchange.Net.Sockets.Default } Type? deserializationType = null; - _listenersLock.EnterReadLock(); - try + foreach (var subscription in _listeners) { - foreach (var subscription in _listeners) + foreach (var route in subscription.MessageRouter.Routes) { - foreach (var route in subscription.MessageRouter.Routes) - { - if (!route.TypeIdentifier.Equals(typeIdentifier, StringComparison.Ordinal)) - continue; + if (!route.TypeIdentifier.Equals(typeIdentifier, StringComparison.Ordinal)) + continue; - deserializationType = route.DeserializationType; - break; - } - - if (deserializationType != null) - break; + deserializationType = route.DeserializationType; + break; } - } - finally - { - _listenersLock.ExitReadLock(); + + if (deserializationType != null) + break; } if (deserializationType == null) @@ -660,94 +581,69 @@ namespace CryptoExchange.Net.Sockets.Default var topicFilter = messageConverter.GetTopicFilter(result); bool processed = false; - _listenersLock.EnterReadLock(); - try + foreach (var processor in _listeners) { - var currentCount = _listeners.Count; - for(var i = 0; i < _listeners.Count; i++) + bool isQuery = false; + Query? query = null; + if (processor is Query cquery) { - if (_listeners.Count != currentCount) - { - // Possible a query added or removed. If added it's not a problem, if removed it is - if (_listeners.Count < currentCount) - throw new Exception("Listeners list adjusted, can't continue processing"); - } - - var processor = _listeners[i]; - bool isQuery = false; - Query? query = null; - if (processor is Query cquery) - { - isQuery = true; - query = cquery; - } - - var complete = false; - - foreach (var route in processor.MessageRouter.Routes) - { - if (route.TypeIdentifier != typeIdentifier) - continue; - - // Forward message rules: - // | Message Topic | Route Topic Filter | Topics Match | Forward | Description - // | N | N | - | Y | No topic filter applied - // | N | Y | - | N | Route only listens to specific topic - // | Y | N | - | Y | Route listens to all message regardless of topic - // | Y | Y | Y | Y | Route listens to specific message topic - // | Y | Y | N | N | Route listens to different topic - if (topicFilter == null) - { - if (route.TopicFilter != null) - // No topic on message, but route is filtering on topic - continue; - } - else - { - if (route.TopicFilter != null && !route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal)) - // Message has a topic, and the route has a filter for another topic - continue; - } - - processed = true; - - if (isQuery && query!.Completed) - continue; - - processor.Handle(this, receiveTime, originalData, result, route); - if (isQuery && !route.MultipleReaders) - { - complete = true; - break; - } - } - - if (complete) - break; + isQuery = true; + query = cquery; } - } - finally - { - _listenersLock.ExitReadLock(); + + var complete = false; + + foreach (var route in processor.MessageRouter.Routes) + { + if (route.TypeIdentifier != typeIdentifier) + continue; + + // Forward message rules: + // | Message Topic | Route Topic Filter | Topics Match | Forward | Description + // | N | N | - | Y | No topic filter applied + // | N | Y | - | N | Route only listens to specific topic + // | Y | N | - | Y | Route listens to all message regardless of topic + // | Y | Y | Y | Y | Route listens to specific message topic + // | Y | Y | N | N | Route listens to different topic + if (topicFilter == null) + { + if (route.TopicFilter != null) + // No topic on message, but route is filtering on topic + continue; + } + else + { + if (route.TopicFilter != null && !route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal)) + // Message has a topic, and the route has a filter for another topic + continue; + } + + processed = true; + + if (isQuery && query!.Completed) + continue; + + processor.Handle(this, receiveTime, originalData, result, route); + if (isQuery && !route.MultipleReaders) + { + complete = true; + break; + } + } + + if (complete) + break; } if (!processed) { if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data)) { - _listenersLock.EnterReadLock(); - try - { - _logger.ReceivedMessageNotMatchedToAnyListener( - SocketId, - typeIdentifier, - topicFilter!, - string.Join(",", _listeners.Select(x => string.Join(",", x.MessageRouter.Routes.Where(x => x.TypeIdentifier == typeIdentifier).Select(x => x.TopicFilter != null ? string.Join(",", x.TopicFilter) : "[null]"))))); - } - finally - { - _listenersLock.ExitReadLock(); - } + _logger.ReceivedMessageNotMatchedToAnyListener( + SocketId, + typeIdentifier, + topicFilter!, + string.Join(",", _listeners.Select(x => string.Join(",", x.MessageRouter.Routes.Where(x => x.TypeIdentifier == typeIdentifier).Select(x => x.TopicFilter != null ? string.Join(",", x.TopicFilter) : "[null]"))))); } } } @@ -792,18 +688,10 @@ namespace CryptoExchange.Net.Sockets.Default if (ApiClient._socketConnections.ContainsKey(SocketId)) ApiClient._socketConnections.TryRemove(SocketId, out _); - _listenersLock.EnterReadLock(); - try + foreach (var subscription in _listeners.OfType()) { - foreach (var subscription in _listeners.OfType()) - { - if (subscription.CancellationTokenRegistration.HasValue) - subscription.CancellationTokenRegistration.Value.Dispose(); - } - } - finally - { - _listenersLock.ExitReadLock(); + if (subscription.CancellationTokenRegistration.HasValue) + subscription.CancellationTokenRegistration.Value.Dispose(); } await _socket.CloseAsync().ConfigureAwait(false); @@ -833,32 +721,12 @@ namespace CryptoExchange.Net.Sockets.Default if (subscription.CancellationTokenRegistration.HasValue) subscription.CancellationTokenRegistration.Value.Dispose(); - bool anyDuplicateSubscription; - bool shouldCloseConnection; - _listenersLock.EnterReadLock(); - try - { - anyDuplicateSubscription = _listeners.OfType().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l))); - shouldCloseConnection = _listeners.OfType().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection; - } - finally - { - _listenersLock.ExitReadLock(); - } + bool anyDuplicateSubscription = _listeners.OfType().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l))); + bool shouldCloseConnection = _listeners.OfType().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection; if (!anyDuplicateSubscription) { - bool needUnsub; - _listenersLock.EnterReadLock(); - try - { - needUnsub = _listeners.Contains(subscription) && !shouldCloseConnection; - } - finally - { - _listenersLock.ExitReadLock(); - } - + var needUnsub = _listeners.Contains(subscription) && !shouldCloseConnection; if (needUnsub && _socket.IsOpen) await UnsubscribeAsync(subscription).ConfigureAwait(false); } @@ -882,15 +750,7 @@ namespace CryptoExchange.Net.Sockets.Default await CloseAsync().ConfigureAwait(false); } - _listenersLock.EnterWriteLock(); - try - { - _listeners.Remove(subscription); - } - finally - { - _listenersLock.ExitWriteLock(); - } + RemoveMessageProcessor(subscription); subscription.Status = SubscriptionStatus.Closed; } @@ -914,15 +774,7 @@ namespace CryptoExchange.Net.Sockets.Default if (Status != SocketStatus.None && Status != SocketStatus.Connected) return false; - _listenersLock.EnterWriteLock(); - try - { - _listeners.Add(subscription); - } - finally - { - _listenersLock.ExitWriteLock(); - } + AddMessageProcessor(subscription); if (subscription.UserSubscription) _logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount); @@ -935,15 +787,7 @@ namespace CryptoExchange.Net.Sockets.Default /// public Subscription? GetSubscription(int id) { - _listenersLock.EnterReadLock(); - try - { - return _listeners.OfType().SingleOrDefault(s => s.Id == id); - } - finally - { - _listenersLock.ExitReadLock(); - } + return _listeners.OfType().SingleOrDefault(s => s.Id == id); } /// @@ -991,29 +835,12 @@ namespace CryptoExchange.Net.Sockets.Default private async Task SendAndWaitIntAsync(Query query, CancellationToken ct = default) { - _listenersLock.EnterWriteLock(); - try - { - _listeners.Add(query); - } - finally - { - _listenersLock.ExitWriteLock(); - } - + AddMessageProcessor(query); var sendResult = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false); if (!sendResult) { query.Fail(sendResult.Error!); - _listenersLock.EnterWriteLock(); - try - { - _listeners.Remove(query); - } - finally - { - _listenersLock.ExitWriteLock(); - } + RemoveMessageProcessor(query); return; } @@ -1044,15 +871,7 @@ namespace CryptoExchange.Net.Sockets.Default } finally { - _listenersLock.EnterWriteLock(); - try - { - _listeners.Remove(query); - } - finally - { - _listenersLock.ExitWriteLock(); - } + RemoveMessageProcessor(query); } } @@ -1158,17 +977,7 @@ namespace CryptoExchange.Net.Sockets.Default if (!DedicatedRequestConnection.IsDedicatedRequestConnection) { - bool anySubscriptions; - _listenersLock.EnterReadLock(); - try - { - anySubscriptions = _listeners.OfType().Any(s => s.UserSubscription); - } - finally - { - _listenersLock.ExitReadLock(); - } - + var anySubscriptions = _listeners.OfType().Any(s => s.UserSubscription); if (!anySubscriptions) { // No need to resubscribe anything @@ -1178,18 +987,8 @@ namespace CryptoExchange.Net.Sockets.Default } } - bool anyAuthenticated; - _listenersLock.EnterReadLock(); - try - { - anyAuthenticated = _listeners.OfType().Any(s => s.Authenticated) + bool anyAuthenticated = _listeners.OfType().Any(s => s.Authenticated) || DedicatedRequestConnection.IsDedicatedRequestConnection && DedicatedRequestConnection.Authenticated; - } - finally - { - _listenersLock.ExitReadLock(); - } - if (anyAuthenticated) { // If we reconnected a authenticated connection we need to re-authenticate @@ -1212,17 +1011,7 @@ namespace CryptoExchange.Net.Sockets.Default if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); - List subList; - _listenersLock.EnterReadLock(); - try - { - subList = _listeners.OfType().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList(); - } - finally - { - _listenersLock.ExitReadLock(); - } - + var subList = _listeners.OfType().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList(); if (subList.Count == 0) break; @@ -1404,6 +1193,37 @@ namespace CryptoExchange.Net.Sockets.Default }); } + private void AddMessageProcessor(IMessageProcessor processor) + { + lock (_listenersLock) + { + var updatedList = new List(_listeners); + updatedList.Add(processor); + _listeners = updatedList.AsReadOnly(); + } + } + + private void RemoveMessageProcessor(IMessageProcessor processor) + { + lock (_listenersLock) + { + var updatedList = new List(_listeners); + updatedList.Remove(processor); + _listeners = updatedList.AsReadOnly(); + } + } + + private void RemoveMessageProcessors(IEnumerable processors) + { + lock (_listenersLock) + { + var updatedList = new List(_listeners); + foreach (var processor in processors) + updatedList.Remove(processor); + _listeners = updatedList.AsReadOnly(); + } + } + } }