1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 16:36:15 +00:00
This commit is contained in:
JKorf 2023-10-25 22:09:52 +02:00
parent cff3863373
commit 141d5bd956
13 changed files with 223 additions and 233 deletions

View File

@ -1,8 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Globalization;
using System.IO; using System.IO;
using System.Net;
using System.Net.Http; using System.Net.Http;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;

View File

@ -4,17 +4,14 @@ using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets; using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis; using System.IO;
using System.Linq; using System.Linq;
using System.Net.Sockets;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using static CryptoExchange.Net.Objects.RateLimiter;
namespace CryptoExchange.Net namespace CryptoExchange.Net
{ {
@ -43,19 +40,14 @@ namespace CryptoExchange.Net
protected TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(10); protected TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(10);
/// <summary> /// <summary>
/// Delegate used for processing byte data received from socket connections before it is processed by handlers /// Delegate used for manipulating data received from socket connections before it is processed by listeners
/// </summary> /// </summary>
protected Func<byte[], string>? dataInterpreterBytes; protected Func<Stream, Stream>? interceptor;
/// <summary>
/// Delegate used for processing string data received from socket connections before it is processed by handlers
/// </summary>
protected Func<string, string>? dataInterpreterString;
/// <summary> /// <summary>
/// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example. /// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example.
/// </summary> /// </summary>
protected List<SystemSubscription> genericHandlers = new(); protected List<SystemSubscription> systemSubscriptions = new();
/// <summary> /// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry. /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry.
@ -106,7 +98,7 @@ namespace CryptoExchange.Net
if (!socketConnections.Any()) if (!socketConnections.Any())
return 0; return 0;
return socketConnections.Sum(s => s.Value.SubscriptionCount); return socketConnections.Sum(s => s.Value.UserListenerCount);
} }
} }
@ -140,27 +132,24 @@ namespace CryptoExchange.Net
} }
/// <summary> /// <summary>
/// Set a delegate to be used for processing data received from socket connections before it is processed by handlers /// Set a delegate which can manipulate the message stream before it is processed by listeners
/// </summary> /// </summary>
/// <param name="byteHandler">Handler for byte data</param> /// <param name="interceptor">Interceptor</param>
/// <param name="stringHandler">Handler for string data</param> protected void SetInterceptor(Func<Stream, Stream> interceptor)
protected void SetDataInterpreter(Func<byte[], string>? byteHandler, Func<string, string>? stringHandler)
{ {
// TODO this.interceptor = interceptor;
dataInterpreterBytes = byteHandler;
dataInterpreterString = stringHandler;
} }
/// <summary> /// <summary>
/// Connect to an url and listen for data on the BaseAddress /// Connect to an url and listen for data on the BaseAddress
/// </summary> /// </summary>
/// <typeparam name="T">The type of the expected data</typeparam> /// <typeparam name="T">The type of the expected data</typeparam>
/// <param name="subscriptionObject">The subscription</param> /// <param name="subscription">The subscription</param>
/// <param name="ct">Cancellation token for closing this subscription</param> /// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns> /// <returns></returns>
protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(SubscriptionActor subscriptionObject, CancellationToken ct) protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(Subscription subscription, CancellationToken ct)
{ {
return SubscribeAsync<T>(BaseAddress, subscriptionObject, ct); return SubscribeAsync<T>(BaseAddress, subscription, ct);
} }
/// <summary> /// <summary>
@ -168,16 +157,16 @@ namespace CryptoExchange.Net
/// </summary> /// </summary>
/// <typeparam name="T">The type of the expected data</typeparam> /// <typeparam name="T">The type of the expected data</typeparam>
/// <param name="url">The URL to connect to</param> /// <param name="url">The URL to connect to</param>
/// <param name="subscriptionObject">The subscription</param> /// <param name="subscription">The subscription</param>
/// <param name="ct">Cancellation token for closing this subscription</param> /// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns> /// <returns></returns>
protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(string url, SubscriptionActor subscriptionObject, CancellationToken ct) protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(string url, Subscription subscription, CancellationToken ct)
{ {
if (_disposing) if (_disposing)
return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe")); return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe"));
SocketConnection socketConnection; SocketConnection socketConnection;
SocketSubscriptionListener? subscription; MessageListener? messageListener;
var released = false; var released = false;
// Wait for a semaphore here, so we only connect 1 socket at a time. // Wait for a semaphore here, so we only connect 1 socket at a time.
// This is necessary for being able to see if connections can be combined // This is necessary for being able to see if connections can be combined
@ -195,15 +184,15 @@ namespace CryptoExchange.Net
while (true) while (true)
{ {
// Get a new or existing socket connection // Get a new or existing socket connection
var socketResult = await GetSocketConnection(url, subscriptionObject.Authenticated).ConfigureAwait(false); var socketResult = await GetSocketConnection(url, subscription.Authenticated).ConfigureAwait(false);
if (!socketResult) if (!socketResult)
return socketResult.As<UpdateSubscription>(null); return socketResult.As<UpdateSubscription>(null);
socketConnection = socketResult.Data; socketConnection = socketResult.Data;
// Add a subscription on the socket connection // Add a subscription on the socket connection
subscription = AddSubscription<T>(subscriptionObject, true, socketConnection); messageListener = AddSubscription<T>(subscription, true, socketConnection);
if (subscription == null) if (messageListener == null)
{ {
_logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
continue; continue;
@ -218,7 +207,7 @@ namespace CryptoExchange.Net
var needsConnecting = !socketConnection.Connected; var needsConnecting = !socketConnection.Connected;
var connectResult = await ConnectIfNeededAsync(socketConnection, subscriptionObject.Authenticated).ConfigureAwait(false); var connectResult = await ConnectIfNeededAsync(socketConnection, subscription.Authenticated).ConfigureAwait(false);
if (!connectResult) if (!connectResult)
return new CallResult<UpdateSubscription>(connectResult.Error!); return new CallResult<UpdateSubscription>(connectResult.Error!);
@ -237,35 +226,35 @@ namespace CryptoExchange.Net
return new CallResult<UpdateSubscription>(new ServerError("Socket is paused")); return new CallResult<UpdateSubscription>(new ServerError("Socket is paused"));
} }
var request = subscriptionObject.GetSubscribeRequest(); var request = subscription.GetSubRequest();
if (request != null) if (request != null)
{ {
// Send the request and wait for answer // Send the request and wait for answer
var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false); var subResult = await SubscribeAndWaitAsync(socketConnection, request, messageListener).ConfigureAwait(false);
if (!subResult) if (!subResult)
{ {
_logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}"); _logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
await socketConnection.CloseAsync(subscription).ConfigureAwait(false); await socketConnection.CloseAsync(messageListener).ConfigureAwait(false);
return new CallResult<UpdateSubscription>(subResult.Error!); return new CallResult<UpdateSubscription>(subResult.Error!);
} }
} }
else else
{ {
// No request to be sent, so just mark the subscription as comfirmed // No request to be sent, so just mark the subscription as comfirmed
subscription.Confirmed = true; messageListener.Confirmed = true;
} }
if (ct != default) if (ct != default)
{ {
subscription.CancellationTokenRegistration = ct.Register(async () => messageListener.CancellationTokenRegistration = ct.Register(async () =>
{ {
_logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription"); _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription {messageListener.Id}");
await socketConnection.CloseAsync(subscription).ConfigureAwait(false); await socketConnection.CloseAsync(messageListener).ConfigureAwait(false);
}, false); }, false);
} }
_logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully"); _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {messageListener.Id} completed successfully");
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription)); return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, messageListener));
} }
/// <summary> /// <summary>
@ -273,14 +262,14 @@ namespace CryptoExchange.Net
/// </summary> /// </summary>
/// <param name="socketConnection">The connection to send the request on</param> /// <param name="socketConnection">The connection to send the request on</param>
/// <param name="request">The request to send, will be serialized to json</param> /// <param name="request">The request to send, will be serialized to json</param>
/// <param name="subscription">The subscription the request is for</param> /// <param name="listener">The message listener for the subscription</param>
/// <returns></returns> /// <returns></returns>
protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscriptionListener subscription) protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, MessageListener listener)
{ {
CallResult? callResult = null; CallResult? callResult = null;
await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, 1, x => await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, listener, 1, x =>
{ {
var (matches, result) = subscription.Subscription!.MessageMatchesSubscribeRequest(x); var (matches, result) = listener.Subscription!.MessageMatchesSubRequest(x);
if (matches) if (matches)
callResult = result; callResult = result;
return matches; return matches;
@ -288,7 +277,7 @@ namespace CryptoExchange.Net
if (callResult?.Success == true) if (callResult?.Success == true)
{ {
subscription.Confirmed = true; listener.Confirmed = true;
return new CallResult<bool>(true); return new CallResult<bool>(true);
} }
@ -303,11 +292,10 @@ namespace CryptoExchange.Net
/// </summary> /// </summary>
/// <typeparam name="T">Expected result type</typeparam> /// <typeparam name="T">Expected result type</typeparam>
/// <param name="query">The query</param> /// <param name="query">The query</param>
/// <param name="weight">Weight of the request</param>
/// <returns></returns> /// <returns></returns>
protected virtual Task<CallResult<T>> QueryAsync<T>(QueryActor query, int weight = 1) protected virtual Task<CallResult<T>> QueryAsync<T>(Query query)
{ {
return QueryAsync<T>(BaseAddress, query, weight); return QueryAsync<T>(BaseAddress, query);
} }
/// <summary> /// <summary>
@ -315,10 +303,9 @@ namespace CryptoExchange.Net
/// </summary> /// </summary>
/// <typeparam name="T">The expected result type</typeparam> /// <typeparam name="T">The expected result type</typeparam>
/// <param name="url">The url for the request</param> /// <param name="url">The url for the request</param>
/// <param name="request">The request to send</param> /// <param name="query">The query</param>
/// <param name="weight">Weight of the request</param>
/// <returns></returns> /// <returns></returns>
protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, QueryActor request, int weight = 1) protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, Query query)
{ {
if (_disposing) if (_disposing)
return new CallResult<T>(new InvalidOperationError("Client disposed, can't query")); return new CallResult<T>(new InvalidOperationError("Client disposed, can't query"));
@ -328,7 +315,7 @@ namespace CryptoExchange.Net
await semaphoreSlim.WaitAsync().ConfigureAwait(false); await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try try
{ {
var socketResult = await GetSocketConnection(url, request.Authenticated).ConfigureAwait(false); var socketResult = await GetSocketConnection(url, query.Authenticated).ConfigureAwait(false);
if (!socketResult) if (!socketResult)
return socketResult.As<T>(default); return socketResult.As<T>(default);
@ -341,7 +328,7 @@ namespace CryptoExchange.Net
released = true; released = true;
} }
var connectResult = await ConnectIfNeededAsync(socketConnection, request.Authenticated).ConfigureAwait(false); var connectResult = await ConnectIfNeededAsync(socketConnection, query.Authenticated).ConfigureAwait(false);
if (!connectResult) if (!connectResult)
return new CallResult<T>(connectResult.Error!); return new CallResult<T>(connectResult.Error!);
} }
@ -357,7 +344,7 @@ namespace CryptoExchange.Net
return new CallResult<T>(new ServerError("Socket is paused")); return new CallResult<T>(new ServerError("Socket is paused"));
} }
return await QueryAndWaitAsync<T>(socketConnection, request, weight).ConfigureAwait(false); return await QueryAndWaitAsync<T>(socketConnection, query).ConfigureAwait(false);
} }
/// <summary> /// <summary>
@ -365,18 +352,17 @@ namespace CryptoExchange.Net
/// </summary> /// </summary>
/// <typeparam name="T">The expected result type</typeparam> /// <typeparam name="T">The expected result type</typeparam>
/// <param name="socket">The connection to send and wait on</param> /// <param name="socket">The connection to send and wait on</param>
/// <param name="request">The request to send</param> /// <param name="query">The query</param>
/// <param name="weight">The weight of the query</param>
/// <returns></returns> /// <returns></returns>
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, QueryActor request, int weight) protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, Query query)
{ {
var dataResult = new CallResult<T>(new ServerError("No response on query received")); var dataResult = new CallResult<T>(new ServerError("No response on query received"));
await socket.SendAndWaitAsync(request.Query, ClientOptions.RequestTimeout, null, weight, x => await socket.SendAndWaitAsync(query.Request, ClientOptions.RequestTimeout, null, query.Weight, x =>
{ {
var matches = request.MessageMatchesQuery(x); var matches = query.MessageMatchesQuery(x);
if (matches) if (matches)
{ {
request.HandleResponse(x); query.HandleResponse(x);
return true; return true;
} }
@ -420,7 +406,7 @@ namespace CryptoExchange.Net
_logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate"); _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate");
var authRequest = GetAuthenticationRequest(); var authRequest = GetAuthenticationRequest();
var authResult = new CallResult(new ServerError("No response from server")); var authResult = new CallResult(new ServerError("No response from server"));
await socket.SendAndWaitAsync(authRequest.Query, ClientOptions.RequestTimeout, null, 1, x => await socket.SendAndWaitAsync(authRequest.Request, ClientOptions.RequestTimeout, null, 1, x =>
{ {
var matches = authRequest.MessageMatchesQuery(x); var matches = authRequest.MessageMatchesQuery(x);
if (matches) if (matches)
@ -451,33 +437,23 @@ namespace CryptoExchange.Net
/// Should return the request which can be used to authenticate a socket connection /// Should return the request which can be used to authenticate a socket connection
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
protected internal abstract QueryActor GetAuthenticationRequest(); protected internal abstract Query GetAuthenticationRequest();
/// <summary>
/// Optional handler to interpolate data before sending it to the handlers
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
//protected internal virtual JToken ProcessTokenData(JToken message)
//{
// return message;
//}
/// <summary> /// <summary>
/// Add a subscription to a connection /// Add a subscription to a connection
/// </summary> /// </summary>
/// <typeparam name="T">The type of data the subscription expects</typeparam> /// <typeparam name="T">The type of data the subscription expects</typeparam>
/// <param name="subscriptionObject">The subscription</param> /// <param name="subscription">The subscription</param>
/// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param> /// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param>
/// <param name="connection">The socket connection the handler is on</param> /// <param name="connection">The socket connection the handler is on</param>
/// <returns></returns> /// <returns></returns>
protected virtual SocketSubscriptionListener? AddSubscription<T>(SubscriptionActor subscriptionObject, bool userSubscription, SocketConnection connection) protected virtual MessageListener? AddSubscription<T>(Subscription subscription, bool userSubscription, SocketConnection connection)
{ {
var subscription = new SocketSubscriptionListener(ExchangeHelpers.NextId(), subscriptionObject, userSubscription); var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription);
if (!connection.AddSubscription(subscription)) if (!connection.AddListener(messageListener))
return null; return null;
return subscription; return messageListener;
} }
/// <summary> /// <summary>
@ -486,10 +462,10 @@ namespace CryptoExchange.Net
/// <param name="systemSubscription">The subscription</param> /// <param name="systemSubscription">The subscription</param>
protected void AddSystemSubscription(SystemSubscription systemSubscription) protected void AddSystemSubscription(SystemSubscription systemSubscription)
{ {
genericHandlers.Add(systemSubscription); systemSubscriptions.Add(systemSubscription);
var subscription = new SocketSubscriptionListener(ExchangeHelpers.NextId(), systemSubscription, false); var subscription = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false);
foreach (var connection in socketConnections.Values) foreach (var connection in socketConnections.Values)
connection.AddSubscription(subscription); connection.AddListener(subscription);
} }
/// <summary> /// <summary>
@ -534,11 +510,11 @@ namespace CryptoExchange.Net
var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected) var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected)
&& s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') && s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
&& (s.Value.ApiClient.GetType() == GetType()) && (s.Value.ApiClient.GetType() == GetType())
&& (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault(); && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.UserListenerCount).FirstOrDefault();
var result = socketResult.Equals(default(KeyValuePair<int, SocketConnection>)) ? null : socketResult.Value; var result = socketResult.Equals(default(KeyValuePair<int, SocketConnection>)) ? null : socketResult.Value;
if (result != null) if (result != null)
{ {
if (result.SubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.SubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) if (result.UserListenerCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserListenerCount >= ClientOptions.SocketSubscriptionsCombineTarget)))
{ {
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new // Use existing socket if it has less than target connections OR it has the least connections and we can't make new
return new CallResult<SocketConnection>(result); return new CallResult<SocketConnection>(result);
@ -560,10 +536,10 @@ namespace CryptoExchange.Net
var socketConnection = new SocketConnection(_logger, this, socket, address); var socketConnection = new SocketConnection(_logger, this, socket, address);
socketConnection.UnhandledMessage += HandleUnhandledMessage; socketConnection.UnhandledMessage += HandleUnhandledMessage;
foreach (var systemHandler in genericHandlers) foreach (var systemSubscription in systemSubscriptions)
{ {
var handler = new SocketSubscriptionListener(ExchangeHelpers.NextId(), systemHandler, false); var handler = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false);
socketConnection.AddSubscription(handler); socketConnection.AddListener(handler);
} }
return new CallResult<SocketConnection>(socketConnection); return new CallResult<SocketConnection>(socketConnection);
@ -602,8 +578,7 @@ namespace CryptoExchange.Net
protected virtual WebSocketParameters GetWebSocketParameters(string address) protected virtual WebSocketParameters GetWebSocketParameters(string address)
=> new(new Uri(address), ClientOptions.AutoReconnect) => new(new Uri(address), ClientOptions.AutoReconnect)
{ {
DataInterpreterBytes = dataInterpreterBytes, Interceptor = interceptor,
DataInterpreterString = dataInterpreterString,
KeepAliveInterval = KeepAliveInterval, KeepAliveInterval = KeepAliveInterval,
ReconnectInterval = ClientOptions.ReconnectInterval, ReconnectInterval = ClientOptions.ReconnectInterval,
RateLimiters = RateLimiters, RateLimiters = RateLimiters,
@ -677,11 +652,11 @@ namespace CryptoExchange.Net
/// <returns></returns> /// <returns></returns>
public virtual async Task<bool> UnsubscribeAsync(int subscriptionId) public virtual async Task<bool> UnsubscribeAsync(int subscriptionId)
{ {
SocketSubscriptionListener? subscription = null; MessageListener? subscription = null;
SocketConnection? connection = null; SocketConnection? connection = null;
foreach (var socket in socketConnections.Values.ToList()) foreach (var socket in socketConnections.Values.ToList())
{ {
subscription = socket.GetSubscription(subscriptionId); subscription = socket.GetListener(subscriptionId);
if (subscription != null) if (subscription != null)
{ {
connection = socket; connection = socket;
@ -717,11 +692,11 @@ namespace CryptoExchange.Net
/// <returns></returns> /// <returns></returns>
public virtual async Task UnsubscribeAllAsync() public virtual async Task UnsubscribeAllAsync()
{ {
var sum = socketConnections.Sum(s => s.Value.SubscriptionCount); var sum = socketConnections.Sum(s => s.Value.UserListenerCount);
if (sum == 0) if (sum == 0)
return; return;
_logger.Log(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions"); _logger.Log(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.UserListenerCount)} subscriptions");
var tasks = new List<Task>(); var tasks = new List<Task>();
{ {
var socketList = socketConnections.Values; var socketList = socketConnections.Values;
@ -758,8 +733,8 @@ namespace CryptoExchange.Net
sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}"); sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}");
foreach (var connection in socketConnections) foreach (var connection in socketConnections)
{ {
sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); sb.AppendLine($" Connection {connection.Key}: {connection.Value.UserListenerCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}");
foreach (var subscription in connection.Value.Subscriptions) foreach (var subscription in connection.Value.MessageListeners)
sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}"); sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}");
} }
return sb.ToString(); return sb.ToString();
@ -773,7 +748,7 @@ namespace CryptoExchange.Net
_disposing = true; _disposing = true;
periodicEvent?.Set(); periodicEvent?.Set();
periodicEvent?.Dispose(); periodicEvent?.Dispose();
if (socketConnections.Sum(s => s.Value.SubscriptionCount) > 0) if (socketConnections.Sum(s => s.Value.UserListenerCount) > 0)
{ {
_logger.Log(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); _logger.Log(LogLevel.Debug, "Disposing socket client, closing all subscriptions");
_ = UnsubscribeAllAsync(); _ = UnsubscribeAllAsync();

View File

@ -20,7 +20,7 @@ namespace CryptoExchange.Net.Interfaces
/// <summary> /// <summary>
/// Websocket message received event /// Websocket message received event
/// </summary> /// </summary>
event Func<MemoryStream, Task> OnStreamMessage; event Func<Stream, Task> OnStreamMessage;
/// <summary> /// <summary>
/// Websocket sent event, RequestId as parameter /// Websocket sent event, RequestId as parameter
/// </summary> /// </summary>

View File

@ -1,5 +1,4 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using System; using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -14,20 +13,20 @@ namespace CryptoExchange.Net.Objects.Sockets
public AsyncResetEvent Event { get; } public AsyncResetEvent Event { get; }
public DateTime RequestTimestamp { get; set; } public DateTime RequestTimestamp { get; set; }
public TimeSpan Timeout { get; } public TimeSpan Timeout { get; }
public SocketSubscriptionListener? Subscription { get; } public MessageListener? MessageListener { get; }
private CancellationTokenSource? _cts; private CancellationTokenSource? _cts;
public int Priority => 100; public int Priority => 100;
public PendingRequest(int id, Func<StreamMessage, bool> messageMatchesHandler, TimeSpan timeout, SocketSubscriptionListener? subscription) public PendingRequest(int id, Func<StreamMessage, bool> messageMatchesHandler, TimeSpan timeout, MessageListener? subscription)
{ {
Id = id; Id = id;
MessageMatchesHandler = messageMatchesHandler; MessageMatchesHandler = messageMatchesHandler;
Event = new AsyncResetEvent(false, false); Event = new AsyncResetEvent(false, false);
Timeout = timeout; Timeout = timeout;
RequestTimestamp = DateTime.UtcNow; RequestTimestamp = DateTime.UtcNow;
Subscription = subscription; MessageListener = subscription;
} }
public void IsSend() public void IsSend()

View File

@ -7,12 +7,12 @@ using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects.Sockets namespace CryptoExchange.Net.Objects.Sockets
{ {
/// <summary> /// <summary>
/// Socket subscription /// Socket listener
/// </summary> /// </summary>
public class SocketSubscriptionListener : IStreamMessageListener public class MessageListener : IStreamMessageListener
{ {
/// <summary> /// <summary>
/// Unique subscription id /// Unique listener id
/// </summary> /// </summary>
public int Id { get; } public int Id { get; }
@ -24,12 +24,12 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <summary> /// <summary>
/// The request object send when subscribing on the server. Either this or the `Identifier` property should be set /// The request object send when subscribing on the server. Either this or the `Identifier` property should be set
/// </summary> /// </summary>
public SubscriptionActor Subscription { get; set; } public Subscription Subscription { get; set; }
/// <summary> /// <summary>
/// Whether this is a user subscription or an internal listener /// Whether this is a user subscription or an internal listener
/// </summary> /// </summary>
public bool UserSubscription { get; set; } public bool UserListener { get; set; }
/// <summary> /// <summary>
/// If the subscription has been confirmed to be subscribed by the server /// If the subscription has been confirmed to be subscribed by the server
@ -58,10 +58,10 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <param name="id"></param> /// <param name="id"></param>
/// <param name="request"></param> /// <param name="request"></param>
/// <param name="userSubscription"></param> /// <param name="userSubscription"></param>
public SocketSubscriptionListener(int id, SubscriptionActor request, bool userSubscription) public MessageListener(int id, Subscription request, bool userSubscription)
{ {
Id = id; Id = id;
UserSubscription = userSubscription; UserListener = userSubscription;
Subscription = request; Subscription = request;
} }
@ -84,7 +84,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public bool MessageMatches(StreamMessage message) => Subscription.MessageMatchesSubscription(message); public bool MessageMatches(StreamMessage message) => Subscription.MessageMatchesEvent(message);
/// <summary> /// <summary>
/// Process the message /// Process the message

View File

@ -20,7 +20,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <summary> /// <summary>
/// The data stream /// The data stream
/// </summary> /// </summary>
public MemoryStream Stream { get; } public Stream Stream { get; }
/// <summary> /// <summary>
/// Receive timestamp /// Receive timestamp
/// </summary> /// </summary>
@ -45,6 +45,9 @@ namespace CryptoExchange.Net.Objects.Sockets
return result; return result;
} }
/// <summary>
/// Dispose
/// </summary>
public void Dispose() public void Dispose()
{ {
Stream.Dispose(); Stream.Dispose();
@ -56,7 +59,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <param name="connection"></param> /// <param name="connection"></param>
/// <param name="stream"></param> /// <param name="stream"></param>
/// <param name="timestamp"></param> /// <param name="timestamp"></param>
public StreamMessage(SocketConnection connection, MemoryStream stream, DateTime timestamp) public StreamMessage(SocketConnection connection, Stream stream, DateTime timestamp)
{ {
Connection = connection; Connection = connection;
Stream = stream; Stream = stream;

View File

@ -11,7 +11,7 @@ namespace CryptoExchange.Net.Objects.Sockets
public class UpdateSubscription public class UpdateSubscription
{ {
private readonly SocketConnection _connection; private readonly SocketConnection _connection;
private readonly SocketSubscriptionListener _subscription; private readonly MessageListener _listener;
/// <summary> /// <summary>
/// Event when the connection is lost. The socket will automatically reconnect when possible. /// Event when the connection is lost. The socket will automatically reconnect when possible.
@ -65,8 +65,8 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public event Action<Exception> Exception public event Action<Exception> Exception
{ {
add => _subscription.Exception += value; add => _listener.Exception += value;
remove => _subscription.Exception -= value; remove => _listener.Exception -= value;
} }
/// <summary> /// <summary>
@ -77,17 +77,17 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <summary> /// <summary>
/// The id of the subscription /// The id of the subscription
/// </summary> /// </summary>
public int Id => _subscription.Id; public int Id => _listener.Id;
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
/// <param name="connection">The socket connection the subscription is on</param> /// <param name="connection">The socket connection the subscription is on</param>
/// <param name="subscription">The subscription</param> /// <param name="subscription">The subscription</param>
public UpdateSubscription(SocketConnection connection, SocketSubscriptionListener subscription) public UpdateSubscription(SocketConnection connection, MessageListener subscription)
{ {
_connection = connection; _connection = connection;
_subscription = subscription; _listener = subscription;
} }
/// <summary> /// <summary>
@ -96,7 +96,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <returns></returns> /// <returns></returns>
public Task CloseAsync() public Task CloseAsync()
{ {
return _connection.CloseAsync(_subscription); return _connection.CloseAsync(_listener);
} }
/// <summary> /// <summary>
@ -114,7 +114,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <returns></returns> /// <returns></returns>
internal async Task UnsubscribeAsync() internal async Task UnsubscribeAsync()
{ {
await _connection.UnsubscribeAsync(_subscription).ConfigureAwait(false); await _connection.UnsubscribeAsync(_listener).ConfigureAwait(false);
} }
/// <summary> /// <summary>
@ -123,7 +123,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <returns></returns> /// <returns></returns>
internal async Task<CallResult<bool>> ResubscribeAsync() internal async Task<CallResult<bool>> ResubscribeAsync()
{ {
return await _connection.ResubscribeAsync(_subscription).ConfigureAwait(false); return await _connection.ResubscribeAsync(_listener).ConfigureAwait(false);
} }
} }
} }

View File

@ -2,6 +2,7 @@
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -63,9 +64,9 @@ namespace CryptoExchange.Net.Objects.Sockets
public string? Origin { get; set; } public string? Origin { get; set; }
/// <summary> /// <summary>
/// Delegate used for processing byte data received from socket connections before it is processed by handlers /// Delegate used for manipulating data received from socket connections before it is processed by listeners
/// </summary> /// </summary>
public Func<byte[], string>? DataInterpreterBytes { get; set; } public Func<Stream, Stream>? Interceptor { get; set; }
/// <summary> /// <summary>
/// Delegate used for processing string data received from socket connections before it is processed by handlers /// Delegate used for processing string data received from socket connections before it is processed by handlers

View File

@ -101,7 +101,7 @@ namespace CryptoExchange.Net.Sockets
public event Action? OnClose; public event Action? OnClose;
/// <inheritdoc /> /// <inheritdoc />
public event Func<MemoryStream, Task>? OnStreamMessage; public event Func<Stream, Task>? OnStreamMessage;
/// <inheritdoc /> /// <inheritdoc />
public event Action<int>? OnRequestSent; public event Action<int>? OnRequestSent;
@ -521,7 +521,7 @@ namespace CryptoExchange.Net.Sockets
{ {
// Received a complete message and it's not multi part // Received a complete message and it's not multi part
_logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); _logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
await ProcessByteData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count), receiveResult.MessageType).ConfigureAwait(false); await ProcessData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count), receiveResult.MessageType).ConfigureAwait(false);
} }
else else
{ {
@ -555,7 +555,7 @@ namespace CryptoExchange.Net.Sockets
{ {
// Reassemble complete message from memory stream // Reassemble complete message from memory stream
_logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); _logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
await ProcessByteData(memoryStream, receiveResult.MessageType).ConfigureAwait(false); await ProcessData(memoryStream, receiveResult.MessageType).ConfigureAwait(false);
memoryStream.Dispose(); memoryStream.Dispose();
} }
else else
@ -580,10 +580,12 @@ namespace CryptoExchange.Net.Sockets
} }
} }
private async Task ProcessByteData(MemoryStream memoryStream, WebSocketMessageType messageType) private async Task ProcessData(Stream stream, WebSocketMessageType messageType)
{ {
if (Parameters.Interceptor != null)
stream = Parameters.Interceptor.Invoke(stream);
if (OnStreamMessage != null) if (OnStreamMessage != null)
await OnStreamMessage.Invoke(memoryStream).ConfigureAwait(false); await OnStreamMessage.Invoke(stream).ConfigureAwait(false);
} }
/// <summary> /// <summary>

View File

@ -6,18 +6,23 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Query /// Query
/// </summary> /// </summary>
public abstract class QueryActor public abstract class Query
{ {
/// <summary> /// <summary>
/// The query request /// The query request
/// </summary> /// </summary>
public object Query { get; set; } public object Request { get; set; }
/// <summary> /// <summary>
/// If this is a private request /// If this is a private request
/// </summary> /// </summary>
public bool Authenticated { get; } public bool Authenticated { get; }
/// <summary>
/// Weight of the query
/// </summary>
public int Weight { get; }
/// <summary> /// <summary>
/// Check if the message is the response to the query /// Check if the message is the response to the query
/// </summary> /// </summary>
@ -34,12 +39,14 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
/// <param name="query"></param> /// <param name="request"></param>
/// <param name="authenticated"></param> /// <param name="authenticated"></param>
public QueryActor(object query, bool authenticated) /// <param name="weight"></param>
public Query(object request, bool authenticated, int weight = 1)
{ {
Authenticated = authenticated; Authenticated = authenticated;
Query = query; Request = request;
Weight = weight;
} }
} }
} }

View File

@ -5,7 +5,6 @@ using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using System.Net.WebSockets; using System.Net.WebSockets;
@ -50,23 +49,23 @@ namespace CryptoExchange.Net.Sockets
public event Action<StreamMessage>? UnhandledMessage; public event Action<StreamMessage>? UnhandledMessage;
/// <summary> /// <summary>
/// The amount of subscriptions on this connection /// The amount of listeners on this connection
/// </summary> /// </summary>
public int SubscriptionCount public int UserListenerCount
{ {
get { lock (_subscriptionLock) get { lock (_listenerLock)
return _subscriptions.Count(h => h.UserSubscription); } return _listeners.Count(h => h.UserListener); }
} }
/// <summary> /// <summary>
/// Get a copy of the current subscriptions /// Get a copy of the current message listeners
/// </summary> /// </summary>
public SocketSubscriptionListener[] Subscriptions public MessageListener[] MessageListeners
{ {
get get
{ {
lock (_subscriptionLock) lock (_listenerLock)
return _subscriptions.Where(h => h.UserSubscription).ToArray(); return _listeners.Where(h => h.UserListener).ToArray();
} }
} }
@ -151,10 +150,10 @@ namespace CryptoExchange.Net.Sockets
} }
private bool _pausedActivity; private bool _pausedActivity;
private readonly List<SocketSubscriptionListener> _subscriptions; private readonly List<MessageListener> _listeners;
private readonly List<IStreamMessageListener> _messageListeners; private readonly List<IStreamMessageListener> _messageListeners; // ?
private readonly object _subscriptionLock = new(); private readonly object _listenerLock = new();
private readonly ILogger _logger; private readonly ILogger _logger;
private SocketStatus _status; private SocketStatus _status;
@ -179,7 +178,7 @@ namespace CryptoExchange.Net.Sockets
Properties = new Dictionary<string, object>(); Properties = new Dictionary<string, object>();
_messageListeners = new List<IStreamMessageListener>(); _messageListeners = new List<IStreamMessageListener>();
_subscriptions = new List<SocketSubscriptionListener>(); _listeners = new List<MessageListener>();
_socket = socket; _socket = socket;
_socket.OnStreamMessage += HandleStreamMessage; _socket.OnStreamMessage += HandleStreamMessage;
@ -208,10 +207,10 @@ namespace CryptoExchange.Net.Sockets
{ {
Status = SocketStatus.Closed; Status = SocketStatus.Closed;
Authenticated = false; Authenticated = false;
lock(_subscriptionLock) lock(_listenerLock)
{ {
foreach (var sub in _subscriptions) foreach (var listener in _listeners)
sub.Confirmed = false; listener.Confirmed = false;
} }
Task.Run(() => ConnectionClosed?.Invoke()); Task.Run(() => ConnectionClosed?.Invoke());
} }
@ -224,10 +223,10 @@ namespace CryptoExchange.Net.Sockets
Status = SocketStatus.Reconnecting; Status = SocketStatus.Reconnecting;
DisconnectTime = DateTime.UtcNow; DisconnectTime = DateTime.UtcNow;
Authenticated = false; Authenticated = false;
lock (_subscriptionLock) lock (_listenerLock)
{ {
foreach (var sub in _subscriptions) foreach (var listener in _listeners)
sub.Confirmed = false; listener.Confirmed = false;
} }
_ = Task.Run(() => ConnectionLost?.Invoke()); _ = Task.Run(() => ConnectionLost?.Invoke());
@ -310,14 +309,19 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
/// <param name="stream"></param> /// <param name="stream"></param>
/// <returns></returns> /// <returns></returns>
protected virtual async Task HandleStreamMessage(MemoryStream stream) protected virtual async Task HandleStreamMessage(Stream stream)
{ {
var timestamp = DateTime.UtcNow; var timestamp = DateTime.UtcNow;
var streamMessage = new StreamMessage(this, stream, timestamp); var streamMessage = new StreamMessage(this, stream, timestamp);
var handledResponse = false; var handledResponse = false;
SocketSubscriptionListener? currentSubscription = null; MessageListener? currentSubscription = null;
TimeSpan userCodeDuration = TimeSpan.Zero; TimeSpan userCodeDuration = TimeSpan.Zero;
foreach (var listener in _messageListeners.OrderByDescending(x => x.Priority).ToList()) // LOCK
List<IStreamMessageListener> listeners;
lock (_listenerLock)
listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList();
foreach (var listener in listeners)
{ {
if (listener.MessageMatches(streamMessage)) if (listener.MessageMatches(streamMessage))
{ {
@ -329,10 +333,10 @@ namespace CryptoExchange.Net.Sockets
if (pendingRequest.Completed) if (pendingRequest.Completed)
{ {
// Answer to a timed out request, unsub if it is a subscription request // Answer to a timed out request, unsub if it is a subscription request
if (pendingRequest.Subscription != null) if (pendingRequest.MessageListener != null)
{ {
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
_ = UnsubscribeAsync(pendingRequest.Subscription).ConfigureAwait(false); _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false);
} }
} }
else else
@ -347,7 +351,7 @@ namespace CryptoExchange.Net.Sockets
handledResponse = true; handledResponse = true;
break; break;
} }
else if (listener is SocketSubscriptionListener subscription) else if (listener is MessageListener subscription)
{ {
currentSubscription = subscription; currentSubscription = subscription;
handledResponse = true; handledResponse = true;
@ -398,12 +402,12 @@ namespace CryptoExchange.Net.Sockets
if (ApiClient.socketConnections.ContainsKey(SocketId)) if (ApiClient.socketConnections.ContainsKey(SocketId))
ApiClient.socketConnections.TryRemove(SocketId, out _); ApiClient.socketConnections.TryRemove(SocketId, out _);
lock (_subscriptionLock) lock (_listenerLock)
{ {
foreach (var subscription in _subscriptions) foreach (var listener in _listeners)
{ {
if (subscription.CancellationTokenRegistration.HasValue) if (listener.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose(); listener.CancellationTokenRegistration.Value.Dispose();
} }
} }
@ -412,32 +416,32 @@ namespace CryptoExchange.Net.Sockets
} }
/// <summary> /// <summary>
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well /// Close a listener on this connection. If all listener on this connection are closed the connection gets closed as well
/// </summary> /// </summary>
/// <param name="subscription">Subscription to close</param> /// <param name="listener">Listener to close</param>
/// <returns></returns> /// <returns></returns>
public async Task CloseAsync(SocketSubscriptionListener subscription) public async Task CloseAsync(MessageListener listener)
{ {
lock (_subscriptionLock) lock (_listenerLock)
{ {
if (!_subscriptions.Contains(subscription)) if (!_listeners.Contains(listener))
return; return;
subscription.Closed = true; listener.Closed = true;
} }
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return; return;
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing subscription {subscription.Id}"); _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing listener {listener.Id}");
if (subscription.CancellationTokenRegistration.HasValue) if (listener.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose(); listener.CancellationTokenRegistration.Value.Dispose();
if (subscription.Confirmed && _socket.IsOpen) if (listener.Confirmed && _socket.IsOpen)
await UnsubscribeAsync(subscription).ConfigureAwait(false); await UnsubscribeAsync(listener).ConfigureAwait(false);
bool shouldCloseConnection; bool shouldCloseConnection;
lock (_subscriptionLock) lock (_listenerLock)
{ {
if (Status == SocketStatus.Closing) if (Status == SocketStatus.Closing)
{ {
@ -445,21 +449,21 @@ namespace CryptoExchange.Net.Sockets
return; return;
} }
shouldCloseConnection = _subscriptions.All(r => !r.UserSubscription || r.Closed); shouldCloseConnection = _listeners.All(r => !r.UserListener || r.Closed);
if (shouldCloseConnection) if (shouldCloseConnection)
Status = SocketStatus.Closing; Status = SocketStatus.Closing;
} }
if (shouldCloseConnection) if (shouldCloseConnection)
{ {
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more subscriptions"); _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more listeners");
await CloseAsync().ConfigureAwait(false); await CloseAsync().ConfigureAwait(false);
} }
lock (_subscriptionLock) lock (_listenerLock)
{ {
_messageListeners.Remove(subscription); _messageListeners.Remove(listener);
_subscriptions.Remove(subscription); _listeners.Remove(listener);
} }
} }
@ -473,44 +477,44 @@ namespace CryptoExchange.Net.Sockets
} }
/// <summary> /// <summary>
/// Add a subscription to this connection /// Add a listener to this connection
/// </summary> /// </summary>
/// <param name="subscription"></param> /// <param name="listener"></param>
public bool AddSubscription(SocketSubscriptionListener subscription) public bool AddListener(MessageListener listener)
{ {
lock (_subscriptionLock) lock (_listenerLock)
{ {
if (Status != SocketStatus.None && Status != SocketStatus.Connected) if (Status != SocketStatus.None && Status != SocketStatus.Connected)
return false; return false;
_subscriptions.Add(subscription); _listeners.Add(listener);
_messageListeners.Add(subscription); _messageListeners.Add(listener);
if (subscription.UserSubscription) if (listener.UserListener)
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}"); _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {listener.Id}, total listeners on connection: {_listeners.Count(s => s.UserListener)}");
return true; return true;
} }
} }
/// <summary> /// <summary>
/// Get a subscription on this connection by id /// Get a listener on this connection by id
/// </summary> /// </summary>
/// <param name="id"></param> /// <param name="id"></param>
public SocketSubscriptionListener? GetSubscription(int id) public MessageListener? GetListener(int id)
{ {
lock (_subscriptionLock) lock (_listenerLock)
return _subscriptions.SingleOrDefault(s => s.Id == id); return _listeners.SingleOrDefault(s => s.Id == id);
} }
/// <summary> /// <summary>
/// Get a subscription on this connection by its subscribe request /// Get a listener on this connection by its subscribe request
/// </summary> /// </summary>
/// <param name="predicate">Filter for a request</param> /// <param name="predicate">Filter for a request</param>
/// <returns></returns> /// <returns></returns>
public SocketSubscriptionListener? GetSubscriptionByRequest(Func<object?, bool> predicate) public MessageListener? GetListenerByRequest(Func<object?, bool> predicate)
{ {
lock(_subscriptionLock) lock(_listenerLock)
return _subscriptions.SingleOrDefault(s => predicate(s.Subscription)); return _listeners.SingleOrDefault(s => predicate(s.Subscription));
} }
/// <summary> /// <summary>
@ -519,13 +523,13 @@ namespace CryptoExchange.Net.Sockets
/// <typeparam name="T">The data type expected in response</typeparam> /// <typeparam name="T">The data type expected in response</typeparam>
/// <param name="obj">The object to send</param> /// <param name="obj">The object to send</param>
/// <param name="timeout">The timeout for response</param> /// <param name="timeout">The timeout for response</param>
/// <param name="subscription">Subscription if this is a subscribe request</param> /// <param name="listener">Listener if this is a subscribe request</param>
/// <param name="handler">The response handler</param> /// <param name="handler">The response handler</param>
/// <param name="weight">The weight of the message</param> /// <param name="weight">The weight of the message</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, SocketSubscriptionListener? subscription, int weight, Func<StreamMessage, bool> handler) public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func<StreamMessage, bool> handler)
{ {
var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, subscription); var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener);
lock (_messageListeners) lock (_messageListeners)
{ {
_messageListeners.Add(pending); _messageListeners.Add(pending);
@ -598,8 +602,8 @@ namespace CryptoExchange.Net.Sockets
return new CallResult<bool>(new WebError("Socket not connected")); return new CallResult<bool>(new WebError("Socket not connected"));
bool anySubscriptions = false; bool anySubscriptions = false;
lock (_subscriptionLock) lock (_listenerLock)
anySubscriptions = _subscriptions.Any(s => s.UserSubscription); anySubscriptions = _listeners.Any(s => s.UserListener);
if (!anySubscriptions) if (!anySubscriptions)
{ {
@ -610,8 +614,8 @@ namespace CryptoExchange.Net.Sockets
} }
bool anyAuthenticated = false; bool anyAuthenticated = false;
lock (_subscriptionLock) lock (_listenerLock)
anyAuthenticated = _subscriptions.Any(s => s.Authenticated); anyAuthenticated = _listeners.Any(s => s.Authenticated);
if (anyAuthenticated) if (anyAuthenticated)
{ {
@ -628,21 +632,21 @@ namespace CryptoExchange.Net.Sockets
} }
// Get a list of all subscriptions on the socket // Get a list of all subscriptions on the socket
List<SocketSubscriptionListener> subscriptionList = new List<SocketSubscriptionListener>(); List<MessageListener> listenerList = new List<MessageListener>();
lock (_subscriptionLock) lock (_listenerLock)
{ {
foreach (var subscription in _subscriptions) foreach (var listener in _listeners)
{ {
if (subscription.Subscription != null) if (listener.Subscription != null)
subscriptionList.Add(subscription); listenerList.Add(listener);
else else
subscription.Confirmed = true; listener.Confirmed = true;
} }
} }
foreach(var subscription in subscriptionList.Where(s => s.Subscription != null)) foreach(var listener in listenerList.Where(s => s.Subscription != null))
{ {
var result = await ApiClient.RevitalizeRequestAsync(subscription.Subscription!).ConfigureAwait(false); var result = await ApiClient.RevitalizeRequestAsync(listener.Subscription!).ConfigureAwait(false);
if (!result) if (!result)
{ {
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error); _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error);
@ -651,22 +655,22 @@ namespace CryptoExchange.Net.Sockets
} }
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
for (var i = 0; i < subscriptionList.Count; i += ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket) for (var i = 0; i < listenerList.Count; i += ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)
{ {
if (!_socket.IsOpen) if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected")); return new CallResult<bool>(new WebError("Socket not connected"));
var taskList = new List<Task<CallResult<bool>>>(); var taskList = new List<Task<CallResult<bool>>>();
foreach (var subscription in subscriptionList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) foreach (var listener in listenerList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Subscription!, subscription)); taskList.Add(ApiClient.SubscribeAndWaitAsync(this, listener.Subscription!, listener));
await Task.WhenAll(taskList).ConfigureAwait(false); await Task.WhenAll(taskList).ConfigureAwait(false);
if (taskList.Any(t => !t.Result.Success)) if (taskList.Any(t => !t.Result.Success))
return taskList.First(t => !t.Result.Success).Result; return taskList.First(t => !t.Result.Success).Result;
} }
foreach (var subscription in subscriptionList) foreach (var listener in listenerList)
subscription.Confirmed = true; listener.Confirmed = true;
if (!_socket.IsOpen) if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected")); return new CallResult<bool>(new WebError("Socket not connected"));
@ -675,26 +679,26 @@ namespace CryptoExchange.Net.Sockets
return new CallResult<bool>(true); return new CallResult<bool>(true);
} }
internal async Task UnsubscribeAsync(SocketSubscriptionListener socketSubscription) internal async Task UnsubscribeAsync(MessageListener listener)
{ {
var unsubscribeRequest = socketSubscription.Subscription?.GetUnsubscribeRequest(); var unsubscribeRequest = listener.Subscription?.GetUnsubRequest();
if (unsubscribeRequest != null) if (unsubscribeRequest != null)
{ {
await SendAndWaitAsync(unsubscribeRequest, TimeSpan.FromSeconds(10), socketSubscription, 0, x => await SendAndWaitAsync(unsubscribeRequest, TimeSpan.FromSeconds(10), listener, 0, x =>
{ {
var (matches, result) = socketSubscription.Subscription!.MessageMatchesUnsubscribeRequest(x); var (matches, result) = listener.Subscription!.MessageMatchesUnsubRequest(x);
// TODO check result? // TODO check result?
return matches; return matches;
}).ConfigureAwait(false); }).ConfigureAwait(false);
} }
} }
internal async Task<CallResult<bool>> ResubscribeAsync(SocketSubscriptionListener socketSubscription) internal async Task<CallResult<bool>> ResubscribeAsync(MessageListener listener)
{ {
if (!_socket.IsOpen) if (!_socket.IsOpen)
return new CallResult<bool>(new UnknownError("Socket is not connected")); return new CallResult<bool>(new UnknownError("Socket is not connected"));
return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Subscription!, socketSubscription).ConfigureAwait(false); return await ApiClient.SubscribeAndWaitAsync(this, listener.Subscription!, listener).ConfigureAwait(false);
} }
/// <summary> /// <summary>

View File

@ -12,7 +12,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Subscription base /// Subscription base
/// </summary> /// </summary>
public abstract class SubscriptionActor public abstract class Subscription
{ {
private bool _outputOriginalData; private bool _outputOriginalData;
@ -32,7 +32,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="logger"></param> /// <param name="logger"></param>
/// <param name="apiClient"></param> /// <param name="apiClient"></param>
/// <param name="authenticated"></param> /// <param name="authenticated"></param>
public SubscriptionActor(ILogger logger, ISocketApiClient apiClient, bool authenticated) public Subscription(ILogger logger, ISocketApiClient apiClient, bool authenticated)
{ {
_logger = logger; _logger = logger;
_outputOriginalData = apiClient.ApiOptions.OutputOriginalData ?? apiClient.ClientOptions.OutputOriginalData; _outputOriginalData = apiClient.ApiOptions.OutputOriginalData ?? apiClient.ClientOptions.OutputOriginalData;
@ -43,32 +43,32 @@ namespace CryptoExchange.Net.Sockets
/// Get the subscribe object to send when subscribing /// Get the subscribe object to send when subscribing
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public abstract object? GetSubscribeRequest(); public abstract object? GetSubRequest();
/// <summary> /// <summary>
/// Check if the message is the response to the subscribe request /// Check if the message is the response to the subscribe request
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public abstract (bool, CallResult?) MessageMatchesSubscribeRequest(StreamMessage message); public abstract (bool, CallResult?) MessageMatchesSubRequest(StreamMessage message);
/// <summary> /// <summary>
/// Get the unsubscribe object to send when unsubscribing /// Get the unsubscribe object to send when unsubscribing
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public abstract object? GetUnsubscribeRequest(); public abstract object? GetUnsubRequest();
/// <summary> /// <summary>
/// Check if the message is the response to the unsubscribe request /// Check if the message is the response to the unsubscribe request
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public abstract (bool, CallResult?) MessageMatchesUnsubscribeRequest(StreamMessage message); public abstract (bool, CallResult?) MessageMatchesUnsubRequest(StreamMessage message);
/// <summary> /// <summary>
/// Check if the message is an update for this subscription /// Check if the message is an update for this subscription
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public abstract bool MessageMatchesSubscription(StreamMessage message); public abstract bool MessageMatchesEvent(StreamMessage message);
/// <summary> /// <summary>
/// Handle the update message /// Handle the update message
/// </summary> /// </summary>
@ -85,7 +85,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="topic"></param> /// <param name="topic"></param>
/// <param name="type"></param> /// <param name="type"></param>
/// <returns></returns> /// <returns></returns>
protected DataEvent<T> CreateDataEvent<T>(T obj, StreamMessage message, string? topic = null, SocketUpdateType? type = null) protected virtual DataEvent<T> CreateDataEvent<T>(T obj, StreamMessage message, string? topic = null, SocketUpdateType? type = null)
{ {
string? originalData = null; string? originalData = null;
if (_outputOriginalData) if (_outputOriginalData)
@ -101,7 +101,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="message"></param> /// <param name="message"></param>
/// <param name="settings"></param> /// <param name="settings"></param>
/// <returns></returns> /// <returns></returns>
protected Task<CallResult<T>> DeserializeAsync<T>(StreamMessage message, JsonSerializerSettings settings) protected virtual Task<CallResult<T>> DeserializeAsync<T>(StreamMessage message, JsonSerializerSettings settings)
{ {
var serializer = JsonSerializer.Create(settings); var serializer = JsonSerializer.Create(settings);
using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true); using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true);

View File

@ -9,25 +9,26 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// A system subscription /// A system subscription
/// </summary> /// </summary>
public abstract class SystemSubscription : SubscriptionActor public abstract class SystemSubscription : Subscription
{ {
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
/// <param name="logger"></param> /// <param name="logger"></param>
/// <param name="socketApiClient"></param>
/// <param name="authenticated"></param> /// <param name="authenticated"></param>
public SystemSubscription(ILogger logger, ISocketApiClient socketApiClient, bool authenticated = false) : base(logger, socketApiClient, authenticated) public SystemSubscription(ILogger logger, ISocketApiClient socketApiClient, bool authenticated = false) : base(logger, socketApiClient, authenticated)
{ {
} }
/// <inheritdoc /> /// <inheritdoc />
public override object? GetSubscribeRequest() => null; public override object? GetSubRequest() => null;
/// <inheritdoc /> /// <inheritdoc />
public override (bool, CallResult?) MessageMatchesSubscribeRequest(StreamMessage message) => throw new NotImplementedException(); public override (bool, CallResult?) MessageMatchesSubRequest(StreamMessage message) => throw new NotImplementedException();
/// <inheritdoc /> /// <inheritdoc />
public override object? GetUnsubscribeRequest() => null; public override object? GetUnsubRequest() => null;
/// <inheritdoc /> /// <inheritdoc />
public override (bool, CallResult?) MessageMatchesUnsubscribeRequest(StreamMessage message) => throw new NotImplementedException(); public override (bool, CallResult?) MessageMatchesUnsubRequest(StreamMessage message) => throw new NotImplementedException();
} }
} }