1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-04-07 10:11:10 +00:00

Added implementation for async processing of (websocket) updates

This commit is contained in:
Jkorf 2025-11-12 11:19:40 +01:00
parent 4be986ebe7
commit 84b0444caf
5 changed files with 233 additions and 16 deletions

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0;net9.0</TargetFrameworks> <TargetFrameworks>netstandard2.0;netstandard2.1;net8.0;net9.0</TargetFrameworks>
</PropertyGroup> </PropertyGroup>
@ -57,5 +57,6 @@
<ItemGroup Label="Transitive Client Packages"> <ItemGroup Label="Transitive Client Packages">
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="9.0.10" /> <PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="9.0.10" />
<PackageReference Include="Microsoft.Extensions.Http" Version="9.0.10" /> <PackageReference Include="Microsoft.Extensions.Http" Version="9.0.10" />
<PackageReference Include="System.Threading.Channels" Version="9.0.10" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,5 +1,7 @@
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis; using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Sockets;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Globalization; using System.Globalization;
@ -341,6 +343,78 @@ namespace CryptoExchange.Net
} }
/// <summary>
/// Queue updates received from a websocket subscriptions and process them async
/// </summary>
/// <typeparam name="T">The queued update type</typeparam>
/// <param name="subscribeCall">The subscribe call</param>
/// <param name="asyncHandler">The async update handler</param>
/// <param name="maxQueuedItems">The max number of updates to be queued up. When happens when the queue is full and a new write is attempted can be specified with <see>fullMode</see></param>
/// <param name="fullBehavior">What should happen if the queue contains <see>maxQueuedItems</see> pending updates. If no max is set this setting is ignored</param>
public static async Task<CallResult<UpdateSubscription>> ProcessQueuedAsync<T>(
Func<Action<DataEvent<T>>, Task<CallResult<UpdateSubscription>>> subscribeCall,
Func<DataEvent<T>, Task> asyncHandler,
int? maxQueuedItems = null,
QueueFullBehavior? fullBehavior = null)
{
var processor = new ProcessQueue<DataEvent<T>>(asyncHandler, maxQueuedItems, fullBehavior);
await processor.StartAsync().ConfigureAwait(false);
var result = await subscribeCall(upd => processor.Write(upd)).ConfigureAwait(false);
if (!result)
{
await processor.StopAsync().ConfigureAwait(false);
return result;
}
processor.Exception += result.Data._subscription.InvokeExceptionHandler;
result.Data.SubscriptionStatusChanged += (upd) =>
{
if (upd == CryptoExchange.Net.Objects.SubscriptionStatus.Closed)
_ = processor.StopAsync(true);
};
return result;
}
/// <summary>
/// Queue updates received from a websocket subscriptions and process them async
/// </summary>
/// <typeparam name="TEventType">The type of the queued item</typeparam>
/// <typeparam name="TOutputType">The type of the item to pass to the processor</typeparam>
/// <param name="subscribeCall">The subscribe call</param>
/// <param name="mapper">The mapper function to go from <see>TEventType</see> to <see>TOutputType</see></param>
/// <param name="asyncHandler">The async update handler</param>
/// <param name="maxQueuedItems">The max number of updates to be queued up. When happens when the queue is full and a new write is attempted can be specified with <see>fullMode</see></param>
/// <param name="fullBehavior">What should happen if the queue contains <see>maxQueuedItems</see> pending updates. If no max is set this setting is ignored</param>
public static async Task<CallResult<UpdateSubscription>> ProcessQueuedAsync<TEventType, TOutputType>(
Func<ProcessQueue<DataEvent<TEventType>>, Task<CallResult<UpdateSubscription>>> subscribeCall,
Func<DataEvent<TEventType>, DataEvent<TOutputType>> mapper,
Func<DataEvent<TOutputType>, Task> asyncHandler,
int? maxQueuedItems = null,
QueueFullBehavior? fullBehavior = null
)
{
var processor = new ProcessQueue<DataEvent<TEventType>>((update) => {
return asyncHandler.Invoke(mapper.Invoke(update));
}, maxQueuedItems, fullBehavior);
await processor.StartAsync().ConfigureAwait(false);
var result = await subscribeCall(processor).ConfigureAwait(false);
if (!result)
{
await processor.StopAsync().ConfigureAwait(false);
return result;
}
processor.Exception += result.Data._subscription.InvokeExceptionHandler;
result.Data.SubscriptionStatusChanged += (upd) =>
{
if (upd == CryptoExchange.Net.Objects.SubscriptionStatus.Closed)
_ = processor.StopAsync(true);
};
return result;
}
/// <summary> /// <summary>
/// Parse a decimal value from a string /// Parse a decimal value from a string
/// </summary> /// </summary>

View File

@ -294,4 +294,16 @@ namespace CryptoExchange.Net.Objects
Closed Closed
} }
/// <summary>
/// Queue full behavior
/// </summary>
public enum QueueFullBehavior
{
/// <summary>Remove and ignore the newest item in the queue in order to make room for the item being written.</summary>
DropNewest,
/// <summary>Remove and ignore the oldest item in the queue in order to make room for the item being written.</summary>
DropOldest,
/// <summary>Drop the item being written.</summary>
DropWrite
}
} }

View File

@ -12,7 +12,7 @@ namespace CryptoExchange.Net.Objects.Sockets
public class UpdateSubscription public class UpdateSubscription
{ {
private readonly SocketConnection _connection; private readonly SocketConnection _connection;
private readonly Subscription _listener; internal readonly Subscription _subscription;
private object _eventLock = new object(); private object _eventLock = new object();
private bool _connectionEventsSubscribed = true; private bool _connectionEventsSubscribed = true;
@ -89,8 +89,8 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public event Action<Exception> Exception public event Action<Exception> Exception
{ {
add => _listener.Exception += value; add => _subscription.Exception += value;
remove => _listener.Exception -= value; remove => _subscription.Exception -= value;
} }
/// <summary> /// <summary>
@ -101,7 +101,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <summary> /// <summary>
/// The id of the subscription /// The id of the subscription
/// </summary> /// </summary>
public int Id => _listener.Id; public int Id => _subscription.Id;
/// <summary> /// <summary>
/// ctor /// ctor
@ -118,8 +118,8 @@ namespace CryptoExchange.Net.Objects.Sockets
_connection.ActivityPaused += HandlePausedEvent; _connection.ActivityPaused += HandlePausedEvent;
_connection.ActivityUnpaused += HandleUnpausedEvent; _connection.ActivityUnpaused += HandleUnpausedEvent;
_listener = subscription; _subscription = subscription;
_listener.StatusChanged += (x) => SubscriptionStatusChanged?.Invoke(x); _subscription.StatusChanged += (x) => SubscriptionStatusChanged?.Invoke(x);
} }
private void UnsubscribeConnectionEvents() private void UnsubscribeConnectionEvents()
@ -144,7 +144,7 @@ namespace CryptoExchange.Net.Objects.Sockets
UnsubscribeConnectionEvents(); UnsubscribeConnectionEvents();
// If we're not the subscription closing this connection don't bother emitting // If we're not the subscription closing this connection don't bother emitting
if (!_listener.IsClosingConnection) if (!_subscription.IsClosingConnection)
return; return;
List<Action> handlers; List<Action> handlers;
@ -157,7 +157,7 @@ namespace CryptoExchange.Net.Objects.Sockets
private void HandleConnectionLostEvent() private void HandleConnectionLostEvent()
{ {
if (!_listener.Active) if (!_subscription.Active)
{ {
UnsubscribeConnectionEvents(); UnsubscribeConnectionEvents();
return; return;
@ -173,7 +173,7 @@ namespace CryptoExchange.Net.Objects.Sockets
private void HandleConnectionRestoredEvent(TimeSpan period) private void HandleConnectionRestoredEvent(TimeSpan period)
{ {
if (!_listener.Active) if (!_subscription.Active)
{ {
UnsubscribeConnectionEvents(); UnsubscribeConnectionEvents();
return; return;
@ -189,7 +189,7 @@ namespace CryptoExchange.Net.Objects.Sockets
private void HandleResubscribeFailedEvent(Error error) private void HandleResubscribeFailedEvent(Error error)
{ {
if (!_listener.Active) if (!_subscription.Active)
{ {
UnsubscribeConnectionEvents(); UnsubscribeConnectionEvents();
return; return;
@ -205,7 +205,7 @@ namespace CryptoExchange.Net.Objects.Sockets
private void HandlePausedEvent() private void HandlePausedEvent()
{ {
if (!_listener.Active) if (!_subscription.Active)
{ {
UnsubscribeConnectionEvents(); UnsubscribeConnectionEvents();
return; return;
@ -221,7 +221,7 @@ namespace CryptoExchange.Net.Objects.Sockets
private void HandleUnpausedEvent() private void HandleUnpausedEvent()
{ {
if (!_listener.Active) if (!_subscription.Active)
{ {
UnsubscribeConnectionEvents(); UnsubscribeConnectionEvents();
return; return;
@ -241,7 +241,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <returns></returns> /// <returns></returns>
public Task CloseAsync() public Task CloseAsync()
{ {
return _connection.CloseAsync(_listener); return _connection.CloseAsync(_subscription);
} }
/// <summary> /// <summary>
@ -259,7 +259,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <returns></returns> /// <returns></returns>
internal async Task UnsubscribeAsync() internal async Task UnsubscribeAsync()
{ {
await _connection.UnsubscribeAsync(_listener).ConfigureAwait(false); await _connection.UnsubscribeAsync(_subscription).ConfigureAwait(false);
} }
/// <summary> /// <summary>
@ -268,7 +268,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <returns></returns> /// <returns></returns>
internal async Task<CallResult> ResubscribeAsync() internal async Task<CallResult> ResubscribeAsync()
{ {
return await _connection.ResubscribeAsync(_listener).ConfigureAwait(false); return await _connection.ResubscribeAsync(_subscription).ConfigureAwait(false);
} }
} }
} }

View File

@ -0,0 +1,130 @@
using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Queue for processing items
/// </summary>
/// <typeparam name="T">Item type</typeparam>
public class ProcessQueue<T>
{
private readonly Channel<T> _channel;
private readonly Func<T, Task> _processor;
private Task? _processTask;
private CancellationTokenSource? _cts;
private bool _processTillEmpty;
/// <summary>
/// Event for when an exception is thrown in the processing handler
/// </summary>
public event Action<Exception>? Exception;
/// <summary>
/// ctor
/// </summary>
/// <param name="processor">The function to async handle the updates</param>
/// <param name="maxQueuedItems">The max number of items to be queued up. When happens when the queue is full and a new write is attempted can be specified with <see>fullMode</see></param>
/// <param name="fullBehavior">What should happen if the queue contains <see>maxQueuedItems</see> pending items. If no max is set this setting is ignored</param>
public ProcessQueue(Func<T, Task> processor, int? maxQueuedItems = null, QueueFullBehavior? fullBehavior = null)
{
_processor = processor;
if (maxQueuedItems == null)
{
_channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleReader = true,
SingleWriter = true
});
}
else
{
_channel = Channel.CreateBounded<T>(new BoundedChannelOptions(maxQueuedItems.Value)
{
AllowSynchronousContinuations = false,
SingleReader = true,
SingleWriter = true,
FullMode = MapMode(fullBehavior)
});
}
}
private BoundedChannelFullMode MapMode(QueueFullBehavior? behavior) =>
behavior switch
{
QueueFullBehavior.DropOldest => BoundedChannelFullMode.DropOldest,
QueueFullBehavior.DropNewest => BoundedChannelFullMode.DropNewest,
QueueFullBehavior.DropWrite => BoundedChannelFullMode.DropWrite,
_ => BoundedChannelFullMode.DropWrite
};
/// <summary>
/// Start the processing of the queue
/// </summary>
public Task StartAsync()
{
_cts = new CancellationTokenSource();
_processTask = Task.Run(async () =>
{
try
{
await foreach (var item in _channel.Reader.ReadAllAsync(_cts.Token).ConfigureAwait(false))
{
if (_cts.IsCancellationRequested && !_processTillEmpty) // Items might still be processed even if CT is canceled
return;
try
{
await _processor.Invoke(item).ConfigureAwait(false);
}
catch (Exception ex)
{
Exception?.Invoke(ex);
}
}
}
catch (OperationCanceledException) { }
});
return Task.CompletedTask;
}
/// <summary>
/// Stop processing the queue
/// </summary>
/// <param name="discardPending">Whether updates still pending in the queue should be discarded</param>
/// <returns></returns>
public async Task StopAsync(bool discardPending = true)
{
if (_processTask == null)
return;
_processTillEmpty = !discardPending;
_cts!.Cancel();
await _processTask.ConfigureAwait(false);
_channel.Writer.TryComplete(_processTask.Exception);
}
/// <summary>
/// Write an update to queue
/// </summary>
public bool Write(T item)
{
if (_cts?.IsCancellationRequested == true)
return false;
var write = _channel.Writer.TryWrite(item);
if (!write)
LibraryHelpers.StaticLogger?.Log(LogLevel.Warning, "Failed to write item to process queue. Item will be discarded");
return write;
}
}
}