diff --git a/CryptoExchange.Net.UnitTests/AsyncResetEventTests.cs b/CryptoExchange.Net.UnitTests/AsyncResetEventTests.cs new file mode 100644 index 0000000..31285b2 --- /dev/null +++ b/CryptoExchange.Net.UnitTests/AsyncResetEventTests.cs @@ -0,0 +1,142 @@ +using CryptoExchange.Net.Objects; +using NUnit.Framework; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.UnitTests +{ + [TestFixture()] + public class AsyncResetEventTests + { + [Test] + public async Task InitialFalseAndResetFalse_Should_BothCompleteAfterSingleSet() + { + var evnt = new AsyncResetEvent(false, false); + + var waiter1 = evnt.WaitAsync(); + var waiter2 = evnt.WaitAsync(); + + evnt.Set(); + + var result1 = await waiter1; + var result2 = await waiter2; + + Assert.True(result1); + Assert.True(result2); + } + + [Test] + public async Task InitialTrueAndResetFalse_Should_BothCompleteImmediately() + { + var evnt = new AsyncResetEvent(true, false); + + var waiter1 = evnt.WaitAsync(); + var waiter2 = evnt.WaitAsync(); + + var result1 = await waiter1; + var result2 = await waiter2; + + Assert.True(result1); + Assert.True(result2); + } + + [Test] + public async Task InitialFalseAndResetTrue_Should_CompleteEachAfterASet() + { + var evnt = new AsyncResetEvent(false, true); + + var waiter1 = evnt.WaitAsync(); + var waiter2 = evnt.WaitAsync(); + + evnt.Set(); + + var result1 = await waiter1; + + Assert.True(result1); + Assert.True(waiter2.Status != TaskStatus.RanToCompletion); + + evnt.Set(); + + var result2 = await waiter2; + + Assert.True(result2); + } + + [Test] + public async Task InitialTrueAndResetTrue_Should_CompleteFirstImmediatelyAndSecondAfterSet() + { + var evnt = new AsyncResetEvent(true, true); + + var waiter1 = evnt.WaitAsync(); + var waiter2 = evnt.WaitAsync(); + + var result1 = await waiter1; + + Assert.True(result1); + Assert.True(waiter2.Status != TaskStatus.RanToCompletion); + evnt.Set(); + + var result2 = await waiter2; + + Assert.True(result2); + } + + [Test] + public async Task Awaiting10TimesOnSameEvent_Should_AllCompleteAfter10Sets() + { + var evnt = new AsyncResetEvent(false, true); + + var waiters = new List>(); + for(var i = 0; i < 10; i++) + { + waiters.Add(evnt.WaitAsync()); + } + + List results = null; + var resultsWaiter = Task.Run(async () => + { + await Task.WhenAll(waiters); + results = waiters.Select(w => w.Result).ToList(); + }); + + for(var i = 1; i <= 10; i++) + { + evnt.Set(); + Assert.AreEqual(10 - i, waiters.Count(w => w.Status != TaskStatus.RanToCompletion)); + } + + await resultsWaiter; + + Assert.AreEqual(10, results.Count(r => r)); + } + + [Test] + public async Task WaitingShorterThanTimeout_Should_ReturnTrue() + { + var evnt = new AsyncResetEvent(false, true); + + var waiter1 = evnt.WaitAsync(TimeSpan.FromMilliseconds(100)); + await Task.Delay(50); + evnt.Set(); + + var result1 = await waiter1; + + Assert.True(result1); + } + + [Test] + public async Task WaitingLongerThanTimeout_Should_ReturnFalse() + { + var evnt = new AsyncResetEvent(false, true); + + var waiter1 = evnt.WaitAsync(TimeSpan.FromMilliseconds(100)); + + var result1 = await waiter1; + + Assert.False(result1); + } + } +} diff --git a/CryptoExchange.Net/CryptoExchange.Net.xml b/CryptoExchange.Net/CryptoExchange.Net.xml index 949dc94..b595b3a 100644 --- a/CryptoExchange.Net/CryptoExchange.Net.xml +++ b/CryptoExchange.Net/CryptoExchange.Net.xml @@ -1021,30 +1021,6 @@ - - - Wait one async - - - - - - - - - Wait one async - - - - - - - Wait one async - - - - - String to JToken @@ -1762,6 +1738,35 @@ The proxy login The proxy password + + + Async auto reset based on Stephen Toub`s implementation + https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-2-asyncautoresetevent/ + + + + + New AsyncResetEvent + + + + + + + Wait for the AutoResetEvent to be set + + + + + + Signal a waiter + + + + + Dispose + + Comparer for byte order @@ -4012,148 +4017,5 @@ - - - Specifies that is allowed as an input even if the - corresponding type disallows it. - - - - - Initializes a new instance of the class. - - - - - Specifies that is disallowed as an input even if the - corresponding type allows it. - - - - - Initializes a new instance of the class. - - - - - Specifies that a method that will never return under any circumstance. - - - - - Initializes a new instance of the class. - - - - - Specifies that the method will not return if the associated - parameter is passed the specified value. - - - - - Gets the condition parameter value. - Code after the method is considered unreachable by diagnostics if the argument - to the associated parameter matches this value. - - - - - Initializes a new instance of the - class with the specified parameter value. - - - The condition parameter value. - Code after the method is considered unreachable by diagnostics if the argument - to the associated parameter matches this value. - - - - - Specifies that an output may be even if the - corresponding type disallows it. - - - - - Initializes a new instance of the class. - - - - - Specifies that when a method returns , - the parameter may be even if the corresponding type disallows it. - - - - - Gets the return value condition. - If the method returns this value, the associated parameter may be . - - - - - Initializes the attribute with the specified return value condition. - - - The return value condition. - If the method returns this value, the associated parameter may be . - - - - - Specifies that an output is not even if the - corresponding type allows it. - - - - - Initializes a new instance of the class. - - - - - Specifies that the output will be non- if the - named parameter is non-. - - - - - Gets the associated parameter name. - The output will be non- if the argument to the - parameter specified is non-. - - - - - Initializes the attribute with the associated parameter name. - - - The associated parameter name. - The output will be non- if the argument to the - parameter specified is non-. - - - - - Specifies that when a method returns , - the parameter will not be even if the corresponding type allows it. - - - - - Gets the return value condition. - If the method returns this value, the associated parameter will not be . - - - - - Initializes the attribute with the specified return value condition. - - - The return value condition. - If the method returns this value, the associated parameter will not be . - - diff --git a/CryptoExchange.Net/ExtensionMethods.cs b/CryptoExchange.Net/ExtensionMethods.cs index c015933..bba9ff4 100644 --- a/CryptoExchange.Net/ExtensionMethods.cs +++ b/CryptoExchange.Net/ExtensionMethods.cs @@ -191,59 +191,6 @@ namespace CryptoExchange.Net return secureString; } - /// - /// Wait one async - /// - /// - /// - /// - /// - public static async Task WaitOneAsync(this WaitHandle handle, int millisecondsTimeout, CancellationToken cancellationToken) - { - RegisteredWaitHandle? registeredHandle = null; - CancellationTokenRegistration tokenRegistration = default; - try - { - var tcs = new TaskCompletionSource(); - registeredHandle = ThreadPool.RegisterWaitForSingleObject( - handle, - (state, timedOut) => ((TaskCompletionSource)state).TrySetResult(!timedOut), - tcs, - millisecondsTimeout, - true); - tokenRegistration = cancellationToken.Register( - state => ((TaskCompletionSource)state).TrySetCanceled(), - tcs); - return await tcs.Task.ConfigureAwait(false); - } - finally - { - registeredHandle?.Unregister(null); - tokenRegistration.Dispose(); - } - } - - /// - /// Wait one async - /// - /// - /// - public static Task WaitOneAsync(this WaitHandle handle) - { - return handle.WaitOneAsync(-1, CancellationToken.None); - } - - /// - /// Wait one async - /// - /// - /// - /// - public static Task WaitOneAsync(this WaitHandle handle, TimeSpan timeout) - { - return handle.WaitOneAsync((int)timeout.TotalMilliseconds, CancellationToken.None); - } - /// /// String to JToken /// diff --git a/CryptoExchange.Net/Objects/AsyncAutoResetEvent.cs b/CryptoExchange.Net/Objects/AsyncAutoResetEvent.cs new file mode 100644 index 0000000..f0e5b76 --- /dev/null +++ b/CryptoExchange.Net/Objects/AsyncAutoResetEvent.cs @@ -0,0 +1,102 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Objects +{ + /// + /// Async auto reset based on Stephen Toub`s implementation + /// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-2-asyncautoresetevent/ + /// + public class AsyncResetEvent : IDisposable + { + private readonly static Task _completed = Task.FromResult(true); + private readonly Queue> _waits = new Queue>(); + private bool _signaled; + private bool _reset; + + /// + /// New AsyncResetEvent + /// + /// + /// + public AsyncResetEvent(bool initialState = false, bool reset = true) + { + _signaled = initialState; + _reset = reset; + } + + /// + /// Wait for the AutoResetEvent to be set + /// + /// + public Task WaitAsync(TimeSpan? timeout = null) + { + lock (_waits) + { + if (_signaled) + { + if(_reset) + _signaled = false; + return _completed; + } + else + { + var tcs = new TaskCompletionSource(); + if(timeout != null) + { + var cancellationSource = new CancellationTokenSource(timeout.Value); + var registration = cancellationSource.Token.Register(() => + { + tcs.TrySetResult(false); + }, useSynchronizationContext: false); + } + + _waits.Enqueue(tcs); + return tcs.Task; + } + } + } + + /// + /// Signal a waiter + /// + public void Set() + { + lock (_waits) + { + if (!_reset) + { + // Act as ManualResetEvent. Once set keep it signaled and signal everyone who is waiting + _signaled = true; + while (_waits.Count > 0) + { + var toRelease = _waits.Dequeue(); + toRelease.TrySetResult(true); + } + } + else + { + // Act as AutoResetEvent. When set signal 1 waiter + if (_waits.Count > 0) + { + var toRelease = _waits.Dequeue(); + toRelease.TrySetResult(true); + } + else if (!_signaled) + _signaled = true; + } + } + } + + /// + /// Dispose + /// + public void Dispose() + { + _waits.Clear(); + } + } +} diff --git a/CryptoExchange.Net/SocketClient.cs b/CryptoExchange.Net/SocketClient.cs index c0f2a41..1d26a8d 100644 --- a/CryptoExchange.Net/SocketClient.cs +++ b/CryptoExchange.Net/SocketClient.cs @@ -75,7 +75,7 @@ namespace CryptoExchange.Net /// /// Wait event for the periodicTask /// - protected AutoResetEvent? periodicEvent; + protected AsyncResetEvent? periodicEvent; /// /// If client is disposing /// @@ -571,12 +571,12 @@ namespace CryptoExchange.Net if (objGetter == null) throw new ArgumentNullException(nameof(objGetter)); - periodicEvent = new AutoResetEvent(false); + periodicEvent = new AsyncResetEvent(); periodicTask = Task.Run(async () => { while (!disposing) { - await periodicEvent.WaitOneAsync(interval).ConfigureAwait(false); + await periodicEvent.WaitAsync(interval).ConfigureAwait(false); if (disposing) break; diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 293b97a..2754b7b 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -29,7 +29,7 @@ namespace CryptoExchange.Net.Sockets private Task? _sendTask; private Task? _receiveTask; private Task? _timeoutTask; - private readonly AutoResetEvent _sendEvent; + private readonly AsyncResetEvent _sendEvent; private readonly ConcurrentQueue _sendBuffer; private readonly IDictionary cookies; private readonly IDictionary headers; @@ -216,7 +216,7 @@ namespace CryptoExchange.Net.Sockets _outgoingMessages = new List(); _receivedMessages = new Dictionary(); - _sendEvent = new AutoResetEvent(false); + _sendEvent = new AsyncResetEvent(); _sendBuffer = new ConcurrentQueue(); _ctsSource = new CancellationTokenSource(); _receivedMessagesLock = new object(); @@ -385,7 +385,7 @@ namespace CryptoExchange.Net.Sockets if (_closing) break; - await _sendEvent.WaitOneAsync().ConfigureAwait(false); + await _sendEvent.WaitAsync().ConfigureAwait(false); if (_closing) break; diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 0fdce68..8402edb 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -282,7 +282,7 @@ namespace CryptoExchange.Net.Sockets pendingRequests.Add(pending); } Send(obj); - return pending.Event.WaitOneAsync(timeout); + return pending.Event.WaitAsync(timeout); } /// @@ -525,7 +525,7 @@ namespace CryptoExchange.Net.Sockets { public Func Handler { get; } public JToken? Result { get; private set; } - public ManualResetEvent Event { get; } + public AsyncResetEvent Event { get; } public TimeSpan Timeout { get; } private readonly DateTime startTime; @@ -533,7 +533,7 @@ namespace CryptoExchange.Net.Sockets public PendingRequest(Func handler, TimeSpan timeout) { Handler = handler; - Event = new ManualResetEvent(false); + Event = new AsyncResetEvent(false, false); Timeout = timeout; startTime = DateTime.UtcNow; }