diff --git a/CryptoExchange.Net.UnitTests/AsyncResetEventTests.cs b/CryptoExchange.Net.UnitTests/AsyncResetEventTests.cs index 78bab0f..8fb13bc 100644 --- a/CryptoExchange.Net.UnitTests/AsyncResetEventTests.cs +++ b/CryptoExchange.Net.UnitTests/AsyncResetEventTests.cs @@ -4,6 +4,7 @@ using NUnit.Framework.Legacy; using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.UnitTests @@ -139,5 +140,17 @@ namespace CryptoExchange.Net.UnitTests ClassicAssert.False(result1); } + + [Test] + public async Task CancellingWait_Should_ReturnFalse() + { + var evnt = new AsyncResetEvent(false, true); + + var waiter1 = evnt.WaitAsync(ct: new CancellationTokenSource(50).Token); + + var result1 = await waiter1; + + ClassicAssert.False(result1); + } } } diff --git a/CryptoExchange.Net/Objects/AsyncAutoResetEvent.cs b/CryptoExchange.Net/Objects/AsyncAutoResetEvent.cs deleted file mode 100644 index e34a1a2..0000000 --- a/CryptoExchange.Net/Objects/AsyncAutoResetEvent.cs +++ /dev/null @@ -1,129 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -namespace CryptoExchange.Net.Objects -{ - /// - /// Async auto reset based on Stephen Toub`s implementation - /// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-2-asyncautoresetevent/ - /// - public class AsyncResetEvent : IDisposable - { - private static readonly Task _completed = Task.FromResult(true); - private Queue> _waits = new Queue>(); -#if NET9_0_OR_GREATER - private readonly Lock _waitsLock = new Lock(); -#else - private readonly object _waitsLock = new object(); -#endif - private bool _signaled; - private readonly bool _reset; - - /// - /// New AsyncResetEvent - /// - /// - /// - public AsyncResetEvent(bool initialState = false, bool reset = true) - { - _signaled = initialState; - _reset = reset; - } - - /// - /// Wait for the AutoResetEvent to be set - /// - /// - public async Task WaitAsync(TimeSpan? timeout = null, CancellationToken ct = default) - { - CancellationTokenRegistration registration = default; - try - { - Task waiter = _completed; - lock (_waitsLock) - { - if (_signaled) - { - if (_reset) - _signaled = false; - } - else if (!ct.IsCancellationRequested) - { - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - if (timeout.HasValue) - { - var timeoutSource = new CancellationTokenSource(timeout.Value); - var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutSource.Token, ct); - ct = cancellationSource.Token; - } - - registration = ct.Register(() => - { - lock (_waitsLock) - { - tcs.TrySetResult(false); - - // Not the cleanest but it works - _waits = new Queue>(_waits.Where(i => i != tcs)); - } - }, useSynchronizationContext: false); - - - _waits.Enqueue(tcs); - waiter = tcs.Task; - } - } - - return await waiter.ConfigureAwait(false); - } - finally - { - registration.Dispose(); - } - } - - /// - /// Signal a waiter - /// - public void Set() - { - lock (_waitsLock) - { - if (!_reset) - { - // Act as ManualResetEvent. Once set keep it signaled and signal everyone who is waiting - _signaled = true; - while (_waits.Count > 0) - { - var toRelease = _waits.Dequeue(); - toRelease.TrySetResult(true); - } - } - else - { - // Act as AutoResetEvent. When set signal 1 waiter - if (_waits.Count > 0) - { - var toRelease = _waits.Dequeue(); - toRelease.TrySetResult(true); - } - else if (!_signaled) - { - _signaled = true; - } - } - } - } - - /// - /// Dispose - /// - public void Dispose() - { - _waits.Clear(); - } - } -} diff --git a/CryptoExchange.Net/Objects/AsyncResetEvent.cs b/CryptoExchange.Net/Objects/AsyncResetEvent.cs new file mode 100644 index 0000000..7d0b70b --- /dev/null +++ b/CryptoExchange.Net/Objects/AsyncResetEvent.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Objects +{ + /// + /// Async auto/manual reset event implementation + /// + public class AsyncResetEvent + { + private readonly Queue> _waiters = new(); + private readonly bool _autoReset; + private bool _signaled; +#if NET9_0_OR_GREATER + private readonly Lock _waitersLock = new Lock(); +#else + private readonly object _waitersLock = new object(); +#endif + + /// + /// ctor + /// + public AsyncResetEvent(bool initialState = false, bool autoReset = true) + { + _signaled = initialState; + _autoReset = autoReset; + } + + /// + /// Wait for the set event + /// + /// + public async Task WaitAsync( + TimeSpan? timeout = null, + CancellationToken ct = default) + { + TaskCompletionSource tcs; + + lock (_waitersLock) + { + if (_signaled) + { + // Already was signaled, can return immediately + if (_autoReset) + _signaled = false; + + return true; + } + + tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _waiters.Enqueue(tcs); + } + + if (timeout.HasValue || ct.CanBeCanceled) + { + // Wait for either timeout, cancellation token or set result + var delayTask = Task.Delay(timeout ?? Timeout.InfiniteTimeSpan, ct); + var completedTask = await Task.WhenAny(tcs.Task, delayTask).ConfigureAwait(false); + + if (completedTask != tcs.Task) + { + // This was a timeout or cancellation, need to remove tcs from waiters + // if the tcs was set instead it will be removed in the Set method + + if (tcs.TrySetResult(false)) + { + lock (_waitersLock) + { + // Dequeue and put in the back of the queue again except for the one we need to remove + int count = _waiters.Count; + for (int i = 0; i < count; i++) + { + var w = _waiters.Dequeue(); + if (w != tcs) + _waiters.Enqueue(w); + } + } + } + + return false; + } + } + else + { + await tcs.Task.ConfigureAwait(false); + } + + return true; + } + + /// + /// Signal a waiter + /// + public void Set() + { + lock (_waitersLock) + { + if (_autoReset) + { + while (_waiters.Count > 0) + { + // Try to dequeue and set the result + // If result setting was not successful it means timeout/cancellation happened at the same time + // If this is the case this Set isn't the one setting the result and we need to continue + var w = _waiters.Dequeue(); + if (w.TrySetResult(true)) + return; + } + + // No queued waiters, set signaled for next waiter + _signaled = true; + } + else + { + _signaled = true; + + // Signal all current waiters + while (_waiters.Count > 0) + { + var w = _waiters.Dequeue(); + w.TrySetResult(true); + } + } + } + } + } +} diff --git a/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs index b8713c7..c0de716 100644 --- a/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/Default/CryptoExchangeWebSocketClient.cs @@ -486,7 +486,6 @@ namespace CryptoExchange.Net.Sockets.Default _disposed = true; _socket.Dispose(); _ctsSource?.Dispose(); - _sendEvent.Dispose(); _logger.SocketDisposed(Id); } diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index 5255762..1f67f89 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -792,7 +792,6 @@ namespace CryptoExchange.Net.Sockets.Default { Status = SocketStatus.Disposed; periodicEvent?.Set(); - periodicEvent?.Dispose(); _socket.Dispose(); } diff --git a/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs b/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs index c452670..af1644d 100644 --- a/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs +++ b/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs @@ -233,7 +233,6 @@ namespace CryptoExchange.Net.Sockets.HighPerf { Status = SocketStatus.Disposed; periodicEvent?.Set(); - periodicEvent?.Dispose(); _socket.Dispose(); }