1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-04-07 02:01:12 +00:00

Updated lock call SocketConnection to outside the try

This commit is contained in:
Jkorf 2026-02-24 13:14:48 +01:00
parent 73fcb47b17
commit 02b70398b3

View File

@ -123,9 +123,9 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
get get
{ {
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
return _listeners.OfType<Subscription>().Count(h => h.UserSubscription); return _listeners.OfType<Subscription>().Count(h => h.UserSubscription);
} }
finally finally
@ -142,9 +142,9 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
get get
{ {
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
return _listeners.OfType<Subscription>().Where(h => h.UserSubscription).ToArray(); return _listeners.OfType<Subscription>().Where(h => h.UserSubscription).ToArray();
} }
@ -255,9 +255,9 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
get get
{ {
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
return _listeners.OfType<Subscription>().Select(x => x.Topic).Where(t => t != null).ToArray()!; return _listeners.OfType<Subscription>().Select(x => x.Topic).Where(t => t != null).ToArray()!;
} }
finally finally
@ -274,9 +274,9 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
get get
{ {
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
return _listeners.OfType<Query>().Where(x => !x.Completed).Count(); return _listeners.OfType<Query>().Where(x => !x.Completed).Count();
} }
finally finally
@ -365,9 +365,9 @@ namespace CryptoExchange.Net.Sockets.Default
if (ApiClient._socketConnections.ContainsKey(SocketId)) if (ApiClient._socketConnections.ContainsKey(SocketId))
ApiClient._socketConnections.TryRemove(SocketId, out _); ApiClient._socketConnections.TryRemove(SocketId, out _);
_listenersLock.EnterWriteLock();
try try
{ {
_listenersLock.EnterWriteLock();
foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription && !l.IsClosingConnection)) foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription && !l.IsClosingConnection))
{ {
subscription.IsClosingConnection = true; subscription.IsClosingConnection = true;
@ -399,9 +399,9 @@ namespace CryptoExchange.Net.Sockets.Default
Authenticated = false; Authenticated = false;
_lastSequenceNumber = 0; _lastSequenceNumber = 0;
_listenersLock.EnterWriteLock();
try try
{ {
_listenersLock.EnterWriteLock();
foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription)) foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription))
subscription.Reset(); subscription.Reset();
@ -436,9 +436,9 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
Status = SocketStatus.Resubscribing; Status = SocketStatus.Resubscribing;
_listenersLock.EnterWriteLock();
try try
{ {
_listenersLock.EnterWriteLock();
foreach (var query in _listeners.OfType<Query>().ToList()) foreach (var query in _listeners.OfType<Query>().ToList())
{ {
query.Fail(new WebError("Connection interrupted")); query.Fail(new WebError("Connection interrupted"));
@ -504,9 +504,9 @@ namespace CryptoExchange.Net.Sockets.Default
protected virtual Task HandleRequestRateLimitedAsync(int requestId) protected virtual Task HandleRequestRateLimitedAsync(int requestId)
{ {
Query? query; Query? query;
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId); query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId);
} }
finally finally
@ -538,9 +538,9 @@ namespace CryptoExchange.Net.Sockets.Default
protected virtual Task HandleRequestSentAsync(int requestId) protected virtual Task HandleRequestSentAsync(int requestId)
{ {
Query? query; Query? query;
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId); query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId);
} }
finally finally
@ -593,9 +593,9 @@ namespace CryptoExchange.Net.Sockets.Default
} }
Type? deserializationType = null; Type? deserializationType = null;
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
foreach (var subscription in _listeners) foreach (var subscription in _listeners)
{ {
foreach (var route in subscription.MessageRouter.Routes) foreach (var route in subscription.MessageRouter.Routes)
@ -660,9 +660,9 @@ namespace CryptoExchange.Net.Sockets.Default
var topicFilter = messageConverter.GetTopicFilter(result); var topicFilter = messageConverter.GetTopicFilter(result);
bool processed = false; bool processed = false;
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
var currentCount = _listeners.Count; var currentCount = _listeners.Count;
for(var i = 0; i < _listeners.Count; i++) for(var i = 0; i < _listeners.Count; i++)
{ {
@ -735,7 +735,8 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data)) if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data))
{ {
lock (_listenersLock) _listenersLock.EnterReadLock();
try
{ {
_logger.ReceivedMessageNotMatchedToAnyListener( _logger.ReceivedMessageNotMatchedToAnyListener(
SocketId, SocketId,
@ -743,6 +744,10 @@ namespace CryptoExchange.Net.Sockets.Default
topicFilter!, topicFilter!,
string.Join(",", _listeners.Select(x => string.Join(",", x.MessageRouter.Routes.Where(x => x.TypeIdentifier == typeIdentifier).Select(x => x.TopicFilter != null ? string.Join(",", x.TopicFilter) : "[null]"))))); string.Join(",", _listeners.Select(x => string.Join(",", x.MessageRouter.Routes.Where(x => x.TypeIdentifier == typeIdentifier).Select(x => x.TopicFilter != null ? string.Join(",", x.TopicFilter) : "[null]")))));
} }
finally
{
_listenersLock.ExitReadLock();
}
} }
} }
} }
@ -787,9 +792,9 @@ namespace CryptoExchange.Net.Sockets.Default
if (ApiClient._socketConnections.ContainsKey(SocketId)) if (ApiClient._socketConnections.ContainsKey(SocketId))
ApiClient._socketConnections.TryRemove(SocketId, out _); ApiClient._socketConnections.TryRemove(SocketId, out _);
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
foreach (var subscription in _listeners.OfType<Subscription>()) foreach (var subscription in _listeners.OfType<Subscription>())
{ {
if (subscription.CancellationTokenRegistration.HasValue) if (subscription.CancellationTokenRegistration.HasValue)
@ -830,9 +835,9 @@ namespace CryptoExchange.Net.Sockets.Default
bool anyDuplicateSubscription; bool anyDuplicateSubscription;
bool shouldCloseConnection; bool shouldCloseConnection;
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l))); anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l)));
shouldCloseConnection = _listeners.OfType<Subscription>().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection; shouldCloseConnection = _listeners.OfType<Subscription>().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection;
} }
@ -844,9 +849,9 @@ namespace CryptoExchange.Net.Sockets.Default
if (!anyDuplicateSubscription) if (!anyDuplicateSubscription)
{ {
bool needUnsub; bool needUnsub;
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
needUnsub = _listeners.Contains(subscription) && !shouldCloseConnection; needUnsub = _listeners.Contains(subscription) && !shouldCloseConnection;
} }
finally finally
@ -877,9 +882,9 @@ namespace CryptoExchange.Net.Sockets.Default
await CloseAsync().ConfigureAwait(false); await CloseAsync().ConfigureAwait(false);
} }
_listenersLock.EnterWriteLock();
try try
{ {
_listenersLock.EnterWriteLock();
_listeners.Remove(subscription); _listeners.Remove(subscription);
} }
finally finally
@ -908,9 +913,10 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
if (Status != SocketStatus.None && Status != SocketStatus.Connected) if (Status != SocketStatus.None && Status != SocketStatus.Connected)
return false; return false;
_listenersLock.EnterWriteLock();
try try
{ {
_listenersLock.EnterWriteLock();
_listeners.Add(subscription); _listeners.Add(subscription);
} }
finally finally
@ -929,9 +935,9 @@ namespace CryptoExchange.Net.Sockets.Default
/// <param name="id"></param> /// <param name="id"></param>
public Subscription? GetSubscription(int id) public Subscription? GetSubscription(int id)
{ {
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
return _listeners.OfType<Subscription>().SingleOrDefault(s => s.Id == id); return _listeners.OfType<Subscription>().SingleOrDefault(s => s.Id == id);
} }
finally finally
@ -985,9 +991,9 @@ namespace CryptoExchange.Net.Sockets.Default
private async Task SendAndWaitIntAsync(Query query, CancellationToken ct = default) private async Task SendAndWaitIntAsync(Query query, CancellationToken ct = default)
{ {
_listenersLock.EnterWriteLock();
try try
{ {
_listenersLock.EnterWriteLock();
_listeners.Add(query); _listeners.Add(query);
} }
finally finally
@ -999,9 +1005,9 @@ namespace CryptoExchange.Net.Sockets.Default
if (!sendResult) if (!sendResult)
{ {
query.Fail(sendResult.Error!); query.Fail(sendResult.Error!);
_listenersLock.EnterWriteLock();
try try
{ {
_listenersLock.EnterWriteLock();
_listeners.Remove(query); _listeners.Remove(query);
} }
finally finally
@ -1038,9 +1044,9 @@ namespace CryptoExchange.Net.Sockets.Default
} }
finally finally
{ {
_listenersLock.EnterWriteLock();
try try
{ {
_listenersLock.EnterWriteLock();
_listeners.Remove(query); _listeners.Remove(query);
} }
finally finally
@ -1153,9 +1159,9 @@ namespace CryptoExchange.Net.Sockets.Default
if (!DedicatedRequestConnection.IsDedicatedRequestConnection) if (!DedicatedRequestConnection.IsDedicatedRequestConnection)
{ {
bool anySubscriptions; bool anySubscriptions;
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
anySubscriptions = _listeners.OfType<Subscription>().Any(s => s.UserSubscription); anySubscriptions = _listeners.OfType<Subscription>().Any(s => s.UserSubscription);
} }
finally finally
@ -1173,9 +1179,9 @@ namespace CryptoExchange.Net.Sockets.Default
} }
bool anyAuthenticated; bool anyAuthenticated;
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
anyAuthenticated = _listeners.OfType<Subscription>().Any(s => s.Authenticated) anyAuthenticated = _listeners.OfType<Subscription>().Any(s => s.Authenticated)
|| DedicatedRequestConnection.IsDedicatedRequestConnection && DedicatedRequestConnection.Authenticated; || DedicatedRequestConnection.IsDedicatedRequestConnection && DedicatedRequestConnection.Authenticated;
} }
@ -1207,9 +1213,9 @@ namespace CryptoExchange.Net.Sockets.Default
return new CallResult(new WebError("Socket not connected")); return new CallResult(new WebError("Socket not connected"));
List<Subscription> subList; List<Subscription> subList;
_listenersLock.EnterReadLock();
try try
{ {
_listenersLock.EnterReadLock();
subList = _listeners.OfType<Subscription>().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList(); subList = _listeners.OfType<Subscription>().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList();
} }
finally finally