1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Added custom async wait event implementation as the previously used method seemed to not work 100% of the time

This commit is contained in:
Jkorf 2021-10-05 14:31:27 +02:00
parent 4ef20a9637
commit f3102a4dad
7 changed files with 282 additions and 229 deletions

View File

@ -0,0 +1,142 @@
using CryptoExchange.Net.Objects;
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace CryptoExchange.Net.UnitTests
{
[TestFixture()]
public class AsyncResetEventTests
{
[Test]
public async Task InitialFalseAndResetFalse_Should_BothCompleteAfterSingleSet()
{
var evnt = new AsyncResetEvent(false, false);
var waiter1 = evnt.WaitAsync();
var waiter2 = evnt.WaitAsync();
evnt.Set();
var result1 = await waiter1;
var result2 = await waiter2;
Assert.True(result1);
Assert.True(result2);
}
[Test]
public async Task InitialTrueAndResetFalse_Should_BothCompleteImmediately()
{
var evnt = new AsyncResetEvent(true, false);
var waiter1 = evnt.WaitAsync();
var waiter2 = evnt.WaitAsync();
var result1 = await waiter1;
var result2 = await waiter2;
Assert.True(result1);
Assert.True(result2);
}
[Test]
public async Task InitialFalseAndResetTrue_Should_CompleteEachAfterASet()
{
var evnt = new AsyncResetEvent(false, true);
var waiter1 = evnt.WaitAsync();
var waiter2 = evnt.WaitAsync();
evnt.Set();
var result1 = await waiter1;
Assert.True(result1);
Assert.True(waiter2.Status != TaskStatus.RanToCompletion);
evnt.Set();
var result2 = await waiter2;
Assert.True(result2);
}
[Test]
public async Task InitialTrueAndResetTrue_Should_CompleteFirstImmediatelyAndSecondAfterSet()
{
var evnt = new AsyncResetEvent(true, true);
var waiter1 = evnt.WaitAsync();
var waiter2 = evnt.WaitAsync();
var result1 = await waiter1;
Assert.True(result1);
Assert.True(waiter2.Status != TaskStatus.RanToCompletion);
evnt.Set();
var result2 = await waiter2;
Assert.True(result2);
}
[Test]
public async Task Awaiting10TimesOnSameEvent_Should_AllCompleteAfter10Sets()
{
var evnt = new AsyncResetEvent(false, true);
var waiters = new List<Task<bool>>();
for(var i = 0; i < 10; i++)
{
waiters.Add(evnt.WaitAsync());
}
List<bool> results = null;
var resultsWaiter = Task.Run(async () =>
{
await Task.WhenAll(waiters);
results = waiters.Select(w => w.Result).ToList();
});
for(var i = 1; i <= 10; i++)
{
evnt.Set();
Assert.AreEqual(10 - i, waiters.Count(w => w.Status != TaskStatus.RanToCompletion));
}
await resultsWaiter;
Assert.AreEqual(10, results.Count(r => r));
}
[Test]
public async Task WaitingShorterThanTimeout_Should_ReturnTrue()
{
var evnt = new AsyncResetEvent(false, true);
var waiter1 = evnt.WaitAsync(TimeSpan.FromMilliseconds(100));
await Task.Delay(50);
evnt.Set();
var result1 = await waiter1;
Assert.True(result1);
}
[Test]
public async Task WaitingLongerThanTimeout_Should_ReturnFalse()
{
var evnt = new AsyncResetEvent(false, true);
var waiter1 = evnt.WaitAsync(TimeSpan.FromMilliseconds(100));
var result1 = await waiter1;
Assert.False(result1);
}
}
}

View File

@ -1021,30 +1021,6 @@
<param name="source"></param>
<returns></returns>
</member>
<member name="M:CryptoExchange.Net.ExtensionMethods.WaitOneAsync(System.Threading.WaitHandle,System.Int32,System.Threading.CancellationToken)">
<summary>
Wait one async
</summary>
<param name="handle"></param>
<param name="millisecondsTimeout"></param>
<param name="cancellationToken"></param>
<returns></returns>
</member>
<member name="M:CryptoExchange.Net.ExtensionMethods.WaitOneAsync(System.Threading.WaitHandle)">
<summary>
Wait one async
</summary>
<param name="handle"></param>
<returns></returns>
</member>
<member name="M:CryptoExchange.Net.ExtensionMethods.WaitOneAsync(System.Threading.WaitHandle,System.TimeSpan)">
<summary>
Wait one async
</summary>
<param name="handle"></param>
<param name="timeout"></param>
<returns></returns>
</member>
<member name="M:CryptoExchange.Net.ExtensionMethods.ToJToken(System.String,CryptoExchange.Net.Logging.Log)">
<summary>
String to JToken
@ -1762,6 +1738,35 @@
<param name="login">The proxy login</param>
<param name="password">The proxy password</param>
</member>
<member name="T:CryptoExchange.Net.Objects.AsyncResetEvent">
<summary>
Async auto reset based on Stephen Toub`s implementation
https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-2-asyncautoresetevent/
</summary>
</member>
<member name="M:CryptoExchange.Net.Objects.AsyncResetEvent.#ctor(System.Boolean,System.Boolean)">
<summary>
New AsyncResetEvent
</summary>
<param name="initialState"></param>
<param name="reset"></param>
</member>
<member name="M:CryptoExchange.Net.Objects.AsyncResetEvent.WaitAsync(System.Nullable{System.TimeSpan})">
<summary>
Wait for the AutoResetEvent to be set
</summary>
<returns></returns>
</member>
<member name="M:CryptoExchange.Net.Objects.AsyncResetEvent.Set">
<summary>
Signal a waiter
</summary>
</member>
<member name="M:CryptoExchange.Net.Objects.AsyncResetEvent.Dispose">
<summary>
Dispose
</summary>
</member>
<member name="T:CryptoExchange.Net.Objects.ByteOrderComparer">
<summary>
Comparer for byte order
@ -4012,148 +4017,5 @@
<member name="M:CryptoExchange.Net.Sockets.WebsocketFactory.CreateWebsocket(CryptoExchange.Net.Logging.Log,System.String,System.Collections.Generic.IDictionary{System.String,System.String},System.Collections.Generic.IDictionary{System.String,System.String})">
<inheritdoc />
</member>
<member name="T:System.Diagnostics.CodeAnalysis.AllowNullAttribute">
<summary>
Specifies that <see langword="null"/> is allowed as an input even if the
corresponding type disallows it.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.AllowNullAttribute.#ctor">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.AllowNullAttribute"/> class.
</summary>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.DisallowNullAttribute">
<summary>
Specifies that <see langword="null"/> is disallowed as an input even if the
corresponding type allows it.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.DisallowNullAttribute.#ctor">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.DisallowNullAttribute"/> class.
</summary>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.DoesNotReturnAttribute">
<summary>
Specifies that a method that will never return under any circumstance.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.DoesNotReturnAttribute.#ctor">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.DoesNotReturnAttribute"/> class.
</summary>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.DoesNotReturnIfAttribute">
<summary>
Specifies that the method will not return if the associated <see cref="T:System.Boolean"/>
parameter is passed the specified value.
</summary>
</member>
<member name="P:System.Diagnostics.CodeAnalysis.DoesNotReturnIfAttribute.ParameterValue">
<summary>
Gets the condition parameter value.
Code after the method is considered unreachable by diagnostics if the argument
to the associated parameter matches this value.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.DoesNotReturnIfAttribute.#ctor(System.Boolean)">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.DoesNotReturnIfAttribute"/>
class with the specified parameter value.
</summary>
<param name="parameterValue">
The condition parameter value.
Code after the method is considered unreachable by diagnostics if the argument
to the associated parameter matches this value.
</param>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.MaybeNullAttribute">
<summary>
Specifies that an output may be <see langword="null"/> even if the
corresponding type disallows it.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.MaybeNullAttribute.#ctor">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.MaybeNullAttribute"/> class.
</summary>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute">
<summary>
Specifies that when a method returns <see cref="P:System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute.ReturnValue"/>,
the parameter may be <see langword="null"/> even if the corresponding type disallows it.
</summary>
</member>
<member name="P:System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute.ReturnValue">
<summary>
Gets the return value condition.
If the method returns this value, the associated parameter may be <see langword="null"/>.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute.#ctor(System.Boolean)">
<summary>
Initializes the attribute with the specified return value condition.
</summary>
<param name="returnValue">
The return value condition.
If the method returns this value, the associated parameter may be <see langword="null"/>.
</param>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.NotNullAttribute">
<summary>
Specifies that an output is not <see langword="null"/> even if the
corresponding type allows it.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.NotNullAttribute.#ctor">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.NotNullAttribute"/> class.
</summary>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.NotNullIfNotNullAttribute">
<summary>
Specifies that the output will be non-<see langword="null"/> if the
named parameter is non-<see langword="null"/>.
</summary>
</member>
<member name="P:System.Diagnostics.CodeAnalysis.NotNullIfNotNullAttribute.ParameterName">
<summary>
Gets the associated parameter name.
The output will be non-<see langword="null"/> if the argument to the
parameter specified is non-<see langword="null"/>.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.NotNullIfNotNullAttribute.#ctor(System.String)">
<summary>
Initializes the attribute with the associated parameter name.
</summary>
<param name="parameterName">
The associated parameter name.
The output will be non-<see langword="null"/> if the argument to the
parameter specified is non-<see langword="null"/>.
</param>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.NotNullWhenAttribute">
<summary>
Specifies that when a method returns <see cref="P:System.Diagnostics.CodeAnalysis.NotNullWhenAttribute.ReturnValue"/>,
the parameter will not be <see langword="null"/> even if the corresponding type allows it.
</summary>
</member>
<member name="P:System.Diagnostics.CodeAnalysis.NotNullWhenAttribute.ReturnValue">
<summary>
Gets the return value condition.
If the method returns this value, the associated parameter will not be <see langword="null"/>.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.NotNullWhenAttribute.#ctor(System.Boolean)">
<summary>
Initializes the attribute with the specified return value condition.
</summary>
<param name="returnValue">
The return value condition.
If the method returns this value, the associated parameter will not be <see langword="null"/>.
</param>
</member>
</members>
</doc>

View File

@ -191,59 +191,6 @@ namespace CryptoExchange.Net
return secureString;
}
/// <summary>
/// Wait one async
/// </summary>
/// <param name="handle"></param>
/// <param name="millisecondsTimeout"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static async Task<bool> WaitOneAsync(this WaitHandle handle, int millisecondsTimeout, CancellationToken cancellationToken)
{
RegisteredWaitHandle? registeredHandle = null;
CancellationTokenRegistration tokenRegistration = default;
try
{
var tcs = new TaskCompletionSource<bool>();
registeredHandle = ThreadPool.RegisterWaitForSingleObject(
handle,
(state, timedOut) => ((TaskCompletionSource<bool>)state).TrySetResult(!timedOut),
tcs,
millisecondsTimeout,
true);
tokenRegistration = cancellationToken.Register(
state => ((TaskCompletionSource<bool>)state).TrySetCanceled(),
tcs);
return await tcs.Task.ConfigureAwait(false);
}
finally
{
registeredHandle?.Unregister(null);
tokenRegistration.Dispose();
}
}
/// <summary>
/// Wait one async
/// </summary>
/// <param name="handle"></param>
/// <returns></returns>
public static Task<bool> WaitOneAsync(this WaitHandle handle)
{
return handle.WaitOneAsync(-1, CancellationToken.None);
}
/// <summary>
/// Wait one async
/// </summary>
/// <param name="handle"></param>
/// <param name="timeout"></param>
/// <returns></returns>
public static Task<bool> WaitOneAsync(this WaitHandle handle, TimeSpan timeout)
{
return handle.WaitOneAsync((int)timeout.TotalMilliseconds, CancellationToken.None);
}
/// <summary>
/// String to JToken
/// </summary>

View File

@ -0,0 +1,102 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects
{
/// <summary>
/// Async auto reset based on Stephen Toub`s implementation
/// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-2-asyncautoresetevent/
/// </summary>
public class AsyncResetEvent : IDisposable
{
private readonly static Task<bool> _completed = Task.FromResult(true);
private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
private bool _signaled;
private bool _reset;
/// <summary>
/// New AsyncResetEvent
/// </summary>
/// <param name="initialState"></param>
/// <param name="reset"></param>
public AsyncResetEvent(bool initialState = false, bool reset = true)
{
_signaled = initialState;
_reset = reset;
}
/// <summary>
/// Wait for the AutoResetEvent to be set
/// </summary>
/// <returns></returns>
public Task<bool> WaitAsync(TimeSpan? timeout = null)
{
lock (_waits)
{
if (_signaled)
{
if(_reset)
_signaled = false;
return _completed;
}
else
{
var tcs = new TaskCompletionSource<bool>();
if(timeout != null)
{
var cancellationSource = new CancellationTokenSource(timeout.Value);
var registration = cancellationSource.Token.Register(() =>
{
tcs.TrySetResult(false);
}, useSynchronizationContext: false);
}
_waits.Enqueue(tcs);
return tcs.Task;
}
}
}
/// <summary>
/// Signal a waiter
/// </summary>
public void Set()
{
lock (_waits)
{
if (!_reset)
{
// Act as ManualResetEvent. Once set keep it signaled and signal everyone who is waiting
_signaled = true;
while (_waits.Count > 0)
{
var toRelease = _waits.Dequeue();
toRelease.TrySetResult(true);
}
}
else
{
// Act as AutoResetEvent. When set signal 1 waiter
if (_waits.Count > 0)
{
var toRelease = _waits.Dequeue();
toRelease.TrySetResult(true);
}
else if (!_signaled)
_signaled = true;
}
}
}
/// <summary>
/// Dispose
/// </summary>
public void Dispose()
{
_waits.Clear();
}
}
}

View File

@ -75,7 +75,7 @@ namespace CryptoExchange.Net
/// <summary>
/// Wait event for the periodicTask
/// </summary>
protected AutoResetEvent? periodicEvent;
protected AsyncResetEvent? periodicEvent;
/// <summary>
/// If client is disposing
/// </summary>
@ -571,12 +571,12 @@ namespace CryptoExchange.Net
if (objGetter == null)
throw new ArgumentNullException(nameof(objGetter));
periodicEvent = new AutoResetEvent(false);
periodicEvent = new AsyncResetEvent();
periodicTask = Task.Run(async () =>
{
while (!disposing)
{
await periodicEvent.WaitOneAsync(interval).ConfigureAwait(false);
await periodicEvent.WaitAsync(interval).ConfigureAwait(false);
if (disposing)
break;

View File

@ -29,7 +29,7 @@ namespace CryptoExchange.Net.Sockets
private Task? _sendTask;
private Task? _receiveTask;
private Task? _timeoutTask;
private readonly AutoResetEvent _sendEvent;
private readonly AsyncResetEvent _sendEvent;
private readonly ConcurrentQueue<byte[]> _sendBuffer;
private readonly IDictionary<string, string> cookies;
private readonly IDictionary<string, string> headers;
@ -216,7 +216,7 @@ namespace CryptoExchange.Net.Sockets
_outgoingMessages = new List<DateTime>();
_receivedMessages = new Dictionary<DateTime, int>();
_sendEvent = new AutoResetEvent(false);
_sendEvent = new AsyncResetEvent();
_sendBuffer = new ConcurrentQueue<byte[]>();
_ctsSource = new CancellationTokenSource();
_receivedMessagesLock = new object();
@ -385,7 +385,7 @@ namespace CryptoExchange.Net.Sockets
if (_closing)
break;
await _sendEvent.WaitOneAsync().ConfigureAwait(false);
await _sendEvent.WaitAsync().ConfigureAwait(false);
if (_closing)
break;

View File

@ -282,7 +282,7 @@ namespace CryptoExchange.Net.Sockets
pendingRequests.Add(pending);
}
Send(obj);
return pending.Event.WaitOneAsync(timeout);
return pending.Event.WaitAsync(timeout);
}
/// <summary>
@ -525,7 +525,7 @@ namespace CryptoExchange.Net.Sockets
{
public Func<JToken, bool> Handler { get; }
public JToken? Result { get; private set; }
public ManualResetEvent Event { get; }
public AsyncResetEvent Event { get; }
public TimeSpan Timeout { get; }
private readonly DateTime startTime;
@ -533,7 +533,7 @@ namespace CryptoExchange.Net.Sockets
public PendingRequest(Func<JToken, bool> handler, TimeSpan timeout)
{
Handler = handler;
Event = new ManualResetEvent(false);
Event = new AsyncResetEvent(false, false);
Timeout = timeout;
startTime = DateTime.UtcNow;
}