1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 18:00:26 +00:00
This commit is contained in:
Jkorf 2025-11-14 08:34:47 +01:00
parent f3d535f286
commit e2ffad9c61
17 changed files with 1795 additions and 124 deletions

View File

@ -14,6 +14,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@ -33,6 +34,11 @@ namespace CryptoExchange.Net.Clients
/// </summary>
protected internal ConcurrentDictionary<int, SocketConnection> socketConnections = new();
/// <summary>
/// List of HighPerf socket connections currently connecting/connected
/// </summary>
protected internal ConcurrentDictionary<int, HighPerfSocketConnection> highPerfSocketConnections = new();
/// <summary>
/// Semaphore used while creating sockets
/// </summary>
@ -115,6 +121,11 @@ namespace CryptoExchange.Net.Clients
}
}
/// <summary>
/// Serializer options to be used for high performance socket deserialization
/// </summary>
public abstract JsonSerializerOptions JsonSerializerOptions { get; }
/// <inheritdoc />
public new SocketExchangeOptions ClientOptions => (SocketExchangeOptions)base.ClientOptions;
@ -312,6 +323,98 @@ namespace CryptoExchange.Net.Clients
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
}
/// <summary>
/// Connect to an url and listen for data
/// </summary>
/// <param name="url">The URL to connect to</param>
/// <param name="subscription">The subscription</param>
/// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns>
protected virtual async Task<CallResult<HighPerfUpdateSubscription>> SubscribeHighPerfAsync<TUpdateType>(string url, HighPerfSubscription<TUpdateType> subscription, CancellationToken ct)
{
if (_disposing)
return new CallResult<HighPerfUpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe"));
HighPerfSocketConnection<TUpdateType> socketConnection;
var released = false;
// Wait for a semaphore here, so we only connect 1 socket at a time.
// This is necessary for being able to see if connections can be combined
try
{
await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false);
}
catch (OperationCanceledException tce)
{
return new CallResult<HighPerfUpdateSubscription>(new CancellationRequestedError(tce));
}
try
{
while (true)
{
// Get a new or existing socket connection
var socketResult = await GetHighPerfSocketConnection<TUpdateType>(url, ct).ConfigureAwait(false);
if (!socketResult)
return socketResult.As<HighPerfUpdateSubscription>(null);
socketConnection = socketResult.Data;
// Add a subscription on the socket connection
var success = socketConnection.AddSubscription(subscription);
if (!success)
{
_logger.FailedToAddSubscriptionRetryOnDifferentConnection(socketConnection.SocketId);
continue;
}
if (ClientOptions.SocketSubscriptionsCombineTarget == 1)
{
// Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway
semaphoreSlim.Release();
released = true;
}
var needsConnecting = !socketConnection.Connected;
var connectResult = await ConnectIfNeededAsync(socketConnection, false, ct).ConfigureAwait(false);
if (!connectResult)
return new CallResult<HighPerfUpdateSubscription>(connectResult.Error!);
break;
}
}
finally
{
if (!released)
semaphoreSlim.Release();
}
var subQuery = subscription.CreateSubscriptionQuery(socketConnection);
if (subQuery != null)
{
// Send the request and wait for answer
var sendResult = await socketConnection.SendAsync(subQuery.Id, subQuery.Request, subQuery.Weight).ConfigureAwait(false);
if (!sendResult)
{
// Needed?
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
return new CallResult<HighPerfUpdateSubscription>(sendResult.Error!);
}
}
if (ct != default)
{
subscription.CancellationTokenRegistration = ct.Register(async () =>
{
_logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id);
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
}, false);
}
_logger.SubscriptionCompletedSuccessfully(socketConnection.SocketId, subscription.Id);
return new CallResult<HighPerfUpdateSubscription>(new HighPerfUpdateSubscription(socketConnection, subscription));
}
/// <summary>
/// Send a query on a socket connection to the BaseAddress and wait for the response
/// </summary>
@ -387,7 +490,7 @@ namespace CryptoExchange.Net.Clients
/// <param name="authenticated">Whether the socket should authenticated</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected virtual async Task<CallResult> ConnectIfNeededAsync(SocketConnection socket, bool authenticated, CancellationToken ct)
protected virtual async Task<CallResult> ConnectIfNeededAsync(ISocketConnection socket, bool authenticated, CancellationToken ct)
{
if (socket.Connected)
return CallResult.SuccessResult;
@ -402,7 +505,10 @@ namespace CryptoExchange.Net.Clients
if (!authenticated || socket.Authenticated)
return CallResult.SuccessResult;
var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false);
if (socket is not SocketConnection sc)
throw new InvalidOperationException("HighPerfSocketConnection not supported for authentication");
var result = await AuthenticateSocketAsync(sc).ConfigureAwait(false);
if (!result)
await socket.CloseAsync().ConfigureAwait(false);
@ -475,7 +581,7 @@ namespace CryptoExchange.Net.Clients
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>
protected internal virtual Task<Uri?> GetReconnectUriAsync(SocketConnection connection)
protected internal virtual Task<Uri?> GetReconnectUriAsync(ISocketConnection connection)
{
return Task.FromResult<Uri?>(connection.ConnectionUri);
}
@ -510,11 +616,11 @@ namespace CryptoExchange.Net.Clients
// If all current socket connections are reconnecting or resubscribing wait for that to finish as we can probably use the existing connection
var delayStart = DateTime.UtcNow;
var delayed = false;
while (socketQuery.Count >= 1 && socketQuery.All(x => x.Status == SocketConnection.SocketStatus.Reconnecting || x.Status == SocketConnection.SocketStatus.Resubscribing))
while (socketQuery.Count >= 1 && socketQuery.All(x => x.Status == SocketStatus.Reconnecting || x.Status == SocketStatus.Resubscribing))
{
if (DateTime.UtcNow - delayStart > TimeSpan.FromSeconds(10))
{
if (socketQuery.Count >= 1 && socketQuery.All(x => x.Status == SocketConnection.SocketStatus.Reconnecting || x.Status == SocketConnection.SocketStatus.Resubscribing))
if (socketQuery.Count >= 1 && socketQuery.All(x => x.Status == SocketStatus.Reconnecting || x.Status == SocketStatus.Resubscribing))
{
// If after this time we still trying to reconnect/reprocess there is some issue in the connection
_logger.TimeoutWaitingForReconnectingSocket();
@ -534,7 +640,7 @@ namespace CryptoExchange.Net.Clients
if (delayed)
_logger.WaitedForReconnectingSocket((long)(DateTime.UtcNow - delayStart).TotalMilliseconds);
socketQuery = socketQuery.Where(s => (s.Status == SocketConnection.SocketStatus.None || s.Status == SocketConnection.SocketStatus.Connected)
socketQuery = socketQuery.Where(s => (s.Status == SocketStatus.None || s.Status == SocketStatus.Connected)
&& (s.Authenticated == authenticated || !authenticated)
&& s.Connected).ToList();
@ -571,9 +677,8 @@ namespace CryptoExchange.Net.Clients
if (connectionAddress.Data != address)
_logger.ConnectionAddressSetTo(connectionAddress.Data!);
// Create new socket
var socket = CreateSocket(connectionAddress.Data!);
var socketConnection = new SocketConnection(_logger, this, socket, address);
// Create new socket connection
var socketConnection = new SocketConnection(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, address);
socketConnection.UnhandledMessage += HandleUnhandledMessage;
socketConnection.ConnectRateLimitedAsync += HandleConnectRateLimitedAsync;
if (dedicatedRequestConnection)
@ -594,6 +699,58 @@ namespace CryptoExchange.Net.Clients
return new CallResult<SocketConnection>(socketConnection);
}
/// <summary>
/// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one.
/// </summary>
/// <param name="address">The address the socket is for</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected virtual async Task<CallResult<HighPerfSocketConnection<TUpdateType>>> GetHighPerfSocketConnection<TUpdateType>(string address, CancellationToken ct)
{
var socketQuery = highPerfSocketConnections.Where(s => s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
&& s.Value.ApiClient.GetType() == GetType()
&& s.Value.UpdateType == typeof(TUpdateType))
.Select(x => x.Value)
.ToList();
socketQuery = socketQuery.Where(s => (s.Status == SocketStatus.None || s.Status == SocketStatus.Connected)
&& s.Connected).ToList();
var connection = socketQuery.OrderBy(s => s.UserSubscriptionCount).FirstOrDefault();
if (connection != null)
{
if (connection.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget
|| (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget)))
{
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
return new CallResult<HighPerfSocketConnection<TUpdateType>>((HighPerfSocketConnection<TUpdateType>)connection);
}
}
var connectionAddress = await GetConnectionUrlAsync(address, false).ConfigureAwait(false);
if (!connectionAddress)
{
_logger.FailedToDetermineConnectionUrl(connectionAddress.Error?.ToString());
return connectionAddress.As<HighPerfSocketConnection<TUpdateType>>(null);
}
if (connectionAddress.Data != address)
_logger.ConnectionAddressSetTo(connectionAddress.Data!);
// Create new socket connection
var socketConnection = new HighPerfSocketConnection<TUpdateType>(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, JsonSerializerOptions, address);
foreach (var ptg in PeriodicTaskRegistrations)
socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.QueryDelegate, ptg.Callback);
//foreach (var systemSubscription in systemSubscriptions)
// socketConnection.AddSubscription(systemSubscription);
return new CallResult<HighPerfSocketConnection<TUpdateType>>(socketConnection);
}
/// <summary>
/// Process an unhandled message
/// </summary>
@ -622,12 +779,15 @@ namespace CryptoExchange.Net.Clients
/// <param name="socketConnection">The socket to connect</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected virtual async Task<CallResult> ConnectSocketAsync(SocketConnection socketConnection, CancellationToken ct)
protected virtual async Task<CallResult> ConnectSocketAsync(ISocketConnection socketConnection, CancellationToken ct)
{
var connectResult = await socketConnection.ConnectAsync(ct).ConfigureAwait(false);
if (connectResult)
{
socketConnections.TryAdd(socketConnection.SocketId, socketConnection);
if (socketConnection is SocketConnection sc)
socketConnections.TryAdd(socketConnection.SocketId, sc);
else if (socketConnection is HighPerfSocketConnection hsc)
highPerfSocketConnections.TryAdd(socketConnection.SocketId, hsc);
return connectResult;
}
@ -658,7 +818,7 @@ namespace CryptoExchange.Net.Clients
/// </summary>
/// <param name="address">The address the socket should connect to</param>
/// <returns></returns>
protected virtual IWebsocket CreateSocket(string address)
protected internal virtual IWebsocket CreateSocket(string address)
{
var socket = SocketFactory.CreateWebsocket(_logger, GetWebSocketParameters(address));
_logger.SocketCreatedForAddress(socket.Id, address);
@ -798,11 +958,11 @@ namespace CryptoExchange.Net.Clients
/// <returns></returns>
public SocketApiClientState GetState(bool includeSubDetails = true)
{
var connectionStates = new List<SocketConnection.SocketConnectionState>();
var connectionStates = new List<SocketConnectionState>();
foreach (var socketIdAndConnection in socketConnections)
{
SocketConnection connection = socketIdAndConnection.Value;
SocketConnection.SocketConnectionState connectionState = connection.GetState(includeSubDetails);
SocketConnectionState connectionState = connection.GetState(includeSubDetails);
connectionStates.Add(connectionState);
}
@ -820,7 +980,7 @@ namespace CryptoExchange.Net.Clients
int Connections,
int Subscriptions,
double DownloadSpeed,
List<SocketConnection.SocketConnectionState> ConnectionStates)
List<SocketConnectionState> ConnectionStates)
{
/// <summary>
/// Print the state of the client

View File

@ -2,11 +2,15 @@
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@ -376,6 +380,28 @@ namespace CryptoExchange.Net
return result;
}
/// <summary>
/// Queue updates received from a websocket subscriptions and process them async
/// </summary>
/// <typeparam name="T">The queued update type</typeparam>
/// <param name="subscribeCall">The subscribe call</param>
/// <param name="asyncHandler">The async update handler</param>
/// <param name="maxQueuedItems">The max number of updates to be queued up. When happens when the queue is full and a new write is attempted can be specified with <see>fullMode</see></param>
/// <param name="fullBehavior">What should happen if the queue contains <see>maxQueuedItems</see> pending updates. If no max is set this setting is ignored</param>
public static async Task ProcessQueuedAsync<T>(
Func<Action<T>, Task> subscribeCall,
Func<T, Task> asyncHandler,
CancellationToken ct,
int? maxQueuedItems = null,
QueueFullBehavior? fullBehavior = null)
{
var processor = new ProcessQueue<T>(asyncHandler, maxQueuedItems, fullBehavior);
await processor.StartAsync().ConfigureAwait(false);
ct.Register(() => _ = processor.StopAsync());
await subscribeCall(upd => processor.Write(upd)).ConfigureAwait(false);
}
/// <summary>
/// Queue updates received from a websocket subscriptions and process them async
/// </summary>
@ -415,6 +441,39 @@ namespace CryptoExchange.Net
return result;
}
// public static async Task SubscribeHighPerformance<T>(ILogger logger, string url, JsonSerializerOptions jsonOptions, Action<T> callback, CancellationToken ct)
// {
// var pipe = new Pipe(new PipeOptions());
// var ws = new HighPerformanceWebSocketClient(
// logger,
// new WebSocketParameters(new Uri(url), ReconnectPolicy.Disabled)
// {
// PipeWriter = pipe.Writer
// });
// try
// {
// await ws.ConnectAsync(ct).ConfigureAwait(false);
// ct.Register(() => _ = ws.CloseAsync());
//#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
//#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
//#if NET10_0
// await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<T>(pipe.Reader, jsonOptions, ct).ConfigureAwait(false))
// callback(item!);
//#else
// await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<T>(pipe.Reader.AsStream(), jsonOptions, ct).ConfigureAwait(false))
// callback(item!);
//#endif
//#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
//#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
// }
// catch (OperationCanceledException) { }
// }
/// <summary>
/// Parse a decimal value from a string
/// </summary>

View File

@ -1,18 +1,19 @@
using System;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.SharedApis;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.IO.Compression;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Web;
using CryptoExchange.Net.Objects;
using System.Globalization;
using Microsoft.Extensions.DependencyInjection;
using CryptoExchange.Net.SharedApis;
using System.Text.Json.Serialization.Metadata;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Metadata;
using System.Threading.Tasks;
using System.Web;
namespace CryptoExchange.Net
{

View File

@ -0,0 +1,68 @@
using CryptoExchange.Net.Objects;
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces
{
/// <summary>
/// Websocket connection interface
/// </summary>
public interface IHighPerfWebsocket : IDisposable
{
/// <summary>
/// Websocket closed event
/// </summary>
event Func<Task> OnClose;
/// <summary>
/// Websocket error event
/// </summary>
event Func<Exception, Task> OnError;
/// <summary>
/// Websocket opened event
/// </summary>
event Func<Task> OnOpen;
/// <summary>
/// Unique id for this socket
/// </summary>
int Id { get; }
/// <summary>
/// The uri the socket connects to
/// </summary>
Uri Uri { get; }
/// <summary>
/// Whether the socket connection is closed
/// </summary>
bool IsClosed { get; }
/// <summary>
/// Whether the socket connection is open
/// </summary>
bool IsOpen { get; }
/// <summary>
/// Connect the socket
/// </summary>
/// <returns></returns>
Task<CallResult> ConnectAsync(CancellationToken ct);
/// <summary>
/// Send string data
/// </summary>
/// <param name="id"></param>
/// <param name="data"></param>
/// <param name="weight"></param>
ValueTask<bool> SendAsync(int id, string data, int weight);
/// <summary>
/// Send byte data
/// </summary>
/// <param name="id"></param>
/// <param name="data"></param>
/// <param name="weight"></param>
ValueTask<bool> SendAsync(int id, byte[] data, int weight);
/// <summary>
/// Close the connection
/// </summary>
/// <returns></returns>
Task CloseAsync();
}
}

View File

@ -16,10 +16,6 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
event Func<Task> OnClose;
/// <summary>
/// Websocket message received event
/// </summary>
event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task> OnStreamMessage;
/// <summary>
/// Websocket sent event, RequestId as parameter
/// </summary>
event Func<int, Task> OnRequestSent;

View File

@ -1,5 +1,8 @@
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces
{
@ -15,5 +18,7 @@ namespace CryptoExchange.Net.Interfaces
/// <param name="parameters">The parameters to use for the connection</param>
/// <returns></returns>
IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters);
IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter);
}
}

View File

@ -2,9 +2,11 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
namespace CryptoExchange.Net
{
@ -155,5 +157,37 @@ namespace CryptoExchange.Net
return httpHandler;
#endif
}
public static async ValueTask WhenAll(IReadOnlyList<ValueTask> tasks)
{
if (tasks.Count == 0)
return;
List<Task>? toAwait = null;
int completedTasks = 0;
for (int i = 0; i < tasks.Count; i++)
{
if (!tasks[i].IsCompletedSuccessfully)
{
toAwait ??= new();
toAwait.Add(tasks[i].AsTask());
}
else
{
completedTasks++;
}
}
if (completedTasks != tasks.Count)
await Task.WhenAll(toAwait!).ConfigureAwait(false);
}
public static ValueTask WhenAll(IEnumerable<ValueTask> tasks)
{
return WhenAll(tasks.ToList());
}
}
}

View File

@ -1,5 +1,6 @@
using System;
using System.Net.WebSockets;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
namespace CryptoExchange.Net.Logging.Extensions
@ -8,7 +9,7 @@ namespace CryptoExchange.Net.Logging.Extensions
public static class SocketConnectionLoggingExtension
{
private static readonly Action<ILogger, int, bool, Exception?> _activityPaused;
private static readonly Action<ILogger, int, Sockets.SocketConnection.SocketStatus, Sockets.SocketConnection.SocketStatus, Exception?> _socketStatusChanged;
private static readonly Action<ILogger, int, SocketStatus, SocketStatus, Exception?> _socketStatusChanged;
private static readonly Action<ILogger, int, string?, Exception?> _failedReconnectProcessing;
private static readonly Action<ILogger, int, Exception?> _unknownExceptionWhileProcessingReconnection;
private static readonly Action<ILogger, int, WebSocketError, string?, Exception?> _webSocketErrorCodeAndDetails;
@ -46,7 +47,7 @@ namespace CryptoExchange.Net.Logging.Extensions
new EventId(2000, "ActivityPaused"),
"[Sckt {SocketId}] paused activity: {Paused}");
_socketStatusChanged = LoggerMessage.Define<int, Sockets.SocketConnection.SocketStatus, Sockets.SocketConnection.SocketStatus>(
_socketStatusChanged = LoggerMessage.Define<int, SocketStatus, SocketStatus>(
LogLevel.Debug,
new EventId(2001, "SocketStatusChanged"),
"[Sckt {SocketId}] status changed from {OldStatus} to {NewStatus}");
@ -203,7 +204,7 @@ namespace CryptoExchange.Net.Logging.Extensions
_activityPaused(logger, socketId, paused, null);
}
public static void SocketStatusChanged(this ILogger logger, int socketId, Sockets.SocketConnection.SocketStatus oldStatus, Sockets.SocketConnection.SocketStatus newStatus)
public static void SocketStatusChanged(this ILogger logger, int socketId, SocketStatus oldStatus, SocketStatus newStatus)
{
_socketStatusChanged(logger, socketId, oldStatus, newStatus, null);
}

View File

@ -0,0 +1,109 @@
using CryptoExchange.Net.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects.Sockets
{
/// <summary>
/// Subscription to a data stream
/// </summary>
public class HighPerfUpdateSubscription
{
private readonly HighPerfSocketConnection _connection;
internal readonly HighPerfSubscription _subscription;
private object _eventLock = new object();
private bool _connectionEventsSubscribed = true;
private List<Action> _connectionClosedEventHandlers = new List<Action>();
/// <summary>
/// Event when the status of the subscription changes
/// </summary>
public event Action<SubscriptionStatus>? SubscriptionStatusChanged;
/// <summary>
/// Event when the connection is closed and will not be reconnected
/// </summary>
public event Action ConnectionClosed
{
add { lock (_eventLock) _connectionClosedEventHandlers.Add(value); }
remove { lock (_eventLock) _connectionClosedEventHandlers.Remove(value); }
}
/// <summary>
/// Event when an exception happens during the handling of the data
/// </summary>
public event Action<Exception> Exception
{
add => _subscription.Exception += value;
remove => _subscription.Exception -= value;
}
/// <summary>
/// The id of the socket
/// </summary>
public int SocketId => _connection.SocketId;
/// <summary>
/// The id of the subscription
/// </summary>
public int Id => _subscription.Id;
/// <summary>
/// ctor
/// </summary>
/// <param name="connection">The socket connection the subscription is on</param>
/// <param name="subscription">The subscription</param>
public HighPerfUpdateSubscription(HighPerfSocketConnection connection, HighPerfSubscription subscription)
{
_connection = connection;
_connection.ConnectionClosed += HandleConnectionClosedEvent;
_subscription = subscription;
}
private void UnsubscribeConnectionEvents()
{
lock (_eventLock)
{
if (!_connectionEventsSubscribed)
return;
_connection.ConnectionClosed -= HandleConnectionClosedEvent;
_connectionEventsSubscribed = false;
}
}
private void HandleConnectionClosedEvent()
{
UnsubscribeConnectionEvents();
List<Action> handlers;
lock (_eventLock)
handlers = _connectionClosedEventHandlers.ToList();
foreach(var callback in handlers)
callback();
}
/// <summary>
/// Close the subscription
/// </summary>
/// <returns></returns>
public Task CloseAsync()
{
return _connection.CloseAsync(_subscription);
}
/// <summary>
/// Unsubscribe a subscription
/// </summary>
/// <returns></returns>
internal async Task UnsubscribeAsync()
{
await _connection.UnsubscribeAsync(_subscription).ConfigureAwait(false);
}
}
}

View File

@ -1,6 +1,7 @@
using CryptoExchange.Net.RateLimiting.Interfaces;
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
namespace CryptoExchange.Net.Objects.Sockets
@ -74,6 +75,8 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary>
public int? ReceiveBufferSize { get; set; } = null;
public PipeWriter? PipeWriter { get; set; } = null;
/// <summary>
/// ctor
/// </summary>

View File

@ -0,0 +1,505 @@
using CryptoExchange.Net.Interfaces;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects;
using System.Net.WebSockets;
using CryptoExchange.Net.Objects.Sockets;
using System.Diagnostics;
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Logging.Extensions;
using System.Threading;
using System.IO.Pipelines;
using System.Text.Json;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// A single socket connection to the server
/// </summary>
public abstract class HighPerfSocketConnection : ISocketConnection
{
/// <summary>
/// Connection closed and no reconnect is happening
/// </summary>
public event Action? ConnectionClosed;
/// <inheritdoc />
public bool Authenticated { get; set; } = false;
/// <summary>
/// The amount of subscriptions on this connection
/// </summary>
public int UserSubscriptionCount => _subscriptions.Count;
/// <summary>
/// Get a copy of the current message subscriptions
/// </summary>
public HighPerfSubscription[] Subscriptions
{
get
{
lock (_listenersLock)
return _subscriptions.ToArray();
}
}
/// <summary>
/// If connection is made
/// </summary>
public bool Connected => _socket.IsOpen;
/// <summary>
/// The unique ID of the socket
/// </summary>
public int SocketId => _socket.Id;
/// <summary>
/// The connection uri
/// </summary>
public Uri ConnectionUri => _socket.Uri;
/// <summary>
/// The API client the connection is for
/// </summary>
public SocketApiClient ApiClient { get; set; }
/// <summary>
/// Tag for identification
/// </summary>
public string Tag { get; set; }
/// <summary>
/// Additional properties for this connection
/// </summary>
public Dictionary<string, object> Properties { get; set; }
/// <summary>
/// Status of the socket connection
/// </summary>
public SocketStatus Status
{
get => _status;
private set
{
if (_status == value)
return;
var oldStatus = _status;
_status = value;
_logger.SocketStatusChanged(SocketId, oldStatus, value);
}
}
public string Topic { get; set; }
private readonly object _listenersLock;
private readonly ILogger _logger;
private SocketStatus _status;
private readonly IMessageSerializer _serializer;
protected readonly JsonSerializerOptions _serializerOptions;
protected readonly Pipe _pipe;
private Task _processTask;
private CancellationTokenSource _cts = new CancellationTokenSource();
protected abstract List<HighPerfSubscription> _subscriptions { get; }
public abstract Type UpdateType { get; }
/// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
/// </summary>
protected Task? periodicTask;
/// <summary>
/// Wait event for the periodicTask
/// </summary>
protected AsyncResetEvent? periodicEvent;
/// <summary>
/// The underlying websocket
/// </summary>
private readonly IHighPerfWebsocket _socket;
/// <summary>
/// New socket connection
/// </summary>
public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag)
{
_logger = logger;
_pipe = new Pipe();
_serializerOptions = serializerOptions;
ApiClient = apiClient;
Tag = tag;
Properties = new Dictionary<string, object>();
_socket = socketFactory.CreateHighPerfWebsocket(logger, parameters, _pipe.Writer);
_logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString());
_socket.OnOpen += HandleOpenAsync;
_socket.OnClose += HandleCloseAsync;
_socket.OnError += HandleErrorAsync;
_listenersLock = new object();
_serializer = apiClient.CreateSerializer();
}
protected abstract Task ProcessAsync(CancellationToken ct);
/// <summary>
/// Handler for a socket opening
/// </summary>
protected virtual Task HandleOpenAsync()
{
Status = SocketStatus.Connected;
return Task.CompletedTask;
}
/// <summary>
/// Handler for a socket closing without reconnect
/// </summary>
protected virtual Task HandleCloseAsync()
{
Status = SocketStatus.Closed;
_cts.Cancel();
lock (_listenersLock)
{
foreach (var subscription in _subscriptions)
subscription.Reset();
}
_ = Task.Run(() => ConnectionClosed?.Invoke());
return Task.CompletedTask;
}
/// <summary>
/// Handler for an error on a websocket
/// </summary>
/// <param name="e">The exception</param>
protected virtual Task HandleErrorAsync(Exception e)
{
if (e is WebSocketException wse)
_logger.WebSocketErrorCodeAndDetails(SocketId, wse.WebSocketErrorCode, wse.Message, wse);
else
_logger.WebSocketError(SocketId, e.Message, e);
return Task.CompletedTask;
}
/// <summary>
/// Connect the websocket
/// </summary>
/// <returns></returns>
public async Task<CallResult> ConnectAsync(CancellationToken ct)
{
var result = await _socket.ConnectAsync(ct).ConfigureAwait(false);
if (result.Success)
_processTask = ProcessAsync(_cts.Token);
return result;
}
/// <summary>
/// Retrieve the underlying socket
/// </summary>
/// <returns></returns>
public IHighPerfWebsocket GetSocket() => _socket;
/// <summary>
/// Close the connection
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
if (ApiClient.socketConnections.ContainsKey(SocketId))
ApiClient.socketConnections.TryRemove(SocketId, out _);
lock (_listenersLock)
{
foreach (var subscription in _subscriptions)
{
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
}
}
await _socket.CloseAsync().ConfigureAwait(false);
_socket.Dispose();
}
/// <summary>
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
/// </summary>
/// <param name="subscription">Subscription to close</param>
/// <returns></returns>
public async Task CloseAsync(HighPerfSubscription subscription)
{
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
{
return;
}
_logger.ClosingSubscription(SocketId, subscription.Id);
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
bool anyOtherSubscriptions;
lock (_listenersLock)
anyOtherSubscriptions = _subscriptions.Any(x => x != subscription);
if (anyOtherSubscriptions)
await UnsubscribeAsync(subscription).ConfigureAwait(false);
if (Status == SocketStatus.Closing)
{
_logger.AlreadyClosing(SocketId);
return;
}
if (!anyOtherSubscriptions)
{
Status = SocketStatus.Closing;
_logger.ClosingNoMoreSubscriptions(SocketId);
await CloseAsync().ConfigureAwait(false);
}
lock (_listenersLock)
_subscriptions.Remove(subscription);
}
/// <summary>
/// Dispose the connection
/// </summary>
public void Dispose()
{
Status = SocketStatus.Disposed;
periodicEvent?.Set();
periodicEvent?.Dispose();
_socket.Dispose();
}
/// <summary>
/// Whether or not a new subscription can be added to this connection
/// </summary>
/// <returns></returns>
public bool CanAddSubscription() => Status == SocketStatus.None || Status == SocketStatus.Connected;
/// <summary>
/// Get a subscription on this connection by id
/// </summary>
/// <param name="id"></param>
public HighPerfSubscription? GetSubscription(int id)
{
lock (_listenersLock)
return _subscriptions.SingleOrDefault(s => s.Id == id);
}
/// <summary>
/// Send data over the websocket connection
/// </summary>
/// <typeparam name="T">The type of the object to send</typeparam>
/// <param name="requestId">The request id</param>
/// <param name="obj">The object to send</param>
/// <param name="weight">The weight of the message</param>
public virtual ValueTask<CallResult> SendAsync<T>(int requestId, T obj, int weight)
{
if (_serializer is IByteMessageSerializer byteSerializer)
{
return SendBytesAsync(requestId, byteSerializer.Serialize(obj), weight);
}
else if (_serializer is IStringMessageSerializer stringSerializer)
{
if (obj is string str)
return SendStringAsync(requestId, str, weight);
str = stringSerializer.Serialize(obj);
return SendStringAsync(requestId, str, weight);
}
throw new Exception("Unknown serializer when sending message");
}
/// <summary>
/// Send byte data over the websocket connection
/// </summary>
/// <param name="data">The data to send</param>
/// <param name="weight">The weight of the message</param>
/// <param name="requestId">The id of the request</param>
public virtual async ValueTask<CallResult> SendBytesAsync(int requestId, byte[] data, int weight)
{
if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
{
var info = $"Message to send exceeds the max server message size ({data.Length} vs {ApiClient.MessageSendSizeLimit.Value} bytes). Split the request into batches to keep below this limit";
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
return new CallResult(new InvalidOperationError(info));
}
if (!_socket.IsOpen)
{
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
return new CallResult(new WebError("Failed to send message, socket no longer open"));
}
_logger.SendingByteData(SocketId, requestId, data.Length);
try
{
if (!await _socket.SendAsync(requestId, data, weight).ConfigureAwait(false))
return new CallResult(new WebError("Failed to send message, connection not open"));
return CallResult.SuccessResult;
}
catch (Exception ex)
{
return new CallResult(new WebError("Failed to send message: " + ex.Message, exception: ex));
}
}
/// <summary>
/// Send string data over the websocket connection
/// </summary>
/// <param name="data">The data to send</param>
/// <param name="weight">The weight of the message</param>
/// <param name="requestId">The id of the request</param>
public virtual async ValueTask<CallResult> SendStringAsync(int requestId, string data, int weight)
{
if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
{
var info = $"Message to send exceeds the max server message size ({data.Length} vs {ApiClient.MessageSendSizeLimit.Value} bytes). Split the request into batches to keep below this limit";
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
return new CallResult(new InvalidOperationError(info));
}
if (!_socket.IsOpen)
{
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
return new CallResult(new WebError("Failed to send message, socket no longer open"));
}
_logger.SendingData(SocketId, requestId, data);
try
{
if (!await _socket.SendAsync(requestId, data, weight).ConfigureAwait(false))
return new CallResult(new WebError("Failed to send message, connection not open"));
return CallResult.SuccessResult;
}
catch (Exception ex)
{
return new CallResult(new WebError("Failed to send message: " + ex.Message, exception: ex));
}
}
internal async Task UnsubscribeAsync(HighPerfSubscription subscription)
{
var unsubscribeRequest = subscription.CreateUnsubscriptionQuery(this);
if (unsubscribeRequest == null)
return;
await SendAsync(unsubscribeRequest.Id, unsubscribeRequest.Request, unsubscribeRequest.Weight).ConfigureAwait(false);
_logger.SubscriptionUnsubscribed(SocketId, subscription.Id);
}
/// <summary>
/// Periodically sends data over a socket connection
/// </summary>
/// <param name="identifier">Identifier for the periodic send</param>
/// <param name="interval">How often</param>
/// <param name="queryDelegate">Method returning the query to send</param>
/// <param name="callback">The callback for processing the response</param>
public virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<HighPerfSocketConnection, Query> queryDelegate, Action<HighPerfSocketConnection, CallResult>? callback)
{
if (queryDelegate == null)
throw new ArgumentNullException(nameof(queryDelegate));
periodicEvent = new AsyncResetEvent();
periodicTask = Task.Run(async () =>
{
while (Status != SocketStatus.Disposed
&& Status != SocketStatus.Closed
&& Status != SocketStatus.Closing)
{
await periodicEvent.WaitAsync(interval).ConfigureAwait(false);
if (Status == SocketStatus.Disposed
|| Status == SocketStatus.Closed
|| Status == SocketStatus.Closing)
{
break;
}
if (!Connected)
continue;
var query = queryDelegate(this);
if (query == null)
continue;
_logger.SendingPeriodic(SocketId, identifier);
try
{
var result = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false);
callback?.Invoke(this, result);
}
catch (Exception ex)
{
_logger.PeriodicSendFailed(SocketId, identifier, ex.Message, ex);
}
}
});
}
public void QueryPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, Query> queryDelegate, Action<SocketConnection, CallResult>? callback) => throw new NotImplementedException();
}
public class HighPerfSocketConnection<T> : HighPerfSocketConnection
{
private List<HighPerfSubscription<T>> _typedSubscriptions;
protected override List<HighPerfSubscription> _subscriptions => _typedSubscriptions.Select(x => (HighPerfSubscription)x).ToList();
public override Type UpdateType => typeof(T);
public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag) : base(logger, socketFactory, parameters, apiClient, serializerOptions, tag)
{
_typedSubscriptions = new List<HighPerfSubscription<T>>();
}
/// <summary>
/// Add a subscription to this connection
/// </summary>
/// <param name="subscription"></param>
public bool AddSubscription(HighPerfSubscription<T> subscription)
{
if (Status != SocketStatus.None && Status != SocketStatus.Connected)
return false;
//lock (_listenersLock)
_typedSubscriptions.Add(subscription);
//_logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount);
return true;
}
protected override async Task ProcessAsync(CancellationToken ct)
{
try
{
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, _serializerOptions, ct).ConfigureAwait(false))
{
var tasks = _typedSubscriptions.Select(sub => sub.HandleAsync(update!));
await LibraryHelpers.WhenAll(tasks).ConfigureAwait(false);
}
}
catch (OperationCanceledException) { }
}
}
}

View File

@ -0,0 +1,162 @@
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Socket subscription
/// </summary>
public abstract class HighPerfSubscription
{
/// <summary>
/// Subscription id
/// </summary>
public int Id { get; set; }
/// <summary>
/// Total amount of invocations
/// </summary>
public int TotalInvocations { get; set; }
//private SubscriptionStatus _status;
///// <summary>
///// Current subscription status
///// </summary>
//public SubscriptionStatus Status
//{
// get => _status;
// set
// {
// if (_status == value)
// return;
// _status = value;
// Task.Run(() => StatusChanged?.Invoke(value));
// }
//}
///// <summary>
///// Whether the subscription is active
///// </summary>
//public bool Active => Status != SubscriptionStatus.Closing && Status != SubscriptionStatus.Closed;
///// <summary>
///// Whether the unsubscribing of this subscription lead to the closing of the connection
///// </summary>
//public bool IsClosingConnection { get; set; }
/// <summary>
/// Logger
/// </summary>
protected readonly ILogger _logger;
/// <summary>
/// Cancellation token registration
/// </summary>
public CancellationTokenRegistration? CancellationTokenRegistration { get; set; }
/// <summary>
/// Exception event
/// </summary>
public event Action<Exception>? Exception;
///// <summary>
///// Listener unsubscribed event
///// </summary>
//public event Action<SubscriptionStatus>? StatusChanged;
/// <summary>
/// The subscribe query for this subscription
/// </summary>
public Query? SubscriptionQuery { get; private set; }
/// <summary>
/// The unsubscribe query for this subscription
/// </summary>
public Query? UnsubscriptionQuery { get; private set; }
/// <summary>
/// ctor
/// </summary>
public HighPerfSubscription(ILogger logger)
{
_logger = logger;
Id = ExchangeHelpers.NextId();
}
/// <summary>
/// Create a new subscription query
/// </summary>
public Query? CreateSubscriptionQuery(HighPerfSocketConnection connection)
{
var query = GetSubQuery(connection);
SubscriptionQuery = query;
return query;
}
/// <summary>
/// Get the subscribe query to send when subscribing
/// </summary>
/// <returns></returns>
protected abstract Query? GetSubQuery(HighPerfSocketConnection connection);
/// <summary>
/// Create a new unsubscription query
/// </summary>
public Query? CreateUnsubscriptionQuery(HighPerfSocketConnection connection)
{
var query = GetUnsubQuery(connection);
UnsubscriptionQuery = query;
return query;
}
/// <summary>
/// Get the unsubscribe query to send when unsubscribing
/// </summary>
/// <returns></returns>
protected abstract Query? GetUnsubQuery(HighPerfSocketConnection connection);
/// <summary>
/// Reset the subscription
/// </summary>
public void Reset()
{
DoHandleReset();
}
/// <summary>
/// Connection has been reset, do any logic for resetting the subscription
/// </summary>
public virtual void DoHandleReset() { }
/// <summary>
/// Invoke the exception event
/// </summary>
/// <param name="e"></param>
public void InvokeExceptionHandler(Exception e)
{
Exception?.Invoke(e);
}
}
/// <inheritdoc />
public abstract class HighPerfSubscription<TUpdateType> : HighPerfSubscription
{
private Func<TUpdateType, ValueTask> _handler;
/// <summary>
/// ctor
/// </summary>
protected HighPerfSubscription(ILogger logger, Func<TUpdateType, ValueTask> handler) : base(logger)
{
_handler = handler;
}
public ValueTask HandleAsync(TUpdateType update)
{
return _handler.Invoke(update);
}
}
}

View File

@ -0,0 +1,524 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Errors;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Net;
using System.Net.Http;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// A wrapper around the ClientWebSocket
/// </summary>
public class HighPerfWebSocketClient : IHighPerfWebsocket
{
enum ProcessState
{
Idle,
Processing,
WaitingForClose,
Reconnecting
}
private ClientWebSocket? _socket;
private static readonly ArrayPool<byte> _receiveBufferPool = ArrayPool<byte>.Shared;
private readonly SemaphoreSlim _closeSem;
private CancellationTokenSource _ctsSource;
private Task? _processTask;
private Task? _closeTask;
private bool _stopRequested;
private bool _disposed;
private ProcessState _processState;
private DateTime _lastReconnectTime;
private readonly string _baseAddress;
private int _reconnectAttempt;
private readonly int _receiveBufferSize;
private readonly PipeWriter _pipeWriter;
private const int _defaultReceiveBufferSize = 1048576;
private const int _sendBufferSize = 4096;
/// <summary>
/// Log
/// </summary>
protected ILogger _logger;
/// <inheritdoc />
public int Id { get; }
/// <inheritdoc />
public WebSocketParameters Parameters { get; }
/// <inheritdoc />
public Uri Uri => Parameters.Uri;
/// <inheritdoc />
public virtual bool IsClosed => _socket == null || _socket?.State == WebSocketState.Closed;
/// <inheritdoc />
public virtual bool IsOpen => _socket?.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested;
/// <inheritdoc />
public event Func<Task>? OnClose;
/// <inheritdoc />
public event Func<Exception, Task>? OnError;
/// <inheritdoc />
public event Func<Task>? OnOpen;
/// <summary>
/// ctor
/// </summary>
public HighPerfWebSocketClient(ILogger logger, WebSocketParameters websocketParameters, PipeWriter pipeWriter)
{
Id = ExchangeHelpers.NextId();
_logger = logger;
Parameters = websocketParameters;
_ctsSource = new CancellationTokenSource();
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize;
_pipeWriter = pipeWriter;
_closeSem = new SemaphoreSlim(1, 1);
_baseAddress = $"{Uri.Scheme}://{Uri.Host}";
}
/// <inheritdoc />
public void UpdateProxy(ApiProxy? proxy)
{
Parameters.Proxy = proxy;
}
/// <inheritdoc />
public virtual async Task<CallResult> ConnectAsync(CancellationToken ct)
{
var connectResult = await ConnectInternalAsync(ct).ConfigureAwait(false);
if (!connectResult)
return connectResult;
await (OnOpen?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
_processTask = ProcessAsync();
return connectResult;
}
/// <summary>
/// Create the socket object
/// </summary>
private ClientWebSocket CreateSocket()
{
var cookieContainer = new CookieContainer();
foreach (var cookie in Parameters.Cookies)
cookieContainer.Add(new Cookie(cookie.Key, cookie.Value));
var socket = new ClientWebSocket();
try
{
socket.Options.Cookies = cookieContainer;
foreach (var header in Parameters.Headers)
socket.Options.SetRequestHeader(header.Key, header.Value);
socket.Options.KeepAliveInterval = Parameters.KeepAliveInterval ?? TimeSpan.Zero;
socket.Options.SetBuffer(_receiveBufferSize, _sendBufferSize);
if (Parameters.Proxy != null)
SetProxy(socket, Parameters.Proxy);
#if NET6_0_OR_GREATER
socket.Options.CollectHttpResponseDetails = true;
#endif
#if NET9_0_OR_GREATER
socket.Options.KeepAliveTimeout = Parameters.KeepAliveTimeout ?? TimeSpan.FromSeconds(10);
#endif
}
catch (PlatformNotSupportedException)
{
// Options are not supported on certain platforms (WebAssembly for instance)
// best we can do it try to connect without setting options.
}
return socket;
}
private async Task<CallResult> ConnectInternalAsync(CancellationToken ct)
{
_logger.SocketConnecting(Id);
try
{
using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
using var linked = CancellationTokenSource.CreateLinkedTokenSource(tcs.Token, _ctsSource.Token, ct);
_socket = CreateSocket();
await _socket.ConnectAsync(Uri, linked.Token).ConfigureAwait(false);
}
catch (Exception e)
{
if (ct.IsCancellationRequested)
{
_logger.SocketConnectingCanceled(Id);
}
else if (!_ctsSource.IsCancellationRequested)
{
// if _ctsSource was canceled this was already logged
_logger.SocketConnectionFailed(Id, e.Message, e);
}
if (e is WebSocketException we)
{
#if (NET6_0_OR_GREATER)
if (_socket.HttpStatusCode == HttpStatusCode.TooManyRequests)
{
return new CallResult(new ServerRateLimitError(we.Message, we));
}
if (_socket.HttpStatusCode == HttpStatusCode.Unauthorized)
{
return new CallResult(new ServerError(new ErrorInfo(ErrorType.Unauthorized, "Server returned status code `401` when `101` was expected")));
}
#else
// ClientWebSocket.HttpStatusCode is only available in .NET6+ https://learn.microsoft.com/en-us/dotnet/api/system.net.websockets.clientwebsocket.httpstatuscode?view=net-8.0
// Try to read 429 from the message instead
if (we.Message.Contains("429"))
{
return new CallResult(new ServerRateLimitError(we.Message, we));
}
#endif
}
return new CallResult(new CantConnectError(e));
}
_logger.SocketConnected(Id, Uri);
return CallResult.SuccessResult;
}
/// <inheritdoc />
private async Task ProcessAsync()
{
_logger.SocketStartingProcessing(Id);
SetProcessState(ProcessState.Processing);
await ReceiveLoopAsync().ConfigureAwait(false);
_logger.SocketFinishedProcessing(Id);
SetProcessState(ProcessState.WaitingForClose);
while (_closeTask == null)
await Task.Delay(50).ConfigureAwait(false);
await _closeTask.ConfigureAwait(false);
if (!_stopRequested)
_closeTask = null;
SetProcessState(ProcessState.Idle);
await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
_logger.SocketClosed(Id);
}
/// <inheritdoc />
public virtual async ValueTask<bool> SendAsync(int id, string data, int weight)
{
if (_ctsSource.IsCancellationRequested || _processState != ProcessState.Processing)
return false;
var bytes = Parameters.Encoding.GetBytes(data);
_logger.SocketAddingBytesToSendBuffer(Id, id, bytes);
try
{
await _socket!.SendAsync(new ArraySegment<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
_logger.SocketSentBytes(Id, id, bytes.Length);
return true;
}
catch (OperationCanceledException)
{
// canceled
return false;
}
catch (Exception ioe)
{
// Connection closed unexpectedly, .NET framework
await (OnError?.Invoke(ioe) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
return false;
}
}
/// <inheritdoc />
public virtual async ValueTask<bool> SendAsync(int id, byte[] data, int weight)
{
if (_ctsSource.IsCancellationRequested || _processState != ProcessState.Processing)
return false;
_logger.SocketAddingBytesToSendBuffer(Id, id, data);
try
{
await _socket!.SendAsync(new ArraySegment<byte>(data, 0, data.Length), WebSocketMessageType.Binary, true, _ctsSource.Token).ConfigureAwait(false);
_logger.SocketSentBytes(Id, id, data.Length);
return true;
}
catch (OperationCanceledException)
{
// canceled
return false;
}
catch (Exception ioe)
{
// Connection closed unexpectedly, .NET framework
await (OnError?.Invoke(ioe) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
return false;
}
}
/// <inheritdoc />
public virtual async Task CloseAsync()
{
await _closeSem.WaitAsync().ConfigureAwait(false);
_stopRequested = true;
try
{
if (_closeTask?.IsCompleted == false)
{
_logger.SocketCloseAsyncWaitingForExistingCloseTask(Id);
await _closeTask.ConfigureAwait(false);
return;
}
if (!IsOpen)
{
_logger.SocketCloseAsyncSocketNotOpen(Id);
return;
}
_logger.SocketClosing(Id);
_closeTask = CloseInternalAsync();
}
finally
{
_closeSem.Release();
}
await _closeTask.ConfigureAwait(false);
if(_processTask != null)
await _processTask.ConfigureAwait(false);
}
/// <summary>
/// Internal close method
/// </summary>
/// <returns></returns>
private async Task CloseInternalAsync()
{
if (_disposed)
return;
try
{
if (_socket!.State == WebSocketState.CloseReceived)
{
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
else if (_socket.State == WebSocketState.Open)
{
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
var startWait = DateTime.UtcNow;
while (_socket.State != WebSocketState.Closed && _socket.State != WebSocketState.Aborted)
{
// Wait until we receive close confirmation
await Task.Delay(10).ConfigureAwait(false);
if (DateTime.UtcNow - startWait > TimeSpan.FromSeconds(1))
break; // Wait for max 1 second, then just abort the connection
}
}
}
catch (Exception)
{
// Can sometimes throw an exception when socket is in aborted state due to timing
// Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
// So socket might go to aborted state, might still be open
}
_ctsSource.Cancel();
}
/// <summary>
/// Dispose the socket
/// </summary>
public void Dispose()
{
if (_disposed)
return;
if (_ctsSource?.IsCancellationRequested == false)
_ctsSource.Cancel();
_logger.SocketDisposing(Id);
_disposed = true;
_socket?.Dispose();
_ctsSource?.Dispose();
_logger.SocketDisposed(Id);
}
/// <summary>
/// Loop for receiving and reassembling data
/// </summary>
/// <returns></returns>
private async Task ReceiveLoopAsync()
{
byte[] rentedBuffer = _receiveBufferPool.Rent(_receiveBufferSize);
var buffer = new ArraySegment<byte>(rentedBuffer);
var first = true;
try
{
while (true)
{
if (_ctsSource.IsCancellationRequested)
break;
WebSocketReceiveResult? receiveResult = null;
while (true)
{
try
{
//_stream.Read
receiveResult = await _socket!.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException ex)
{
if (ex.InnerException?.InnerException?.Message.Contains("KeepAliveTimeout") == true)
{
// Specific case that the websocket connection got closed because of a ping frame timeout
// Unfortunately doesn't seem to be a nicer way to catch
_logger.SocketPingTimeout(Id);
}
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
// canceled
break;
}
catch (Exception wse)
{
if (!_ctsSource.Token.IsCancellationRequested && !_stopRequested)
// Connection closed unexpectedly
await (OnError?.Invoke(wse) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
break;
}
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed
if (_socket.State == WebSocketState.CloseReceived)
{
// Close received means it server initiated, we should send a confirmation and close the socket
_logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
else
{
// Means the socket is now closed and we were the one initiating it
_logger.SocketReceivedCloseConfirmation(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
}
break;
}
if (!first)
{
// Write a comma to split the json data
if (receiveResult.EndOfMessage)
await _pipeWriter.WriteAsync(new byte[] { 44 }).ConfigureAwait(false);
}
else
{
// Write a opening bracket
await _pipeWriter.WriteAsync(new byte[] { 91 }).ConfigureAwait(false);
first = false;
}
await _pipeWriter.WriteAsync(new ReadOnlyMemory<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
}
if (receiveResult?.MessageType == WebSocketMessageType.Close)
{
// Received close message
break;
}
if (receiveResult == null || _ctsSource.IsCancellationRequested)
{
// Error during receiving or cancellation requested, stop.
break;
}
}
}
catch(Exception e)
{
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
// Make sure we at least let the owner know there was an error
_logger.SocketReceiveLoopStoppedWithException(Id, e);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
finally
{
// Not needed?
//await _pipeWriter.WriteAsync(Encoding.UTF8.GetBytes("]")).ConfigureAwait(false);
_receiveBufferPool.Return(rentedBuffer, true);
_logger.SocketReceiveLoopFinished(Id);
}
}
/// <summary>
/// Set proxy on socket
/// </summary>
/// <param name="socket"></param>
/// <param name="proxy"></param>
/// <exception cref="ArgumentException"></exception>
protected virtual void SetProxy(ClientWebSocket socket, ApiProxy proxy)
{
if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri))
throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy));
socket.Options.Proxy = uri?.Scheme == null
? socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port)
: socket.Options.Proxy = new WebProxy
{
Address = uri
};
if (proxy.Login != null)
socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
}
private void SetProcessState(ProcessState state)
{
if (_processState == state)
return;
_logger.SocketProcessingStateChanged(Id, _processState.ToString(), state.ToString());
_processState = state;
}
}
}

View File

@ -0,0 +1,30 @@
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
public interface ISocketConnection
{
SocketApiClient ApiClient { get; set; }
bool Authenticated { get; set; }
bool Connected { get; }
Uri ConnectionUri { get; }
int SocketId { get; }
string Tag { get; set; }
event Action? ConnectionClosed;
Task<CallResult> ConnectAsync(CancellationToken ct);
Task CloseAsync();
void Dispose();
ValueTask<CallResult> SendStringAsync(int requestId, string data, int weight);
ValueTask<CallResult> SendAsync<T>(int requestId, T obj, int weight);
ValueTask<CallResult> SendBytesAsync(int requestId, byte[] data, int weight);
}
}

View File

@ -1,45 +1,83 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects;
using System.Net.WebSockets;
using CryptoExchange.Net.Objects.Sockets;
using System.Diagnostics;
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Logging.Extensions;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// A single socket connection to the server
/// State of a the connection
/// </summary>
public class SocketConnection
/// <param name="Id">The id of the socket connection</param>
/// <param name="Address">The connection URI</param>
/// <param name="Subscriptions">Number of subscriptions on this socket</param>
/// <param name="Status">Socket status</param>
/// <param name="Authenticated">If the connection is authenticated</param>
/// <param name="DownloadSpeed">Download speed over this socket</param>
/// <param name="PendingQueries">Number of non-completed queries</param>
/// <param name="SubscriptionStates">State for each subscription on this socket</param>
public record SocketConnectionState(
int Id,
string Address,
int Subscriptions,
SocketStatus Status,
bool Authenticated,
double DownloadSpeed,
int PendingQueries,
List<Subscription.SubscriptionState> SubscriptionStates
);
/// <summary>
/// Status of the socket connection
/// </summary>
public enum SocketStatus
{
/// <summary>
/// State of a the connection
/// None/Initial
/// </summary>
/// <param name="Id">The id of the socket connection</param>
/// <param name="Address">The connection URI</param>
/// <param name="Subscriptions">Number of subscriptions on this socket</param>
/// <param name="Status">Socket status</param>
/// <param name="Authenticated">If the connection is authenticated</param>
/// <param name="DownloadSpeed">Download speed over this socket</param>
/// <param name="PendingQueries">Number of non-completed queries</param>
/// <param name="SubscriptionStates">State for each subscription on this socket</param>
public record SocketConnectionState(
int Id,
string Address,
int Subscriptions,
SocketStatus Status,
bool Authenticated,
double DownloadSpeed,
int PendingQueries,
List<Subscription.SubscriptionState> SubscriptionStates
);
None,
/// <summary>
/// Connected
/// </summary>
Connected,
/// <summary>
/// Reconnecting
/// </summary>
Reconnecting,
/// <summary>
/// Resubscribing on reconnected socket
/// </summary>
Resubscribing,
/// <summary>
/// Closing
/// </summary>
Closing,
/// <summary>
/// Closed
/// </summary>
Closed,
/// <summary>
/// Disposed
/// </summary>
Disposed
}
/// <summary>
/// A single socket connection to the server
/// </summary>
public class SocketConnection : ISocketConnection
{
/// <summary>
/// Connection lost event
@ -88,7 +126,7 @@ namespace CryptoExchange.Net.Sockets
{
get
{
lock(_listenersLock)
lock (_listenersLock)
return _listeners.OfType<Subscription>().Count(h => h.UserSubscription);
}
}
@ -100,7 +138,7 @@ namespace CryptoExchange.Net.Sockets
{
get
{
lock(_listenersLock)
lock (_listenersLock)
return _listeners.OfType<Subscription>().Where(h => h.UserSubscription).ToArray();
}
}
@ -162,7 +200,7 @@ namespace CryptoExchange.Net.Sockets
{
_pausedActivity = value;
_logger.ActivityPaused(SocketId, value);
if(_pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke());
if (_pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke());
else _ = Task.Run(() => ActivityUnpaused?.Invoke());
}
}
@ -247,19 +285,17 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// New socket connection
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="apiClient">The api client</param>
/// <param name="socket">The socket</param>
/// <param name="tag"></param>
public SocketConnection(ILogger logger, SocketApiClient apiClient, IWebsocket socket, string tag)
public SocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, string tag)
{
_logger = logger;
ApiClient = apiClient;
Tag = tag;
Properties = new Dictionary<string, object>();
_socket = socket;
_socket.OnStreamMessage += HandleStreamMessage;
_socket = socketFactory.CreateWebsocket(logger, parameters);
_logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString());
//_socket.OnStreamMessage += HandleStreamMessage;
_socket.OnRequestSent += HandleRequestSentAsync;
_socket.OnRequestRateLimited += HandleRequestRateLimitedAsync;
_socket.OnConnectRateLimited += HandleConnectRateLimitedAsync;
@ -382,7 +418,7 @@ namespace CryptoExchange.Net.Sockets
});
}
}
catch(Exception ex)
catch (Exception ex)
{
_logger.UnknownExceptionWhileProcessingReconnection(SocketId, ex);
_ = _socket.ReconnectAsync().ConfigureAwait(false);
@ -432,7 +468,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
protected async virtual Task HandleConnectRateLimitedAsync()
{
if (ConnectRateLimitedAsync is not null)
if (ConnectRateLimitedAsync is not null)
await ConnectRateLimitedAsync().ConfigureAwait(false);
}
@ -506,12 +542,12 @@ namespace CryptoExchange.Net.Sockets
var totalUserTime = 0;
List<IMessageProcessor> localListeners;
lock(_listenersLock)
lock (_listenersLock)
localListeners = _listeners.ToList();
foreach(var processor in localListeners)
foreach (var processor in localListeners)
{
foreach(var listener in processor.MessageMatcher.GetHandlerLinks(listenId))
foreach (var listener in processor.MessageMatcher.GetHandlerLinks(listenId))
{
processed = true;
_logger.ProcessorMatched(SocketId, listener.ToString(), listenId);
@ -803,11 +839,11 @@ namespace CryptoExchange.Net.Sockets
private async Task SendAndWaitIntAsync(Query query, AsyncResetEvent? continueEvent, CancellationToken ct = default)
{
lock(_listenersLock)
lock (_listenersLock)
_listeners.Add(query);
query.ContinueAwaiter = continueEvent;
var sendResult = Send(query.Id, query.Request, query.Weight);
var sendResult = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false);
if (!sendResult)
{
query.Fail(sendResult.Error!);
@ -855,19 +891,19 @@ namespace CryptoExchange.Net.Sockets
/// <param name="requestId">The request id</param>
/// <param name="obj">The object to send</param>
/// <param name="weight">The weight of the message</param>
public virtual CallResult Send<T>(int requestId, T obj, int weight)
public virtual ValueTask<CallResult> SendAsync<T>(int requestId, T obj, int weight)
{
if (_serializer is IByteMessageSerializer byteSerializer)
{
return SendBytes(requestId, byteSerializer.Serialize(obj), weight);
return SendBytesAsync(requestId, byteSerializer.Serialize(obj), weight);
}
else if (_serializer is IStringMessageSerializer stringSerializer)
{
if (obj is string str)
return Send(requestId, str, weight);
return SendStringAsync(requestId, str, weight);
str = stringSerializer.Serialize(obj);
return Send(requestId, str, weight);
return SendAsync(requestId, str, weight);
}
throw new Exception("Unknown serializer when sending message");
@ -879,7 +915,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="data">The data to send</param>
/// <param name="weight">The weight of the message</param>
/// <param name="requestId">The id of the request</param>
public virtual CallResult SendBytes(int requestId, byte[] data, int weight)
public virtual async ValueTask<CallResult> SendBytesAsync(int requestId, byte[] data, int weight)
{
if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
{
@ -914,7 +950,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="data">The data to send</param>
/// <param name="weight">The weight of the message</param>
/// <param name="requestId">The id of the request</param>
public virtual CallResult Send(int requestId, string data, int weight)
public virtual async ValueTask<CallResult> SendStringAsync(int requestId, string data, int weight)
{
if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
{
@ -937,7 +973,7 @@ namespace CryptoExchange.Net.Sockets
return CallResult.SuccessResult;
}
catch(Exception ex)
catch (Exception ex)
{
return new CallResult(new WebError("Failed to send message: " + ex.Message, exception: ex));
}
@ -1025,7 +1061,7 @@ namespace CryptoExchange.Net.Sockets
var waitEvent = new AsyncResetEvent(false);
taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) =>
{
subscription.Status = r.Result.Success ? SubscriptionStatus.Subscribed: SubscriptionStatus.Pending;
subscription.Status = r.Result.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending;
subscription.HandleSubQueryResponse(subQuery.Response!);
waitEvent.Set();
return r.Result;
@ -1119,40 +1155,6 @@ namespace CryptoExchange.Net.Sockets
});
}
/// <summary>
/// Status of the socket connection
/// </summary>
public enum SocketStatus
{
/// <summary>
/// None/Initial
/// </summary>
None,
/// <summary>
/// Connected
/// </summary>
Connected,
/// <summary>
/// Reconnecting
/// </summary>
Reconnecting,
/// <summary>
/// Resubscribing on reconnected socket
/// </summary>
Resubscribing,
/// <summary>
/// Closing
/// </summary>
Closing,
/// <summary>
/// Closed
/// </summary>
Closed,
/// <summary>
/// Disposed
/// </summary>
Disposed
}
}
}

View File

@ -1,6 +1,9 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
@ -14,5 +17,10 @@ namespace CryptoExchange.Net.Sockets
{
return new CryptoExchangeWebSocketClient(logger, parameters);
}
/// <inheritdoc />
public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter)
{
return new HighPerfWebSocketClient(logger, parameters, pipeWriter);
}
}
}

View File

@ -1,6 +1,9 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Testing.Implementations
{
@ -12,6 +15,7 @@ namespace CryptoExchange.Net.Testing.Implementations
_socket = socket;
}
public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter) => throw new NotImplementedException();
public IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters) => _socket;
}
}