1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-07-27 03:36:52 +00:00

Added cancellation token support for socket subscriptions

This commit is contained in:
Jkorf 2021-10-27 12:57:23 +02:00
parent b7f1619aec
commit 23bbf0ef88
4 changed files with 47 additions and 154 deletions

View File

@ -3172,7 +3172,7 @@
<param name="byteHandler">Handler for byte data</param> <param name="byteHandler">Handler for byte data</param>
<param name="stringHandler">Handler for string data</param> <param name="stringHandler">Handler for string data</param>
</member> </member>
<member name="M:CryptoExchange.Net.SocketClient.SubscribeAsync``1(System.Object,System.String,System.Boolean,System.Action{CryptoExchange.Net.Sockets.DataEvent{``0}})"> <member name="M:CryptoExchange.Net.SocketClient.SubscribeAsync``1(System.Object,System.String,System.Boolean,System.Action{CryptoExchange.Net.Sockets.DataEvent{``0}},System.Threading.CancellationToken)">
<summary> <summary>
Connect to an url and listen for data on the BaseAddress Connect to an url and listen for data on the BaseAddress
</summary> </summary>
@ -3181,9 +3181,10 @@
<param name="identifier">The identifier to use, necessary if no request object is sent</param> <param name="identifier">The identifier to use, necessary if no request object is sent</param>
<param name="authenticated">If the subscription is to an authenticated endpoint</param> <param name="authenticated">If the subscription is to an authenticated endpoint</param>
<param name="dataHandler">The handler of update data</param> <param name="dataHandler">The handler of update data</param>
<param name="ct">Cancellation token for closing this subscription</param>
<returns></returns> <returns></returns>
</member> </member>
<member name="M:CryptoExchange.Net.SocketClient.SubscribeAsync``1(System.String,System.Object,System.String,System.Boolean,System.Action{CryptoExchange.Net.Sockets.DataEvent{``0}})"> <member name="M:CryptoExchange.Net.SocketClient.SubscribeAsync``1(System.String,System.Object,System.String,System.Boolean,System.Action{CryptoExchange.Net.Sockets.DataEvent{``0}},System.Threading.CancellationToken)">
<summary> <summary>
Connect to an url and listen for data Connect to an url and listen for data
</summary> </summary>
@ -3193,6 +3194,7 @@
<param name="identifier">The identifier to use, necessary if no request object is sent</param> <param name="identifier">The identifier to use, necessary if no request object is sent</param>
<param name="authenticated">If the subscription is to an authenticated endpoint</param> <param name="authenticated">If the subscription is to an authenticated endpoint</param>
<param name="dataHandler">The handler of update data</param> <param name="dataHandler">The handler of update data</param>
<param name="ct">Cancellation token for closing this subscription</param>
<returns></returns> <returns></returns>
</member> </member>
<member name="M:CryptoExchange.Net.SocketClient.SubscribeAndWaitAsync(CryptoExchange.Net.Sockets.SocketConnection,System.Object,CryptoExchange.Net.Sockets.SocketSubscription)"> <member name="M:CryptoExchange.Net.SocketClient.SubscribeAndWaitAsync(CryptoExchange.Net.Sockets.SocketConnection,System.Object,CryptoExchange.Net.Sockets.SocketSubscription)">
@ -3310,7 +3312,7 @@
<param name="message"></param> <param name="message"></param>
<returns></returns> <returns></returns>
</member> </member>
<member name="M:CryptoExchange.Net.SocketClient.AddSubscription``1(System.Object,System.String,System.Boolean,CryptoExchange.Net.Sockets.SocketConnection,System.Action{CryptoExchange.Net.Sockets.DataEvent{``0}})"> <member name="M:CryptoExchange.Net.SocketClient.AddSubscription``1(System.Object,System.String,System.Boolean,CryptoExchange.Net.Sockets.SocketConnection,System.Action{CryptoExchange.Net.Sockets.DataEvent{``0}},System.Threading.CancellationToken)">
<summary> <summary>
Add a subscription to a connection Add a subscription to a connection
</summary> </summary>
@ -3928,6 +3930,11 @@
If the subscription has been confirmed If the subscription has been confirmed
</summary> </summary>
</member> </member>
<member name="P:CryptoExchange.Net.Sockets.SocketSubscription.CancellationTokenRegistration">
<summary>
Cancellation token registration, should be disposed when subscription is closed
</summary>
</member>
<member name="M:CryptoExchange.Net.Sockets.SocketSubscription.CreateForRequest(System.Int32,System.Object,System.Boolean,System.Action{CryptoExchange.Net.Sockets.MessageEvent})"> <member name="M:CryptoExchange.Net.Sockets.SocketSubscription.CreateForRequest(System.Int32,System.Object,System.Boolean,System.Action{CryptoExchange.Net.Sockets.MessageEvent})">
<summary> <summary>
Create SocketSubscription for a request Create SocketSubscription for a request
@ -3936,6 +3943,7 @@
<param name="request"></param> <param name="request"></param>
<param name="userSubscription"></param> <param name="userSubscription"></param>
<param name="dataHandler"></param> <param name="dataHandler"></param>
<param name="ct"></param>
<returns></returns> <returns></returns>
</member> </member>
<member name="M:CryptoExchange.Net.Sockets.SocketSubscription.CreateForIdentifier(System.Int32,System.String,System.Boolean,System.Action{CryptoExchange.Net.Sockets.MessageEvent})"> <member name="M:CryptoExchange.Net.Sockets.SocketSubscription.CreateForIdentifier(System.Int32,System.String,System.Boolean,System.Action{CryptoExchange.Net.Sockets.MessageEvent})">
@ -3946,6 +3954,7 @@
<param name="identifier"></param> <param name="identifier"></param>
<param name="userSubscription"></param> <param name="userSubscription"></param>
<param name="dataHandler"></param> <param name="dataHandler"></param>
<param name="ct"></param>
<returns></returns> <returns></returns>
</member> </member>
<member name="M:CryptoExchange.Net.Sockets.SocketSubscription.InvokeExceptionHandler(System.Exception)"> <member name="M:CryptoExchange.Net.Sockets.SocketSubscription.InvokeExceptionHandler(System.Exception)">
@ -4046,148 +4055,3 @@
</member> </member>
</members> </members>
</doc> </doc>
System.Diagnostics.CodeAnalysis.AllowNullAttribute">
<summary>
Specifies that <see langword="null"/> is allowed as an input even if the
corresponding type disallows it.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.AllowNullAttribute.#ctor">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.AllowNullAttribute"/> class.
</summary>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.DisallowNullAttribute">
<summary>
Specifies that <see langword="null"/> is disallowed as an input even if the
corresponding type allows it.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.DisallowNullAttribute.#ctor">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.DisallowNullAttribute"/> class.
</summary>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.DoesNotReturnAttribute">
<summary>
Specifies that a method that will never return under any circumstance.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.DoesNotReturnAttribute.#ctor">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.DoesNotReturnAttribute"/> class.
</summary>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.DoesNotReturnIfAttribute">
<summary>
Specifies that the method will not return if the associated <see cref="T:System.Boolean"/>
parameter is passed the specified value.
</summary>
</member>
<member name="P:System.Diagnostics.CodeAnalysis.DoesNotReturnIfAttribute.ParameterValue">
<summary>
Gets the condition parameter value.
Code after the method is considered unreachable by diagnostics if the argument
to the associated parameter matches this value.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.DoesNotReturnIfAttribute.#ctor(System.Boolean)">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.DoesNotReturnIfAttribute"/>
class with the specified parameter value.
</summary>
<param name="parameterValue">
The condition parameter value.
Code after the method is considered unreachable by diagnostics if the argument
to the associated parameter matches this value.
</param>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.MaybeNullAttribute">
<summary>
Specifies that an output may be <see langword="null"/> even if the
corresponding type disallows it.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.MaybeNullAttribute.#ctor">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.MaybeNullAttribute"/> class.
</summary>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute">
<summary>
Specifies that when a method returns <see cref="P:System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute.ReturnValue"/>,
the parameter may be <see langword="null"/> even if the corresponding type disallows it.
</summary>
</member>
<member name="P:System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute.ReturnValue">
<summary>
Gets the return value condition.
If the method returns this value, the associated parameter may be <see langword="null"/>.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute.#ctor(System.Boolean)">
<summary>
Initializes the attribute with the specified return value condition.
</summary>
<param name="returnValue">
The return value condition.
If the method returns this value, the associated parameter may be <see langword="null"/>.
</param>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.NotNullAttribute">
<summary>
Specifies that an output is not <see langword="null"/> even if the
corresponding type allows it.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.NotNullAttribute.#ctor">
<summary>
Initializes a new instance of the <see cref="T:System.Diagnostics.CodeAnalysis.NotNullAttribute"/> class.
</summary>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.NotNullIfNotNullAttribute">
<summary>
Specifies that the output will be non-<see langword="null"/> if the
named parameter is non-<see langword="null"/>.
</summary>
</member>
<member name="P:System.Diagnostics.CodeAnalysis.NotNullIfNotNullAttribute.ParameterName">
<summary>
Gets the associated parameter name.
The output will be non-<see langword="null"/> if the argument to the
parameter specified is non-<see langword="null"/>.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.NotNullIfNotNullAttribute.#ctor(System.String)">
<summary>
Initializes the attribute with the associated parameter name.
</summary>
<param name="parameterName">
The associated parameter name.
The output will be non-<see langword="null"/> if the argument to the
parameter specified is non-<see langword="null"/>.
</param>
</member>
<member name="T:System.Diagnostics.CodeAnalysis.NotNullWhenAttribute">
<summary>
Specifies that when a method returns <see cref="P:System.Diagnostics.CodeAnalysis.NotNullWhenAttribute.ReturnValue"/>,
the parameter will not be <see langword="null"/> even if the corresponding type allows it.
</summary>
</member>
<member name="P:System.Diagnostics.CodeAnalysis.NotNullWhenAttribute.ReturnValue">
<summary>
Gets the return value condition.
If the method returns this value, the associated parameter will not be <see langword="null"/>.
</summary>
</member>
<member name="M:System.Diagnostics.CodeAnalysis.NotNullWhenAttribute.#ctor(System.Boolean)">
<summary>
Initializes the attribute with the specified return value condition.
</summary>
<param name="returnValue">
The return value condition.
If the method returns this value, the associated parameter will not be <see langword="null"/>.
</param>
</member>
</members>
</doc>

View File

@ -152,10 +152,11 @@ namespace CryptoExchange.Net
/// <param name="identifier">The identifier to use, necessary if no request object is sent</param> /// <param name="identifier">The identifier to use, necessary if no request object is sent</param>
/// <param name="authenticated">If the subscription is to an authenticated endpoint</param> /// <param name="authenticated">If the subscription is to an authenticated endpoint</param>
/// <param name="dataHandler">The handler of update data</param> /// <param name="dataHandler">The handler of update data</param>
/// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns> /// <returns></returns>
protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler) protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler, CancellationToken ct)
{ {
return SubscribeAsync(BaseAddress, request, identifier, authenticated, dataHandler); return SubscribeAsync(BaseAddress, request, identifier, authenticated, dataHandler, ct);
} }
/// <summary> /// <summary>
@ -167,8 +168,9 @@ namespace CryptoExchange.Net
/// <param name="identifier">The identifier to use, necessary if no request object is sent</param> /// <param name="identifier">The identifier to use, necessary if no request object is sent</param>
/// <param name="authenticated">If the subscription is to an authenticated endpoint</param> /// <param name="authenticated">If the subscription is to an authenticated endpoint</param>
/// <param name="dataHandler">The handler of update data</param> /// <param name="dataHandler">The handler of update data</param>
/// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns> /// <returns></returns>
protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(string url, object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler) protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(string url, object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler, CancellationToken ct)
{ {
SocketConnection socketConnection; SocketConnection socketConnection;
SocketSubscription subscription; SocketSubscription subscription;
@ -182,7 +184,7 @@ namespace CryptoExchange.Net
socketConnection = GetSocketConnection(url, authenticated); socketConnection = GetSocketConnection(url, authenticated);
// Add a subscription on the socket connection // Add a subscription on the socket connection
subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler); subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, ct);
if (SocketCombineTarget == 1) if (SocketCombineTarget == 1)
{ {
// Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway // Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway
@ -228,6 +230,14 @@ namespace CryptoExchange.Net
} }
socketConnection.ShouldReconnect = true; socketConnection.ShouldReconnect = true;
if (ct != default)
{
subscription.CancellationTokenRegistration = ct.Register(async () =>
{
log.Write(LogLevel.Debug, $"Socket {socketConnection.Socket.Id} Cancellation token set, closing subscription");
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
}, false);
}
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription), null); return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription), null);
} }
@ -435,7 +445,7 @@ namespace CryptoExchange.Net
/// <param name="connection">The socket connection the handler is on</param> /// <param name="connection">The socket connection the handler is on</param>
/// <param name="dataHandler">The handler of the data received</param> /// <param name="dataHandler">The handler of the data received</param>
/// <returns></returns> /// <returns></returns>
protected virtual SocketSubscription AddSubscription<T>(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action<DataEvent<T>> dataHandler) protected virtual SocketSubscription AddSubscription<T>(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action<DataEvent<T>> dataHandler, CancellationToken ct)
{ {
void InternalHandler(MessageEvent messageEvent) void InternalHandler(MessageEvent messageEvent)
{ {

View File

@ -522,6 +522,14 @@ namespace CryptoExchange.Net.Sockets
if (socketClient.sockets.ContainsKey(Socket.Id)) if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _); socketClient.sockets.TryRemove(Socket.Id, out _);
lock (subscriptionLock)
{
foreach (var subscription in subscriptions)
{
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
}
}
await Socket.CloseAsync().ConfigureAwait(false); await Socket.CloseAsync().ConfigureAwait(false);
Socket.Dispose(); Socket.Dispose();
} }
@ -536,6 +544,9 @@ namespace CryptoExchange.Net.Sockets
if (!Socket.IsOpen) if (!Socket.IsOpen)
return; return;
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
if (subscription.Confirmed) if (subscription.Confirmed)
await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Threading;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
{ {
@ -39,6 +40,11 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public bool Confirmed { get; set; } public bool Confirmed { get; set; }
/// <summary>
/// Cancellation token registration, should be disposed when subscription is closed
/// </summary>
public CancellationTokenRegistration? CancellationTokenRegistration { get; set; }
private SocketSubscription(int id, object? request, string? identifier, bool userSubscription, Action<MessageEvent> dataHandler) private SocketSubscription(int id, object? request, string? identifier, bool userSubscription, Action<MessageEvent> dataHandler)
{ {
Id = id; Id = id;
@ -55,6 +61,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="request"></param> /// <param name="request"></param>
/// <param name="userSubscription"></param> /// <param name="userSubscription"></param>
/// <param name="dataHandler"></param> /// <param name="dataHandler"></param>
/// <param name="ct"></param>
/// <returns></returns> /// <returns></returns>
public static SocketSubscription CreateForRequest(int id, object request, bool userSubscription, public static SocketSubscription CreateForRequest(int id, object request, bool userSubscription,
Action<MessageEvent> dataHandler) Action<MessageEvent> dataHandler)
@ -69,6 +76,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="identifier"></param> /// <param name="identifier"></param>
/// <param name="userSubscription"></param> /// <param name="userSubscription"></param>
/// <param name="dataHandler"></param> /// <param name="dataHandler"></param>
/// <param name="ct"></param>
/// <returns></returns> /// <returns></returns>
public static SocketSubscription CreateForIdentifier(int id, string identifier, bool userSubscription, public static SocketSubscription CreateForIdentifier(int id, string identifier, bool userSubscription,
Action<MessageEvent> dataHandler) Action<MessageEvent> dataHandler)