mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2026-04-07 02:01:12 +00:00
Updated internal lock for subscription to ReaderWriterLockSlim on SocketConnection
This commit is contained in:
parent
bea2b2bd7b
commit
d41ca3459e
@ -123,9 +123,16 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
return _listeners.OfType<Subscription>().Count(h => h.UserSubscription);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -135,8 +142,16 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
return _listeners.OfType<Subscription>().Where(h => h.UserSubscription).ToArray();
|
||||
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -240,9 +255,16 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
return _listeners.OfType<Subscription>().Select(x => x.Topic).Where(t => t != null).ToArray()!;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -252,18 +274,21 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
return _listeners.OfType<Query>().Where(x => !x.Completed).Count();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private bool _pausedActivity;
|
||||
#if NET9_0_OR_GREATER
|
||||
private readonly Lock _listenersLock = new Lock();
|
||||
#else
|
||||
private readonly object _listenersLock = new object();
|
||||
#endif
|
||||
private readonly ReaderWriterLockSlim _listenersLock = new ReaderWriterLockSlim();
|
||||
private readonly List<IMessageProcessor> _listeners;
|
||||
private readonly ILogger _logger;
|
||||
private SocketStatus _status;
|
||||
@ -340,8 +365,9 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
if (ApiClient._socketConnections.ContainsKey(SocketId))
|
||||
ApiClient._socketConnections.TryRemove(SocketId, out _);
|
||||
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterWriteLock();
|
||||
foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription && !l.IsClosingConnection))
|
||||
{
|
||||
subscription.IsClosingConnection = true;
|
||||
@ -354,6 +380,10 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
_listeners.Remove(query);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitWriteLock();
|
||||
}
|
||||
|
||||
_ = Task.Run(() => ConnectionClosed?.Invoke());
|
||||
return Task.CompletedTask;
|
||||
@ -369,8 +399,9 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
Authenticated = false;
|
||||
_lastSequenceNumber = 0;
|
||||
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterWriteLock();
|
||||
foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription))
|
||||
subscription.Reset();
|
||||
|
||||
@ -380,6 +411,10 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
_listeners.Remove(query);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitWriteLock();
|
||||
}
|
||||
|
||||
_ = Task.Run(() => ConnectionLost?.Invoke());
|
||||
return Task.CompletedTask;
|
||||
@ -401,14 +436,19 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
{
|
||||
Status = SocketStatus.Resubscribing;
|
||||
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterWriteLock();
|
||||
foreach (var query in _listeners.OfType<Query>().ToList())
|
||||
{
|
||||
query.Fail(new WebError("Connection interrupted"));
|
||||
_listeners.Remove(query);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitWriteLock();
|
||||
}
|
||||
|
||||
// Can't wait for this as it would cause a deadlock
|
||||
_ = Task.Run(async () =>
|
||||
@ -464,10 +504,15 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
protected virtual Task HandleRequestRateLimitedAsync(int requestId)
|
||||
{
|
||||
Query? query;
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
|
||||
if (query == null)
|
||||
return Task.CompletedTask;
|
||||
@ -493,10 +538,15 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
protected virtual Task HandleRequestSentAsync(int requestId)
|
||||
{
|
||||
Query? query;
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
|
||||
if (query == null)
|
||||
return Task.CompletedTask;
|
||||
@ -543,8 +593,9 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
}
|
||||
|
||||
Type? deserializationType = null;
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
foreach (var subscription in _listeners)
|
||||
{
|
||||
foreach (var route in subscription.MessageRouter.Routes)
|
||||
@ -560,6 +611,10 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
break;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
|
||||
if (deserializationType == null)
|
||||
{
|
||||
@ -605,8 +660,9 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
var topicFilter = messageConverter.GetTopicFilter(result);
|
||||
|
||||
bool processed = false;
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
var currentCount = _listeners.Count;
|
||||
for(var i = 0; i < _listeners.Count; i++)
|
||||
{
|
||||
@ -670,6 +726,10 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
break;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
|
||||
if (!processed)
|
||||
{
|
||||
@ -727,14 +787,19 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
if (ApiClient._socketConnections.ContainsKey(SocketId))
|
||||
ApiClient._socketConnections.TryRemove(SocketId, out _);
|
||||
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
foreach (var subscription in _listeners.OfType<Subscription>())
|
||||
{
|
||||
if (subscription.CancellationTokenRegistration.HasValue)
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
|
||||
await _socket.CloseAsync().ConfigureAwait(false);
|
||||
_socket.Dispose();
|
||||
@ -764,18 +829,30 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
|
||||
bool anyDuplicateSubscription;
|
||||
lock (_listenersLock)
|
||||
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l)));
|
||||
|
||||
bool shouldCloseConnection;
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l)));
|
||||
shouldCloseConnection = _listeners.OfType<Subscription>().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
|
||||
if (!anyDuplicateSubscription)
|
||||
{
|
||||
bool needUnsub;
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
needUnsub = _listeners.Contains(subscription) && !shouldCloseConnection;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
|
||||
if (needUnsub && _socket.IsOpen)
|
||||
await UnsubscribeAsync(subscription).ConfigureAwait(false);
|
||||
@ -800,8 +877,15 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
await CloseAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterWriteLock();
|
||||
_listeners.Remove(subscription);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitWriteLock();
|
||||
}
|
||||
|
||||
subscription.Status = SubscriptionStatus.Closed;
|
||||
}
|
||||
@ -824,9 +908,15 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
{
|
||||
if (Status != SocketStatus.None && Status != SocketStatus.Connected)
|
||||
return false;
|
||||
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterWriteLock();
|
||||
_listeners.Add(subscription);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitWriteLock();
|
||||
}
|
||||
|
||||
if (subscription.UserSubscription)
|
||||
_logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount);
|
||||
@ -839,9 +929,16 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// <param name="id"></param>
|
||||
public Subscription? GetSubscription(int id)
|
||||
{
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
return _listeners.OfType<Subscription>().SingleOrDefault(s => s.Id == id);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the state of the connection
|
||||
@ -888,15 +985,29 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
|
||||
private async Task SendAndWaitIntAsync(Query query, CancellationToken ct = default)
|
||||
{
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterWriteLock();
|
||||
_listeners.Add(query);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitWriteLock();
|
||||
}
|
||||
|
||||
var sendResult = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false);
|
||||
if (!sendResult)
|
||||
{
|
||||
query.Fail(sendResult.Error!);
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterWriteLock();
|
||||
_listeners.Remove(query);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitWriteLock();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@ -927,9 +1038,16 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterWriteLock();
|
||||
_listeners.Remove(query);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -1035,8 +1153,16 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
if (!DedicatedRequestConnection.IsDedicatedRequestConnection)
|
||||
{
|
||||
bool anySubscriptions;
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
anySubscriptions = _listeners.OfType<Subscription>().Any(s => s.UserSubscription);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
|
||||
if (!anySubscriptions)
|
||||
{
|
||||
// No need to resubscribe anything
|
||||
@ -1047,11 +1173,16 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
}
|
||||
|
||||
bool anyAuthenticated;
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
anyAuthenticated = _listeners.OfType<Subscription>().Any(s => s.Authenticated)
|
||||
|| DedicatedRequestConnection.IsDedicatedRequestConnection && DedicatedRequestConnection.Authenticated;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
|
||||
if (anyAuthenticated)
|
||||
{
|
||||
@ -1076,8 +1207,15 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
return new CallResult(new WebError("Socket not connected"));
|
||||
|
||||
List<Subscription> subList;
|
||||
lock (_listenersLock)
|
||||
try
|
||||
{
|
||||
_listenersLock.EnterReadLock();
|
||||
subList = _listeners.OfType<Subscription>().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listenersLock.ExitReadLock();
|
||||
}
|
||||
|
||||
if (subList.Count == 0)
|
||||
break;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user