1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 14:13:46 +00:00
This commit is contained in:
Jkorf 2026-02-03 08:47:26 +01:00
commit 2fd3912795
6 changed files with 142 additions and 132 deletions

View File

@ -4,6 +4,7 @@ using NUnit.Framework.Legacy;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace CryptoExchange.Net.UnitTests namespace CryptoExchange.Net.UnitTests
@ -139,5 +140,17 @@ namespace CryptoExchange.Net.UnitTests
ClassicAssert.False(result1); ClassicAssert.False(result1);
} }
[Test]
public async Task CancellingWait_Should_ReturnFalse()
{
var evnt = new AsyncResetEvent(false, true);
var waiter1 = evnt.WaitAsync(ct: new CancellationTokenSource(50).Token);
var result1 = await waiter1;
ClassicAssert.False(result1);
}
} }
} }

View File

@ -1,129 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects
{
/// <summary>
/// Async auto reset based on Stephen Toub`s implementation
/// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-2-asyncautoresetevent/
/// </summary>
public class AsyncResetEvent : IDisposable
{
private static readonly Task<bool> _completed = Task.FromResult(true);
private Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
#if NET9_0_OR_GREATER
private readonly Lock _waitsLock = new Lock();
#else
private readonly object _waitsLock = new object();
#endif
private bool _signaled;
private readonly bool _reset;
/// <summary>
/// New AsyncResetEvent
/// </summary>
/// <param name="initialState"></param>
/// <param name="reset"></param>
public AsyncResetEvent(bool initialState = false, bool reset = true)
{
_signaled = initialState;
_reset = reset;
}
/// <summary>
/// Wait for the AutoResetEvent to be set
/// </summary>
/// <returns></returns>
public async Task<bool> WaitAsync(TimeSpan? timeout = null, CancellationToken ct = default)
{
CancellationTokenRegistration registration = default;
try
{
Task<bool> waiter = _completed;
lock (_waitsLock)
{
if (_signaled)
{
if (_reset)
_signaled = false;
}
else if (!ct.IsCancellationRequested)
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
if (timeout.HasValue)
{
var timeoutSource = new CancellationTokenSource(timeout.Value);
var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutSource.Token, ct);
ct = cancellationSource.Token;
}
registration = ct.Register(() =>
{
lock (_waitsLock)
{
tcs.TrySetResult(false);
// Not the cleanest but it works
_waits = new Queue<TaskCompletionSource<bool>>(_waits.Where(i => i != tcs));
}
}, useSynchronizationContext: false);
_waits.Enqueue(tcs);
waiter = tcs.Task;
}
}
return await waiter.ConfigureAwait(false);
}
finally
{
registration.Dispose();
}
}
/// <summary>
/// Signal a waiter
/// </summary>
public void Set()
{
lock (_waitsLock)
{
if (!_reset)
{
// Act as ManualResetEvent. Once set keep it signaled and signal everyone who is waiting
_signaled = true;
while (_waits.Count > 0)
{
var toRelease = _waits.Dequeue();
toRelease.TrySetResult(true);
}
}
else
{
// Act as AutoResetEvent. When set signal 1 waiter
if (_waits.Count > 0)
{
var toRelease = _waits.Dequeue();
toRelease.TrySetResult(true);
}
else if (!_signaled)
{
_signaled = true;
}
}
}
}
/// <summary>
/// Dispose
/// </summary>
public void Dispose()
{
_waits.Clear();
}
}
}

View File

@ -0,0 +1,129 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects
{
/// <summary>
/// Async auto/manual reset event implementation
/// </summary>
public class AsyncResetEvent
{
private readonly Queue<TaskCompletionSource<bool>> _waiters = new();
private readonly bool _autoReset;
private bool _signaled;
#if NET9_0_OR_GREATER
private readonly Lock _waitersLock = new Lock();
#else
private readonly object _waitersLock = new object();
#endif
/// <summary>
/// ctor
/// </summary>
public AsyncResetEvent(bool initialState = false, bool autoReset = true)
{
_signaled = initialState;
_autoReset = autoReset;
}
/// <summary>
/// Wait for the set event
/// </summary>
/// <returns></returns>
public async Task<bool> WaitAsync(
TimeSpan? timeout = null,
CancellationToken ct = default)
{
TaskCompletionSource<bool> tcs;
lock (_waitersLock)
{
if (_signaled)
{
// Already was signaled, can return immediately
if (_autoReset)
_signaled = false;
return true;
}
tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_waiters.Enqueue(tcs);
}
if (timeout.HasValue || ct.CanBeCanceled)
{
// Wait for either timeout, cancellation token or set result
var delayTask = Task.Delay(timeout ?? Timeout.InfiniteTimeSpan, ct);
var completedTask = await Task.WhenAny(tcs.Task, delayTask).ConfigureAwait(false);
if (completedTask != tcs.Task)
{
// This was a timeout or cancellation, need to remove tcs from waiters
// if the tcs was set instead it will be removed in the Set method
if (tcs.TrySetResult(false))
{
lock (_waitersLock)
{
// Dequeue and put in the back of the queue again except for the one we need to remove
int count = _waiters.Count;
for (int i = 0; i < count; i++)
{
var w = _waiters.Dequeue();
if (w != tcs)
_waiters.Enqueue(w);
}
}
}
return false;
}
}
else
{
await tcs.Task.ConfigureAwait(false);
}
return true;
}
/// <summary>
/// Signal a waiter
/// </summary>
public void Set()
{
lock (_waitersLock)
{
if (_autoReset)
{
while (_waiters.Count > 0)
{
// Try to dequeue and set the result
// If result setting was not successful it means timeout/cancellation happened at the same time
// If this is the case this Set isn't the one setting the result and we need to continue
var w = _waiters.Dequeue();
if (w.TrySetResult(true))
return;
}
// No queued waiters, set signaled for next waiter
_signaled = true;
}
else
{
_signaled = true;
// Signal all current waiters
while (_waiters.Count > 0)
{
var w = _waiters.Dequeue();
w.TrySetResult(true);
}
}
}
}
}
}

View File

@ -487,7 +487,6 @@ namespace CryptoExchange.Net.Sockets.Default
_disposed = true; _disposed = true;
_socket.Dispose(); _socket.Dispose();
_ctsSource?.Dispose(); _ctsSource?.Dispose();
_sendEvent.Dispose();
_logger.SocketDisposed(Id); _logger.SocketDisposed(Id);
} }

View File

@ -787,7 +787,6 @@ namespace CryptoExchange.Net.Sockets.Default
{ {
Status = SocketStatus.Disposed; Status = SocketStatus.Disposed;
periodicEvent?.Set(); periodicEvent?.Set();
periodicEvent?.Dispose();
_socket.Dispose(); _socket.Dispose();
} }

View File

@ -233,7 +233,6 @@ namespace CryptoExchange.Net.Sockets.HighPerf
{ {
Status = SocketStatus.Disposed; Status = SocketStatus.Disposed;
periodicEvent?.Set(); periodicEvent?.Set();
periodicEvent?.Dispose();
_socket.Dispose(); _socket.Dispose();
} }