1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-10 09:26:22 +00:00
This commit is contained in:
JKorf 2023-10-22 21:42:03 +02:00
parent d91755dff5
commit cff3863373
24 changed files with 579 additions and 479 deletions

View File

@ -5,8 +5,8 @@ using System.Threading.Tasks;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.OrderBook;
using CryptoExchange.Net.Sockets;
using NUnit.Framework;
namespace CryptoExchange.Net.UnitTests

View File

@ -5,6 +5,7 @@ using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using Moq;

View File

@ -4,7 +4,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
namespace CryptoExchange.Net

View File

@ -1,6 +1,7 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
@ -9,6 +10,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -53,7 +55,7 @@ namespace CryptoExchange.Net
/// <summary>
/// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example.
/// </summary>
protected Dictionary<string, Action<MessageEvent>> genericHandlers = new();
protected List<SystemSubscription> genericHandlers = new();
/// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry.
@ -144,6 +146,7 @@ namespace CryptoExchange.Net
/// <param name="stringHandler">Handler for string data</param>
protected void SetDataInterpreter(Func<byte[], string>? byteHandler, Func<string, string>? stringHandler)
{
// TODO
dataInterpreterBytes = byteHandler;
dataInterpreterString = stringHandler;
}
@ -152,15 +155,12 @@ namespace CryptoExchange.Net
/// Connect to an url and listen for data on the BaseAddress
/// </summary>
/// <typeparam name="T">The type of the expected data</typeparam>
/// <param name="request">The optional request object to send, will be serialized to json</param>
/// <param name="identifier">The identifier to use, necessary if no request object is sent</param>
/// <param name="authenticated">If the subscription is to an authenticated endpoint</param>
/// <param name="dataHandler">The handler of update data</param>
/// <param name="subscriptionObject">The subscription</param>
/// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns>
protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler, CancellationToken ct)
protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(SubscriptionActor subscriptionObject, CancellationToken ct)
{
return SubscribeAsync(BaseAddress, request, identifier, authenticated, dataHandler, ct);
return SubscribeAsync<T>(BaseAddress, subscriptionObject, ct);
}
/// <summary>
@ -168,19 +168,16 @@ namespace CryptoExchange.Net
/// </summary>
/// <typeparam name="T">The type of the expected data</typeparam>
/// <param name="url">The URL to connect to</param>
/// <param name="request">The optional request object to send, will be serialized to json</param>
/// <param name="identifier">The identifier to use, necessary if no request object is sent</param>
/// <param name="authenticated">If the subscription is to an authenticated endpoint</param>
/// <param name="dataHandler">The handler of update data</param>
/// <param name="subscriptionObject">The subscription</param>
/// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns>
protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(string url, object? request, string? identifier, bool authenticated, Action<DataEvent<T>> dataHandler, CancellationToken ct)
protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(string url, SubscriptionActor subscriptionObject, CancellationToken ct)
{
if (_disposing)
return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe"));
SocketConnection socketConnection;
SocketSubscription? subscription;
SocketSubscriptionListener? subscription;
var released = false;
// Wait for a semaphore here, so we only connect 1 socket at a time.
// This is necessary for being able to see if connections can be combined
@ -198,14 +195,14 @@ namespace CryptoExchange.Net
while (true)
{
// Get a new or existing socket connection
var socketResult = await GetSocketConnection(url, authenticated).ConfigureAwait(false);
var socketResult = await GetSocketConnection(url, subscriptionObject.Authenticated).ConfigureAwait(false);
if (!socketResult)
return socketResult.As<UpdateSubscription>(null);
socketConnection = socketResult.Data;
// Add a subscription on the socket connection
subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated);
subscription = AddSubscription<T>(subscriptionObject, true, socketConnection);
if (subscription == null)
{
_logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
@ -221,7 +218,7 @@ namespace CryptoExchange.Net
var needsConnecting = !socketConnection.Connected;
var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
var connectResult = await ConnectIfNeededAsync(socketConnection, subscriptionObject.Authenticated).ConfigureAwait(false);
if (!connectResult)
return new CallResult<UpdateSubscription>(connectResult.Error!);
@ -240,6 +237,7 @@ namespace CryptoExchange.Net
return new CallResult<UpdateSubscription>(new ServerError("Socket is paused"));
}
var request = subscriptionObject.GetSubscribeRequest();
if (request != null)
{
// Send the request and wait for answer
@ -277,10 +275,16 @@ namespace CryptoExchange.Net
/// <param name="request">The request to send, will be serialized to json</param>
/// <param name="subscription">The subscription the request is for</param>
/// <returns></returns>
protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription)
protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscriptionListener subscription)
{
CallResult<object>? callResult = null;
await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, 1, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false);
CallResult? callResult = null;
await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, 1, x =>
{
var (matches, result) = subscription.Subscription!.MessageMatchesSubscribeRequest(x);
if (matches)
callResult = result;
return matches;
}).ConfigureAwait(false);
if (callResult?.Success == true)
{
@ -298,13 +302,12 @@ namespace CryptoExchange.Net
/// Send a query on a socket connection to the BaseAddress and wait for the response
/// </summary>
/// <typeparam name="T">Expected result type</typeparam>
/// <param name="request">The request to send, will be serialized to json</param>
/// <param name="authenticated">If the query is to an authenticated endpoint</param>
/// <param name="query">The query</param>
/// <param name="weight">Weight of the request</param>
/// <returns></returns>
protected virtual Task<CallResult<T>> QueryAsync<T>(object request, bool authenticated, int weight = 1)
protected virtual Task<CallResult<T>> QueryAsync<T>(QueryActor query, int weight = 1)
{
return QueryAsync<T>(BaseAddress, request, authenticated, weight);
return QueryAsync<T>(BaseAddress, query, weight);
}
/// <summary>
@ -313,10 +316,9 @@ namespace CryptoExchange.Net
/// <typeparam name="T">The expected result type</typeparam>
/// <param name="url">The url for the request</param>
/// <param name="request">The request to send</param>
/// <param name="authenticated">Whether the socket should be authenticated</param>
/// <param name="weight">Weight of the request</param>
/// <returns></returns>
protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, object request, bool authenticated, int weight = 1)
protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, QueryActor request, int weight = 1)
{
if (_disposing)
return new CallResult<T>(new InvalidOperationError("Client disposed, can't query"));
@ -326,7 +328,7 @@ namespace CryptoExchange.Net
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
var socketResult = await GetSocketConnection(url, authenticated).ConfigureAwait(false);
var socketResult = await GetSocketConnection(url, request.Authenticated).ConfigureAwait(false);
if (!socketResult)
return socketResult.As<T>(default);
@ -339,7 +341,7 @@ namespace CryptoExchange.Net
released = true;
}
var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
var connectResult = await ConnectIfNeededAsync(socketConnection, request.Authenticated).ConfigureAwait(false);
if (!connectResult)
return new CallResult<T>(connectResult.Error!);
}
@ -366,16 +368,19 @@ namespace CryptoExchange.Net
/// <param name="request">The request to send</param>
/// <param name="weight">The weight of the query</param>
/// <returns></returns>
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, object request, int weight)
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, QueryActor request, int weight)
{
var dataResult = new CallResult<T>(new ServerError("No response on query received"));
await socket.SendAndWaitAsync(request, ClientOptions.RequestTimeout, null, weight, data =>
await socket.SendAndWaitAsync(request.Query, ClientOptions.RequestTimeout, null, weight, x =>
{
if (!HandleQueryResponse<T>(socket, request, data, out var callResult))
return false;
var matches = request.MessageMatchesQuery(x);
if (matches)
{
request.HandleResponse(x);
return true;
}
dataResult = callResult;
return true;
return false;
}).ConfigureAwait(false);
return dataResult;
@ -402,16 +407,39 @@ namespace CryptoExchange.Net
if (!authenticated || socket.Authenticated)
return new CallResult<bool>(true);
return await AuthenticateSocketAsync(socket).ConfigureAwait(false);
}
/// <summary>
/// Authenticate a socket connection
/// </summary>
/// <param name="socket">Socket to authenticate</param>
/// <returns></returns>
public virtual async Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnection socket)
{
_logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate");
var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false);
if (!result)
var authRequest = GetAuthenticationRequest();
var authResult = new CallResult(new ServerError("No response from server"));
await socket.SendAndWaitAsync(authRequest.Query, ClientOptions.RequestTimeout, null, 1, x =>
{
var matches = authRequest.MessageMatchesQuery(x);
if (matches)
{
authResult = authRequest.HandleResponse(x);
return true;
}
return false;
}).ConfigureAwait(false);
if (!authResult)
{
_logger.Log(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed");
if (socket.Connected)
await socket.CloseAsync().ConfigureAwait(false);
result.Error!.Message = "Authentication failed: " + result.Error.Message;
return new CallResult<bool>(result.Error);
authResult.Error!.Message = "Authentication failed: " + authResult.Error.Message;
return new CallResult<bool>(authResult.Error);
}
_logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} authenticated");
@ -420,129 +448,46 @@ namespace CryptoExchange.Net
}
/// <summary>
/// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the query that was send (the request parameter).
/// For example; A query is sent in a request message with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an
/// anwser to any query that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages,
/// if not some other method has be implemented to match the messages).
/// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true.
/// Should return the request which can be used to authenticate a socket connection
/// </summary>
/// <typeparam name="T">The type of response that is expected on the query</typeparam>
/// <param name="socketConnection">The socket connection</param>
/// <param name="request">The request that a response is awaited for</param>
/// <param name="data">The message received from the server</param>
/// <param name="callResult">The interpretation (null if message wasn't a response to the request)</param>
/// <returns>True if the message was a response to the query</returns>
protected internal abstract bool HandleQueryResponse<T>(SocketConnection socketConnection, object request, JToken data, [NotNullWhen(true)] out CallResult<T>? callResult);
/// <summary>
/// The socketConnection received data (the data JToken parameter). The implementation of this method should check if the received data is a response to the subscription request that was send (the request parameter).
/// For example; A subscribe request message is send with an Id parameter with value 10. The socket receives data and calls this method to see if the data it received is an
/// anwser to any subscription request that was done. The implementation of this method should check if the response.Id == request.Id to see if they match (assuming the api has some sort of Id tracking on messages,
/// if not some other method has be implemented to match the messages).
/// If the messages match, the callResult out parameter should be set with the deserialized data in the from of (T) and return true.
/// </summary>
/// <param name="socketConnection">The socket connection</param>
/// <param name="subscription">A subscription that waiting for a subscription response</param>
/// <param name="request">The request that the subscription sent</param>
/// <param name="data">The message received from the server</param>
/// <param name="callResult">The interpretation (null if message wasn't a response to the request)</param>
/// <returns>True if the message was a response to the subscription request</returns>
protected internal abstract bool HandleSubscriptionResponse(SocketConnection socketConnection, SocketSubscription subscription, object request, JToken data, out CallResult<object>? callResult);
/// <summary>
/// Needs to check if a received message matches a handler by request. After subscribing data message will come in. These data messages need to be matched to a specific connection
/// to pass the correct data to the correct handler. The implementation of this method should check if the message received matches the subscribe request that was sent.
/// </summary>
/// <param name="socketConnection">The socket connection the message was recieved on</param>
/// <param name="message">The received data</param>
/// <param name="request">The subscription request</param>
/// <returns>True if the message is for the subscription which sent the request</returns>
protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, object request);
/// <summary>
/// Needs to check if a received message matches a handler by identifier. Generally used by GenericHandlers. For example; a generic handler is registered which handles ping messages
/// from the server. This method should check if the message received is a ping message and the identifer is the identifier of the GenericHandler
/// </summary>
/// <param name="socketConnection">The socket connection the message was recieved on</param>
/// <param name="message">The received data</param>
/// <param name="identifier">The string identifier of the handler</param>
/// <returns>True if the message is for the handler which has the identifier</returns>
protected internal abstract bool MessageMatchesHandler(SocketConnection socketConnection, JToken message, string identifier);
/// <summary>
/// Needs to authenticate the socket so authenticated queries/subscriptions can be made on this socket connection
/// </summary>
/// <param name="socketConnection">The socket connection that should be authenticated</param>
/// <returns></returns>
protected internal abstract Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnection socketConnection);
/// <summary>
/// Needs to unsubscribe a subscription, typically by sending an unsubscribe request. If multiple subscriptions per socket is not allowed this can just return since the socket will be closed anyway
/// </summary>
/// <param name="connection">The connection on which to unsubscribe</param>
/// <param name="subscriptionToUnsub">The subscription to unsubscribe</param>
/// <returns></returns>
protected internal abstract Task<bool> UnsubscribeAsync(SocketConnection connection, SocketSubscription subscriptionToUnsub);
protected internal abstract QueryActor GetAuthenticationRequest();
/// <summary>
/// Optional handler to interpolate data before sending it to the handlers
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
protected internal virtual JToken ProcessTokenData(JToken message)
{
return message;
}
//protected internal virtual JToken ProcessTokenData(JToken message)
//{
// return message;
//}
/// <summary>
/// Add a subscription to a connection
/// </summary>
/// <typeparam name="T">The type of data the subscription expects</typeparam>
/// <param name="request">The request of the subscription</param>
/// <param name="identifier">The identifier of the subscription (can be null if request param is used)</param>
/// <param name="subscriptionObject">The subscription</param>
/// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param>
/// <param name="connection">The socket connection the handler is on</param>
/// <param name="dataHandler">The handler of the data received</param>
/// <param name="authenticated">Whether the subscription needs authentication</param>
/// <returns></returns>
protected virtual SocketSubscription? AddSubscription<T>(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action<DataEvent<T>> dataHandler, bool authenticated)
protected virtual SocketSubscriptionListener? AddSubscription<T>(SubscriptionActor subscriptionObject, bool userSubscription, SocketConnection connection)
{
void InternalHandler(MessageEvent messageEvent)
{
if (typeof(T) == typeof(string))
{
var stringData = (T)Convert.ChangeType(messageEvent.JsonData.ToString(), typeof(T));
dataHandler(new DataEvent<T>(stringData, null, OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp, null));
return;
}
var desResult = Deserialize<T>(messageEvent.JsonData);
if (!desResult)
{
_logger.Log(LogLevel.Warning, $"Socket {connection.SocketId} Failed to deserialize data into type {typeof(T)}: {desResult.Error}");
return;
}
dataHandler(new DataEvent<T>(desResult.Data, null, OutputOriginalData ? messageEvent.OriginalData : null, messageEvent.ReceivedTimestamp, null));
}
var subscription = request == null
? SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), identifier!, userSubscription, authenticated, InternalHandler)
: SocketSubscription.CreateForRequest(ExchangeHelpers.NextId(), request, userSubscription, authenticated, InternalHandler);
var subscription = new SocketSubscriptionListener(ExchangeHelpers.NextId(), subscriptionObject, userSubscription);
if (!connection.AddSubscription(subscription))
return null;
return subscription;
}
/// <summary>
/// Adds a generic message handler. Used for example to reply to ping requests
/// Adds a system subscription. Used for example to reply to ping requests
/// </summary>
/// <param name="identifier">The name of the request handler. Needs to be unique</param>
/// <param name="action">The action to execute when receiving a message for this handler (checked by <see cref="MessageMatchesHandler(SocketConnection, Newtonsoft.Json.Linq.JToken,string)"/>)</param>
protected void AddGenericHandler(string identifier, Action<MessageEvent> action)
/// <param name="systemSubscription">The subscription</param>
protected void AddSystemSubscription(SystemSubscription systemSubscription)
{
genericHandlers.Add(identifier, action);
var subscription = SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), identifier, false, false, action);
genericHandlers.Add(systemSubscription);
var subscription = new SocketSubscriptionListener(ExchangeHelpers.NextId(), systemSubscription, false);
foreach (var connection in socketConnections.Values)
connection.AddSubscription(subscription);
}
@ -614,9 +559,10 @@ namespace CryptoExchange.Net
var socket = CreateSocket(connectionAddress.Data!);
var socketConnection = new SocketConnection(_logger, this, socket, address);
socketConnection.UnhandledMessage += HandleUnhandledMessage;
foreach (var kvp in genericHandlers)
foreach (var systemHandler in genericHandlers)
{
var handler = SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), kvp.Key, false, false, kvp.Value);
var handler = new SocketSubscriptionListener(ExchangeHelpers.NextId(), systemHandler, false);
socketConnection.AddSubscription(handler);
}
@ -626,8 +572,8 @@ namespace CryptoExchange.Net
/// <summary>
/// Process an unhandled message
/// </summary>
/// <param name="token">The token that wasn't processed</param>
protected virtual void HandleUnhandledMessage(JToken token)
/// <param name="message">The message that wasn't processed</param>
protected virtual void HandleUnhandledMessage(StreamMessage message)
{
}
@ -731,7 +677,7 @@ namespace CryptoExchange.Net
/// <returns></returns>
public virtual async Task<bool> UnsubscribeAsync(int subscriptionId)
{
SocketSubscription? subscription = null;
SocketSubscriptionListener? subscription = null;
SocketConnection? connection = null;
foreach (var socket in socketConnections.Values.ToList())
{

View File

@ -1,6 +1,6 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Objects.Sockets;
using System;
using System.Threading.Tasks;
@ -26,7 +26,15 @@ namespace CryptoExchange.Net.Interfaces
/// <summary>
/// The factory for creating sockets. Used for unit testing
/// </summary>
IWebsocketFactory SocketFactory { get; set; }
IWebsocketFactory SocketFactory { get; }
/// <summary>
/// Current client options
/// </summary>
SocketExchangeOptions ClientOptions { get; }
/// <summary>
/// Current API options
/// </summary>
SocketApiOptions ApiOptions { get; }
/// <summary>
/// Log the current state of connections and subscriptions
/// </summary>

View File

@ -3,7 +3,7 @@ using System.Threading.Tasks;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Objects.Sockets;
namespace CryptoExchange.Net.Interfaces
{

View File

@ -0,0 +1,12 @@
using CryptoExchange.Net.Objects.Sockets;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces
{
internal interface IStreamMessageListener
{
int Priority { get; }
bool MessageMatches(StreamMessage message);
Task ProcessAsync(StreamMessage message);
}
}

View File

@ -1,6 +1,7 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets;
using System;
using System.IO;
using System.Security.Authentication;
using System.Text;
using System.Threading.Tasks;
@ -19,7 +20,7 @@ namespace CryptoExchange.Net.Interfaces
/// <summary>
/// Websocket message received event
/// </summary>
event Action<string> OnMessage;
event Func<MemoryStream, Task> OnStreamMessage;
/// <summary>
/// Websocket sent event, RequestId as parameter
/// </summary>

View File

@ -1,4 +1,4 @@
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
namespace CryptoExchange.Net.Interfaces

View File

@ -1,7 +1,7 @@
using CryptoExchange.Net.Objects;
using System;
namespace CryptoExchange.Net.Sockets
namespace CryptoExchange.Net.Objects.Sockets
{
/// <summary>
/// An update received from a socket update subscription

View File

@ -1,27 +1,29 @@
using CryptoExchange.Net.Objects;
using Newtonsoft.Json.Linq;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
namespace CryptoExchange.Net.Objects.Sockets
{
internal class PendingRequest
internal class PendingRequest : IStreamMessageListener
{
public int Id { get; set; }
public Func<JToken, bool> Handler { get; }
public JToken? Result { get; private set; }
public Func<StreamMessage, bool> MessageMatchesHandler { get; }
public bool Completed { get; private set; }
public AsyncResetEvent Event { get; }
public DateTime RequestTimestamp { get; set; }
public TimeSpan Timeout { get; }
public SocketSubscription? Subscription { get; }
public SocketSubscriptionListener? Subscription { get; }
private CancellationTokenSource? _cts;
public PendingRequest(int id, Func<JToken, bool> handler, TimeSpan timeout, SocketSubscription? subscription)
public int Priority => 100;
public PendingRequest(int id, Func<StreamMessage, bool> messageMatchesHandler, TimeSpan timeout, SocketSubscriptionListener? subscription)
{
Id = id;
Handler = handler;
MessageMatchesHandler = messageMatchesHandler;
Event = new AsyncResetEvent(false, false);
Timeout = timeout;
RequestTimestamp = DateTime.UtcNow;
@ -35,23 +37,22 @@ namespace CryptoExchange.Net.Sockets
_cts.Token.Register(Fail, false);
}
public bool CheckData(JToken data)
{
return Handler(data);
}
public bool Succeed(JToken data)
{
Result = data;
Completed = true;
Event.Set();
return true;
}
public void Fail()
{
Completed = true;
Event.Set();
}
public bool MessageMatches(StreamMessage message)
{
return MessageMatchesHandler(message);
}
public Task ProcessAsync(StreamMessage message)
{
Completed = true;
Event.Set();
return Task.CompletedTask;
}
}
}

View File

@ -1,12 +1,15 @@
using System;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Sockets;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
namespace CryptoExchange.Net.Objects.Sockets
{
/// <summary>
/// Socket subscription
/// </summary>
public class SocketSubscription
public class SocketSubscriptionListener : IStreamMessageListener
{
/// <summary>
/// Unique subscription id
@ -18,26 +21,16 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public event Action<Exception>? Exception;
/// <summary>
/// Message handlers for this subscription. Should return true if the message is handled and should not be distributed to the other handlers
/// </summary>
public Action<MessageEvent> MessageHandler { get; set; }
/// <summary>
/// The request object send when subscribing on the server. Either this or the `Identifier` property should be set
/// </summary>
public object? Request { get; set; }
/// <summary>
/// The subscription identifier, used instead of a `Request` object to identify the subscription
/// </summary>
public string? Identifier { get; set; }
public SubscriptionActor Subscription { get; set; }
/// <summary>
/// Whether this is a user subscription or an internal listener
/// </summary>
public bool UserSubscription { get; set; }
/// <summary>
/// If the subscription has been confirmed to be subscribed by the server
/// </summary>
@ -46,7 +39,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Whether authentication is needed for this subscription
/// </summary>
public bool Authenticated { get; set; }
public bool Authenticated => Subscription.Authenticated;
/// <summary>
/// Whether we're closing this subscription and a socket connection shouldn't be kept open for it
@ -59,44 +52,17 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public CancellationTokenRegistration? CancellationTokenRegistration { get; set; }
private SocketSubscription(int id, object? request, string? identifier, bool userSubscription, bool authenticated, Action<MessageEvent> dataHandler)
{
Id = id;
UserSubscription = userSubscription;
MessageHandler = dataHandler;
Request = request;
Identifier = identifier;
Authenticated = authenticated;
}
/// <summary>
/// Create SocketSubscription for a subscribe request
/// ctor
/// </summary>
/// <param name="id"></param>
/// <param name="request"></param>
/// <param name="userSubscription"></param>
/// <param name="authenticated"></param>
/// <param name="dataHandler"></param>
/// <returns></returns>
public static SocketSubscription CreateForRequest(int id, object request, bool userSubscription,
bool authenticated, Action<MessageEvent> dataHandler)
public SocketSubscriptionListener(int id, SubscriptionActor request, bool userSubscription)
{
return new SocketSubscription(id, request, null, userSubscription, authenticated, dataHandler);
}
/// <summary>
/// Create SocketSubscription for an identifier
/// </summary>
/// <param name="id"></param>
/// <param name="identifier"></param>
/// <param name="userSubscription"></param>
/// <param name="authenticated"></param>
/// <param name="dataHandler"></param>
/// <returns></returns>
public static SocketSubscription CreateForIdentifier(int id, string identifier, bool userSubscription,
bool authenticated, Action<MessageEvent> dataHandler)
{
return new SocketSubscription(id, null, identifier, userSubscription, authenticated, dataHandler);
Id = id;
UserSubscription = userSubscription;
Subscription = request;
}
/// <summary>
@ -107,5 +73,24 @@ namespace CryptoExchange.Net.Sockets
{
Exception?.Invoke(e);
}
/// <summary>
/// The priority of this subscription
/// </summary>
public int Priority => Subscription is SystemSubscription ? 50 : 1;
/// <summary>
/// Check if message matches the subscription
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public bool MessageMatches(StreamMessage message) => Subscription.MessageMatchesSubscription(message);
/// <summary>
/// Process the message
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public Task ProcessAsync(StreamMessage message) => Subscription.HandleEventAsync(message);
}
}

View File

@ -0,0 +1,67 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using CryptoExchange.Net.Sockets;
namespace CryptoExchange.Net.Objects.Sockets
{
/// <summary>
/// A message received from a stream
/// </summary>
public class StreamMessage : IDisposable
{
/// <summary>
/// The connection it was received on
/// </summary>
public SocketConnection Connection { get; }
/// <summary>
/// The data stream
/// </summary>
public MemoryStream Stream { get; }
/// <summary>
/// Receive timestamp
/// </summary>
public DateTime Timestamp { get; set; }
private Dictionary<Type, object> _casted;
/// <summary>
/// Get the data from the memory in specified type using the converter. If this type has been resolved before it will use that instead
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="converter"></param>
/// <returns></returns>
public T Get<T>(Func<Stream, T> converter)
{
if (_casted.TryGetValue(typeof(T), out var casted))
return (T)casted;
var result = converter(Stream);
_casted.Add(typeof(T), result!);
Stream.Position = 0;
return result;
}
public void Dispose()
{
Stream.Dispose();
}
/// <summary>
/// ctor
/// </summary>
/// <param name="connection"></param>
/// <param name="stream"></param>
/// <param name="timestamp"></param>
public StreamMessage(SocketConnection connection, MemoryStream stream, DateTime timestamp)
{
Connection = connection;
Stream = stream;
Timestamp = timestamp;
_casted = new Dictionary<Type, object>();
}
}
}

View File

@ -1,8 +1,9 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets;
using System;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
namespace CryptoExchange.Net.Objects.Sockets
{
/// <summary>
/// Subscription to a data stream
@ -10,7 +11,7 @@ namespace CryptoExchange.Net.Sockets
public class UpdateSubscription
{
private readonly SocketConnection _connection;
private readonly SocketSubscription _subscription;
private readonly SocketSubscriptionListener _subscription;
/// <summary>
/// Event when the connection is lost. The socket will automatically reconnect when possible.
@ -83,12 +84,12 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="connection">The socket connection the subscription is on</param>
/// <param name="subscription">The subscription</param>
public UpdateSubscription(SocketConnection connection, SocketSubscription subscription)
public UpdateSubscription(SocketConnection connection, SocketSubscriptionListener subscription)
{
this._connection = connection;
this._subscription = subscription;
_connection = connection;
_subscription = subscription;
}
/// <summary>
/// Close the subscription
/// </summary>

View File

@ -5,7 +5,7 @@ using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
namespace CryptoExchange.Net.Objects.Sockets
{
/// <summary>
/// Parameters for a websocket

View File

@ -9,7 +9,7 @@ using System.Threading.Tasks;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
namespace CryptoExchange.Net.OrderBook

View File

@ -0,0 +1,36 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.IO;
using System.Text;
namespace CryptoExchange.Net
{
/// <summary>
/// Parsing utility methods
/// </summary>
public static class ParsingUtils
{
/// <summary>
/// Read the stream as string
/// </summary>
/// <param name="stream"></param>
/// <returns></returns>
public static string GetString(Stream stream)
{
using var reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
return reader.ReadToEnd();
}
/// <summary>
/// Read the stream and parse to JToken
/// </summary>
/// <param name="x"></param>
/// <returns></returns>
public static JToken GetJToken(Stream x)
{
using var sr = new StreamReader(x, Encoding.UTF8, false, (int)x.Length, true);
using var jsonTextReader = new JsonTextReader(sr);
return JToken.Load(jsonTextReader);
}
}
}

View File

@ -1,5 +1,6 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
@ -44,7 +45,6 @@ namespace CryptoExchange.Net.Sockets
private ProcessState _processState;
private DateTime _lastReconnectTime;
/// <summary>
/// Received messages, the size and the timstamp
/// </summary>
@ -101,7 +101,7 @@ namespace CryptoExchange.Net.Sockets
public event Action? OnClose;
/// <inheritdoc />
public event Action<string>? OnMessage;
public event Func<MemoryStream, Task>? OnStreamMessage;
/// <inheritdoc />
public event Action<int>? OnRequestSent;
@ -521,7 +521,7 @@ namespace CryptoExchange.Net.Sockets
{
// Received a complete message and it's not multi part
_logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
HandleMessage(buffer.Array!, buffer.Offset, receiveResult.Count, receiveResult.MessageType);
await ProcessByteData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count), receiveResult.MessageType).ConfigureAwait(false);
}
else
{
@ -555,11 +555,13 @@ namespace CryptoExchange.Net.Sockets
{
// Reassemble complete message from memory stream
_logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType);
await ProcessByteData(memoryStream, receiveResult.MessageType).ConfigureAwait(false);
memoryStream.Dispose();
}
else
{
_logger.Log(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes");
}
}
}
}
@ -578,58 +580,10 @@ namespace CryptoExchange.Net.Sockets
}
}
/// <summary>
/// Handles the message
/// </summary>
/// <param name="data"></param>
/// <param name="offset"></param>
/// <param name="count"></param>
/// <param name="messageType"></param>
private void HandleMessage(byte[] data, int offset, int count, WebSocketMessageType messageType)
private async Task ProcessByteData(MemoryStream memoryStream, WebSocketMessageType messageType)
{
string strData;
if (messageType == WebSocketMessageType.Binary)
{
if (Parameters.DataInterpreterBytes == null)
throw new Exception("Byte interpreter not set while receiving byte data");
try
{
var relevantData = new byte[count];
Array.Copy(data, offset, relevantData, 0, count);
strData = Parameters.DataInterpreterBytes(relevantData);
}
catch(Exception e)
{
_logger.Log(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString());
return;
}
}
else
strData = Parameters.Encoding.GetString(data, offset, count);
if (Parameters.DataInterpreterString != null)
{
try
{
strData = Parameters.DataInterpreterString(strData);
}
catch(Exception e)
{
_logger.Log(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString());
return;
}
}
try
{
LastActionTime = DateTime.UtcNow;
OnMessage?.Invoke(strData);
}
catch(Exception e)
{
_logger.Log(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString());
}
if (OnStreamMessage != null)
await OnStreamMessage.Invoke(memoryStream).ConfigureAwait(false);
}
/// <summary>

View File

@ -1,46 +0,0 @@
using Newtonsoft.Json.Linq;
using System;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Message received event
/// </summary>
public class MessageEvent
{
/// <summary>
/// The connection the message was received on
/// </summary>
public SocketConnection Connection { get; set; }
/// <summary>
/// The json object of the data
/// </summary>
public JToken JsonData { get; set; }
/// <summary>
/// The originally received string data
/// </summary>
public string? OriginalData { get; set; }
/// <summary>
/// The timestamp of when the data was received
/// </summary>
public DateTime ReceivedTimestamp { get; set; }
/// <summary>
/// ctor
/// </summary>
/// <param name="connection"></param>
/// <param name="jsonData"></param>
/// <param name="originalData"></param>
/// <param name="timestamp"></param>
public MessageEvent(SocketConnection connection, JToken jsonData, string? originalData, DateTime timestamp)
{
Connection = connection;
JsonData = jsonData;
OriginalData = originalData;
ReceivedTimestamp = timestamp;
}
}
}

View File

@ -0,0 +1,45 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Query
/// </summary>
public abstract class QueryActor
{
/// <summary>
/// The query request
/// </summary>
public object Query { get; set; }
/// <summary>
/// If this is a private request
/// </summary>
public bool Authenticated { get; }
/// <summary>
/// Check if the message is the response to the query
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract bool MessageMatchesQuery(StreamMessage message);
/// <summary>
/// Handle the query response
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult HandleResponse(StreamMessage message);
/// <summary>
/// ctor
/// </summary>
/// <param name="query"></param>
/// <param name="authenticated"></param>
public QueryActor(object query, bool authenticated)
{
Authenticated = authenticated;
Query = query;
}
}
}

View File

@ -9,6 +9,8 @@ using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects;
using System.Net.WebSockets;
using System.IO;
using CryptoExchange.Net.Objects.Sockets;
namespace CryptoExchange.Net.Sockets
{
@ -45,7 +47,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Unhandled message event
/// </summary>
public event Action<JToken>? UnhandledMessage;
public event Action<StreamMessage>? UnhandledMessage;
/// <summary>
/// The amount of subscriptions on this connection
@ -59,7 +61,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Get a copy of the current subscriptions
/// </summary>
public SocketSubscription[] Subscriptions
public SocketSubscriptionListener[] Subscriptions
{
get
{
@ -149,13 +151,12 @@ namespace CryptoExchange.Net.Sockets
}
private bool _pausedActivity;
private readonly List<SocketSubscription> _subscriptions;
private readonly List<SocketSubscriptionListener> _subscriptions;
private readonly List<IStreamMessageListener> _messageListeners;
private readonly object _subscriptionLock = new();
private readonly ILogger _logger;
private readonly List<PendingRequest> _pendingRequests;
private SocketStatus _status;
/// <summary>
@ -177,11 +178,11 @@ namespace CryptoExchange.Net.Sockets
Tag = tag;
Properties = new Dictionary<string, object>();
_pendingRequests = new List<PendingRequest>();
_subscriptions = new List<SocketSubscription>();
_messageListeners = new List<IStreamMessageListener>();
_subscriptions = new List<SocketSubscriptionListener>();
_socket = socket;
_socket.OnMessage += HandleMessage;
_socket.OnStreamMessage += HandleStreamMessage;
_socket.OnRequestSent += HandleRequestSent;
_socket.OnOpen += HandleOpen;
_socket.OnClose += HandleClose;
@ -247,12 +248,12 @@ namespace CryptoExchange.Net.Sockets
protected virtual async void HandleReconnected()
{
Status = SocketStatus.Resubscribing;
lock (_pendingRequests)
lock (_messageListeners)
{
foreach (var pendingRequest in _pendingRequests.ToList())
foreach (var pendingRequest in _messageListeners.OfType<PendingRequest>().ToList())
{
pendingRequest.Fail();
_pendingRequests.Remove(pendingRequest);
_messageListeners.Remove(pendingRequest);
}
}
@ -292,8 +293,8 @@ namespace CryptoExchange.Net.Sockets
protected virtual void HandleRequestSent(int requestId)
{
PendingRequest pendingRequest;
lock (_pendingRequests)
pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId);
lock (_messageListeners)
pendingRequest = _messageListeners.OfType<PendingRequest>().SingleOrDefault(p => p.Id == requestId);
if (pendingRequest == null)
{
@ -305,86 +306,67 @@ namespace CryptoExchange.Net.Sockets
}
/// <summary>
/// Process a message received by the socket
/// Handle a message
/// </summary>
/// <param name="data">The received data</param>
protected virtual void HandleMessage(string data)
/// <param name="stream"></param>
/// <returns></returns>
protected virtual async Task HandleStreamMessage(MemoryStream stream)
{
var timestamp = DateTime.UtcNow;
_logger.Log(LogLevel.Trace, $"Socket {SocketId} received data: " + data);
if (string.IsNullOrEmpty(data)) return;
var tokenData = data.ToJToken(_logger);
if (tokenData == null)
{
data = $"\"{data}\"";
tokenData = data.ToJToken(_logger);
if (tokenData == null)
return;
}
var streamMessage = new StreamMessage(this, stream, timestamp);
var handledResponse = false;
// Remove any timed out requests
PendingRequest[] requests;
lock (_pendingRequests)
SocketSubscriptionListener? currentSubscription = null;
TimeSpan userCodeDuration = TimeSpan.Zero;
foreach (var listener in _messageListeners.OrderByDescending(x => x.Priority).ToList()) // LOCK
{
// Remove only timed out requests after 5 minutes have passed so we can still process any
// message coming in after the request timeout
_pendingRequests.RemoveAll(r => r.Completed && DateTime.UtcNow - r.RequestTimestamp > TimeSpan.FromMinutes(5));
requests = _pendingRequests.ToArray();
}
// Check if this message is an answer on any pending requests
foreach (var pendingRequest in requests)
{
if (pendingRequest.CheckData(tokenData))
if (listener.MessageMatches(streamMessage))
{
lock (_pendingRequests)
_pendingRequests.Remove(pendingRequest);
if (pendingRequest.Completed)
if (listener is PendingRequest pendingRequest)
{
// Answer to a timed out request, unsub if it is a subscription request
if (pendingRequest.Subscription != null)
lock (_messageListeners)
_messageListeners.Remove(pendingRequest);
if (pendingRequest.Completed)
{
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
_ = ApiClient.UnsubscribeAsync(this, pendingRequest.Subscription).ConfigureAwait(false);
// Answer to a timed out request, unsub if it is a subscription request
if (pendingRequest.Subscription != null)
{
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
_ = UnsubscribeAsync(pendingRequest.Subscription).ConfigureAwait(false);
}
}
else
{
_logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request");
await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false);
}
if (!ApiClient.ContinueOnQueryResponse)
return;
handledResponse = true;
break;
}
else
else if (listener is SocketSubscriptionListener subscription)
{
_logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request");
pendingRequest.Succeed(tokenData);
currentSubscription = subscription;
handledResponse = true;
var userSw = Stopwatch.StartNew();
await subscription.ProcessAsync(streamMessage).ConfigureAwait(false);
userSw.Stop();
userCodeDuration = userSw.Elapsed;
break;
}
if (!ApiClient.ContinueOnQueryResponse)
return;
handledResponse = true;
break;
}
}
// Message was not a request response, check data handlers
var messageEvent = new MessageEvent(this, tokenData, ApiClient.OutputOriginalData ? data : null, timestamp);
var (handled, userProcessTime, subscription) = HandleData(messageEvent);
if (!handled && !handledResponse)
if (!handledResponse)
{
if (!ApiClient.UnhandledMessageExpected)
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + tokenData);
UnhandledMessage?.Invoke(tokenData);
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + streamMessage.Get(ParsingUtils.GetString));
UnhandledMessage?.Invoke(streamMessage);
}
var total = DateTime.UtcNow - timestamp;
if (userProcessTime.TotalMilliseconds > 500)
{
_logger.Log(LogLevel.Debug, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " +
"Data from this socket may arrive late or not at all if message processing is continuously slow.");
}
_logger.Log(LogLevel.Trace, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processed in {(int)total.TotalMilliseconds}ms ({(int)userProcessTime.TotalMilliseconds}ms user code)");
}
}
/// <summary>
/// Connect the websocket
@ -434,7 +416,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="subscription">Subscription to close</param>
/// <returns></returns>
public async Task CloseAsync(SocketSubscription subscription)
public async Task CloseAsync(SocketSubscriptionListener subscription)
{
lock (_subscriptionLock)
{
@ -452,7 +434,7 @@ namespace CryptoExchange.Net.Sockets
subscription.CancellationTokenRegistration.Value.Dispose();
if (subscription.Confirmed && _socket.IsOpen)
await ApiClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
await UnsubscribeAsync(subscription).ConfigureAwait(false);
bool shouldCloseConnection;
lock (_subscriptionLock)
@ -475,7 +457,10 @@ namespace CryptoExchange.Net.Sockets
}
lock (_subscriptionLock)
{
_messageListeners.Remove(subscription);
_subscriptions.Remove(subscription);
}
}
/// <summary>
@ -491,7 +476,7 @@ namespace CryptoExchange.Net.Sockets
/// Add a subscription to this connection
/// </summary>
/// <param name="subscription"></param>
public bool AddSubscription(SocketSubscription subscription)
public bool AddSubscription(SocketSubscriptionListener subscription)
{
lock (_subscriptionLock)
{
@ -499,7 +484,9 @@ namespace CryptoExchange.Net.Sockets
return false;
_subscriptions.Add(subscription);
if(subscription.UserSubscription)
_messageListeners.Add(subscription);
if (subscription.UserSubscription)
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}");
return true;
}
@ -509,7 +496,7 @@ namespace CryptoExchange.Net.Sockets
/// Get a subscription on this connection by id
/// </summary>
/// <param name="id"></param>
public SocketSubscription? GetSubscription(int id)
public SocketSubscriptionListener? GetSubscription(int id)
{
lock (_subscriptionLock)
return _subscriptions.SingleOrDefault(s => s.Id == id);
@ -520,66 +507,10 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="predicate">Filter for a request</param>
/// <returns></returns>
public SocketSubscription? GetSubscriptionByRequest(Func<object?, bool> predicate)
public SocketSubscriptionListener? GetSubscriptionByRequest(Func<object?, bool> predicate)
{
lock(_subscriptionLock)
return _subscriptions.SingleOrDefault(s => predicate(s.Request));
}
/// <summary>
/// Process data
/// </summary>
/// <param name="messageEvent"></param>
/// <returns>True if the data was successfully handled</returns>
private (bool, TimeSpan, SocketSubscription?) HandleData(MessageEvent messageEvent)
{
SocketSubscription? currentSubscription = null;
try
{
var handled = false;
TimeSpan userCodeDuration = TimeSpan.Zero;
// Loop the subscriptions to check if any of them signal us that the message is for them
List<SocketSubscription> subscriptionsCopy;
lock (_subscriptionLock)
subscriptionsCopy = _subscriptions.ToList();
foreach (var subscription in subscriptionsCopy)
{
currentSubscription = subscription;
if (subscription.Request == null)
{
if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!))
{
handled = true;
var userSw = Stopwatch.StartNew();
subscription.MessageHandler(messageEvent);
userSw.Stop();
userCodeDuration = userSw.Elapsed;
}
}
else
{
if (ApiClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Request))
{
handled = true;
messageEvent.JsonData = ApiClient.ProcessTokenData(messageEvent.JsonData);
var userSw = Stopwatch.StartNew();
subscription.MessageHandler(messageEvent);
userSw.Stop();
userCodeDuration = userSw.Elapsed;
}
}
}
return (handled, userCodeDuration, currentSubscription);
}
catch (Exception ex)
{
_logger.Log(LogLevel.Error, $"Socket {SocketId} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}");
currentSubscription?.InvokeExceptionHandler(ex);
return (false, TimeSpan.Zero, null);
}
return _subscriptions.SingleOrDefault(s => predicate(s.Subscription));
}
/// <summary>
@ -589,15 +520,15 @@ namespace CryptoExchange.Net.Sockets
/// <param name="obj">The object to send</param>
/// <param name="timeout">The timeout for response</param>
/// <param name="subscription">Subscription if this is a subscribe request</param>
/// <param name="handler">The response handler, should return true if the received JToken was the response to the request</param>
/// <param name="handler">The response handler</param>
/// <param name="weight">The weight of the message</param>
/// <returns></returns>
public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, SocketSubscription? subscription, int weight, Func<JToken, bool> handler)
public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, SocketSubscriptionListener? subscription, int weight, Func<StreamMessage, bool> handler)
{
var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, subscription);
lock (_pendingRequests)
lock (_messageListeners)
{
_pendingRequests.Add(pending);
_messageListeners.Add(pending);
}
var sendOk = Send(pending.Id, obj, weight);
@ -697,21 +628,21 @@ namespace CryptoExchange.Net.Sockets
}
// Get a list of all subscriptions on the socket
List<SocketSubscription> subscriptionList = new List<SocketSubscription>();
List<SocketSubscriptionListener> subscriptionList = new List<SocketSubscriptionListener>();
lock (_subscriptionLock)
{
foreach (var subscription in _subscriptions)
{
if (subscription.Request != null)
if (subscription.Subscription != null)
subscriptionList.Add(subscription);
else
subscription.Confirmed = true;
}
}
foreach(var subscription in subscriptionList.Where(s => s.Request != null))
foreach(var subscription in subscriptionList.Where(s => s.Subscription != null))
{
var result = await ApiClient.RevitalizeRequestAsync(subscription.Request!).ConfigureAwait(false);
var result = await ApiClient.RevitalizeRequestAsync(subscription.Subscription!).ConfigureAwait(false);
if (!result)
{
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error);
@ -727,7 +658,7 @@ namespace CryptoExchange.Net.Sockets
var taskList = new List<Task<CallResult<bool>>>();
foreach (var subscription in subscriptionList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription));
taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Subscription!, subscription));
await Task.WhenAll(taskList).ConfigureAwait(false);
if (taskList.Any(t => !t.Result.Success))
@ -744,17 +675,26 @@ namespace CryptoExchange.Net.Sockets
return new CallResult<bool>(true);
}
internal async Task UnsubscribeAsync(SocketSubscription socketSubscription)
internal async Task UnsubscribeAsync(SocketSubscriptionListener socketSubscription)
{
await ApiClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false);
var unsubscribeRequest = socketSubscription.Subscription?.GetUnsubscribeRequest();
if (unsubscribeRequest != null)
{
await SendAndWaitAsync(unsubscribeRequest, TimeSpan.FromSeconds(10), socketSubscription, 0, x =>
{
var (matches, result) = socketSubscription.Subscription!.MessageMatchesUnsubscribeRequest(x);
// TODO check result?
return matches;
}).ConfigureAwait(false);
}
}
internal async Task<CallResult<bool>> ResubscribeAsync(SocketSubscription socketSubscription)
internal async Task<CallResult<bool>> ResubscribeAsync(SocketSubscriptionListener socketSubscription)
{
if (!_socket.IsOpen)
return new CallResult<bool>(new UnknownError("Socket is not connected"));
return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Subscription!, socketSubscription).ConfigureAwait(false);
}
/// <summary>
@ -793,3 +733,4 @@ namespace CryptoExchange.Net.Sockets
}
}
}

View File

@ -0,0 +1,114 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.IO;
using System.Text;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Subscription base
/// </summary>
public abstract class SubscriptionActor
{
private bool _outputOriginalData;
/// <summary>
/// Logger
/// </summary>
protected readonly ILogger _logger;
/// <summary>
/// If the subscription is a private subscription and needs authentication
/// </summary>
public bool Authenticated { get; }
/// <summary>
/// ctor
/// </summary>
/// <param name="logger"></param>
/// <param name="apiClient"></param>
/// <param name="authenticated"></param>
public SubscriptionActor(ILogger logger, ISocketApiClient apiClient, bool authenticated)
{
_logger = logger;
_outputOriginalData = apiClient.ApiOptions.OutputOriginalData ?? apiClient.ClientOptions.OutputOriginalData;
Authenticated = authenticated;
}
/// <summary>
/// Get the subscribe object to send when subscribing
/// </summary>
/// <returns></returns>
public abstract object? GetSubscribeRequest();
/// <summary>
/// Check if the message is the response to the subscribe request
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract (bool, CallResult?) MessageMatchesSubscribeRequest(StreamMessage message);
/// <summary>
/// Get the unsubscribe object to send when unsubscribing
/// </summary>
/// <returns></returns>
public abstract object? GetUnsubscribeRequest();
/// <summary>
/// Check if the message is the response to the unsubscribe request
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract (bool, CallResult?) MessageMatchesUnsubscribeRequest(StreamMessage message);
/// <summary>
/// Check if the message is an update for this subscription
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract bool MessageMatchesSubscription(StreamMessage message);
/// <summary>
/// Handle the update message
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Task HandleEventAsync(StreamMessage message);
/// <summary>
/// Create a data event
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="obj"></param>
/// <param name="message"></param>
/// <param name="topic"></param>
/// <param name="type"></param>
/// <returns></returns>
protected DataEvent<T> CreateDataEvent<T>(T obj, StreamMessage message, string? topic = null, SocketUpdateType? type = null)
{
string? originalData = null;
if (_outputOriginalData)
originalData = message.Get(ParsingUtils.GetString);
return new DataEvent<T>(obj, topic, originalData, message.Timestamp, type);
}
/// <summary>
/// Deserialize the message to an object using Json.Net
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="message"></param>
/// <param name="settings"></param>
/// <returns></returns>
protected Task<CallResult<T>> DeserializeAsync<T>(StreamMessage message, JsonSerializerSettings settings)
{
var serializer = JsonSerializer.Create(settings);
using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true);
using var jsonTextReader = new JsonTextReader(sr);
var result = serializer.Deserialize<T>(jsonTextReader);
message.Stream.Position = 0;
return Task.FromResult(new CallResult<T>(result!));
}
}
}

View File

@ -0,0 +1,33 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// A system subscription
/// </summary>
public abstract class SystemSubscription : SubscriptionActor
{
/// <summary>
/// ctor
/// </summary>
/// <param name="logger"></param>
/// <param name="authenticated"></param>
public SystemSubscription(ILogger logger, ISocketApiClient socketApiClient, bool authenticated = false) : base(logger, socketApiClient, authenticated)
{
}
/// <inheritdoc />
public override object? GetSubscribeRequest() => null;
/// <inheritdoc />
public override (bool, CallResult?) MessageMatchesSubscribeRequest(StreamMessage message) => throw new NotImplementedException();
/// <inheritdoc />
public override object? GetUnsubscribeRequest() => null;
/// <inheritdoc />
public override (bool, CallResult?) MessageMatchesUnsubscribeRequest(StreamMessage message) => throw new NotImplementedException();
}
}

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
namespace CryptoExchange.Net.Sockets