1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 01:33:26 +00:00
This commit is contained in:
Jkorf 2025-11-17 16:34:51 +01:00
parent 9c43f58e6c
commit b12e69d13a
22 changed files with 390 additions and 177 deletions

View File

@ -0,0 +1,32 @@
using CryptoExchange.Net.Converters.MessageParsing;
using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
using CryptoExchange.Net.Objects;
using ProtoBuf.Meta;
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Text;
namespace CryptoExchange.Net.Protobuf.Converters.Protobuf
{
public abstract class DynamicProtobufConverter : IMessageConverter
{
/// <summary>
/// Runtime type model
/// </summary>
protected RuntimeTypeModel _model;
public DynamicProtobufConverter(RuntimeTypeModel model)
{
_model = model;
}
public object Deserialize(ReadOnlySpan<byte> data, Type type)
{
var result = _model.Deserialize(type, data);
return result;
}
public abstract MessageInfo GetMessageInfo(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
}
}

View File

@ -41,7 +41,9 @@
<DocumentationFile>CryptoExchange.Net.Protobuf.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="CryptoExchange.Net" Version="9.13.0" />
<PackageReference Include="protobuf-net" Version="3.2.56" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CryptoExchange.Net\CryptoExchange.Net.csproj" />
</ItemGroup>
</Project>

View File

@ -4,6 +4,11 @@
<name>CryptoExchange.Net.Protobuf</name>
</assembly>
<members>
<member name="F:CryptoExchange.Net.Protobuf.Converters.Protobuf.DynamicProtobufConverter._model">
<summary>
Runtime type model
</summary>
</member>
<member name="T:CryptoExchange.Net.Converters.Protobuf.ProtobufMessageAccessor`1">
<summary>
System.Text.Json message accessor

View File

@ -1,14 +1,15 @@
using System;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Xml.Linq;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace CryptoExchange.Net.Clients
{
@ -30,6 +31,9 @@ namespace CryptoExchange.Net.Clients
public int CurrentSubscriptions => ApiClients.OfType<SocketApiClient>().Sum(s => s.CurrentSubscriptions);
/// <inheritdoc />
public double IncomingKbps => ApiClients.OfType<SocketApiClient>().Sum(s => s.IncomingKbps);
/// <inheritdoc />
public new SocketExchangeOptions ClientOptions => (SocketExchangeOptions)base.ClientOptions;
#endregion
/// <summary>

View File

@ -127,11 +127,6 @@ namespace CryptoExchange.Net.Clients
}
}
/// <summary>
/// Serializer options to be used for high performance socket deserialization
/// </summary>
public abstract JsonSerializerOptions JsonSerializerOptions { get; }
/// <inheritdoc />
public new SocketExchangeOptions ClientOptions => (SocketExchangeOptions)base.ClientOptions;
@ -186,7 +181,7 @@ namespace CryptoExchange.Net.Clients
/// <param name="interval"></param>
/// <param name="queryDelegate"></param>
/// <param name="callback"></param>
protected virtual void RegisterPeriodicQuery(string identifier, TimeSpan interval, Func<SocketConnection, Query> queryDelegate, Action<SocketConnection, CallResult>? callback)
protected virtual void RegisterPeriodicQuery(string identifier, TimeSpan interval, Func<ISocketConnection, Query> queryDelegate, Action<SocketConnection, CallResult>? callback)
{
PeriodicTaskRegistrations.Add(new PeriodicTaskRegistration
{
@ -195,6 +190,13 @@ namespace CryptoExchange.Net.Clients
Interval = interval,
QueryDelegate = queryDelegate
});
HighPerfPeriodicTaskRegistrations.Add(new HighPerfPeriodicTaskRegistration
{
Identifier = identifier,
Interval = interval,
GetRequestDelegate = (con) => queryDelegate(con).Request
});
}
/// <summary>
@ -286,16 +288,33 @@ namespace CryptoExchange.Net.Clients
return new CallResult<UpdateSubscription>(new ServerError(new ErrorInfo(ErrorType.WebsocketPaused, "Socket is paused")));
}
void HandleSubscriptionComplete(bool success, object? response)
{
if (!success)
return;
subscription.HandleSubQueryResponse(response);
subscription.Status = SubscriptionStatus.Subscribed;
if (ct != default)
{
subscription.CancellationTokenRegistration = ct.Register(async () =>
{
_logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id);
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
}, false);
}
}
subscription.Status = SubscriptionStatus.Subscribing;
var waitEvent = new AsyncResetEvent(false);
var subQuery = subscription.CreateSubscriptionQuery(socketConnection);
if (subQuery != null)
{
subQuery.OnComplete = () => HandleSubscriptionComplete(subQuery.Result?.Success ?? false, subQuery.Response);
// Send the request and wait for answer
var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery, waitEvent, ct).ConfigureAwait(false);
var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery, ct).ConfigureAwait(false);
if (!subResult)
{
waitEvent?.Set();
var isTimeout = subResult.Error is CancellationRequestedError;
if (isTimeout && subscription.Status == SubscriptionStatus.Subscribed)
{
@ -304,27 +323,18 @@ namespace CryptoExchange.Net.Clients
else
{
_logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString());
// If this was a timeout we still need to send an unsubscribe to prevent messages coming in later
// If this was a server process error we still might need to send an unsubscribe to prevent messages coming in later
subscription.Status = SubscriptionStatus.Pending;
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
return new CallResult<UpdateSubscription>(subResult.Error!);
}
}
subscription.HandleSubQueryResponse(subQuery.Response!);
}
subscription.Status = SubscriptionStatus.Subscribed;
if (ct != default)
else
{
subscription.CancellationTokenRegistration = ct.Register(async () =>
{
_logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id);
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
}, false);
HandleSubscriptionComplete(true, null);
}
waitEvent?.Set();
_logger.SubscriptionCompletedSuccessfully(socketConnection.SocketId, subscription.Id);
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
}
@ -336,7 +346,11 @@ namespace CryptoExchange.Net.Clients
/// <param name="subscription">The subscription</param>
/// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns>
protected virtual async Task<CallResult<HighPerfUpdateSubscription>> SubscribeHighPerfAsync<TUpdateType>(string url, HighPerfSubscription<TUpdateType> subscription, CancellationToken ct)
protected virtual async Task<CallResult<HighPerfUpdateSubscription>> SubscribeHighPerfAsync<TUpdateType>(
string url,
HighPerfSubscription<TUpdateType> subscription,
IHighPerfConnectionFactory connectionFactory,
CancellationToken ct)
{
if (_disposing)
return new CallResult<HighPerfUpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe"));
@ -359,7 +373,7 @@ namespace CryptoExchange.Net.Clients
while (true)
{
// Get a new or existing socket connection
var socketResult = await GetHighPerfSocketConnection<TUpdateType>(url, ct).ConfigureAwait(false);
var socketResult = await GetHighPerfSocketConnection<TUpdateType>(url, connectionFactory, ct).ConfigureAwait(false);
if (!socketResult)
return socketResult.As<HighPerfUpdateSubscription>(null);
@ -402,7 +416,6 @@ namespace CryptoExchange.Net.Clients
var sendResult = await socketConnection.SendAsync(subRequest).ConfigureAwait(false);
if (!sendResult)
{
// Needed?
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
return new CallResult<HighPerfUpdateSubscription>(sendResult.Error!);
}
@ -486,7 +499,7 @@ namespace CryptoExchange.Net.Clients
if (ct.IsCancellationRequested)
return new CallResult<THandlerResponse>(new CancellationRequestedError());
return await socketConnection.SendAndWaitQueryAsync(query, null, ct).ConfigureAwait(false);
return await socketConnection.SendAndWaitQueryAsync(query, ct).ConfigureAwait(false);
}
/// <summary>
@ -712,7 +725,10 @@ namespace CryptoExchange.Net.Clients
/// <param name="address">The address the socket is for</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected virtual async Task<CallResult<HighPerfSocketConnection<TUpdateType>>> GetHighPerfSocketConnection<TUpdateType>(string address, CancellationToken ct)
protected virtual async Task<CallResult<HighPerfSocketConnection<TUpdateType>>> GetHighPerfSocketConnection<TUpdateType>(
string address,
IHighPerfConnectionFactory connectionFactory,
CancellationToken ct)
{
var socketQuery = highPerfSocketConnections.Where(s => s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
&& s.Value.ApiClient.GetType() == GetType()
@ -746,7 +762,8 @@ namespace CryptoExchange.Net.Clients
_logger.ConnectionAddressSetTo(connectionAddress.Data!);
// Create new socket connection
var socketConnection = new HighPerfSocketConnection<TUpdateType>(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, JsonSerializerOptions, address);
var socketConnection = connectionFactory.CreateHighPerfConnection<TUpdateType>(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, address);
//var socketConnection = new HighPerfJsonSocketConnection<TUpdateType>(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, JsonSerializerOptions, address);
foreach (var ptg in HighPerfPeriodicTaskRegistrations)
socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.GetRequestDelegate);
@ -1062,6 +1079,10 @@ namespace CryptoExchange.Net.Clients
/// <returns></returns>
public virtual ReadOnlyMemory<byte> PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlyMemory<byte> data) => data;
public abstract IMessageConverter CreateMessageConverter();
/// <summary>
/// Create a new message converter instance
/// </summary>
/// <returns></returns>
public abstract IMessageConverter CreateMessageConverter(WebSocketMessageType messageType);
}
}

View File

@ -1,55 +0,0 @@
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
{
public ref struct MessageType
{
public Type Type { get; set; }
public string? Identifier { get; set; }
}
public interface IMessageConverter
{
MessageType GetMessageType(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
object Deserialize(ReadOnlySpan<byte> data, Type type);
}
public abstract class DynamicConverter : IMessageConverter
{
public abstract JsonSerializerOptions Options { get; }
public abstract MessageType GetMessageType(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
public virtual object Deserialize(ReadOnlySpan<byte> data, Type type)
{
return JsonSerializer.Deserialize(data, type, Options);
}
}
public abstract class StaticConverter : IMessageConverter
{
public abstract JsonSerializerOptions Options { get; }
public abstract MessageType GetMessageType(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
public object? Deserialize(ReadOnlySpan<byte> data, Type type)
{
return JsonSerializer.Deserialize(data, type, Options);
}
}
public abstract class StaticConverter<T> : StaticConverter
{
public override MessageType GetMessageType(ReadOnlySpan<byte> data,, WebSocketMessageType? webSocketMessageType) =>
new MessageType { Type = typeof(T), Identifier = GetMessageListenId(data, webSocketMessageType) };
public abstract string GetMessageListenId(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
}
}

View File

@ -0,0 +1,22 @@
using System;
using System.Net.WebSockets;
namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
{
/// <summary>
/// Message converter
/// </summary>
public interface IMessageConverter
{
/// <summary>
/// Get message info
/// </summary>
MessageInfo GetMessageInfo(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
/// <summary>
/// Deserialize to the provided type
/// </summary>
object Deserialize(ReadOnlySpan<byte> data, Type type);
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Converters.MessageParsing
{
public ref struct MessageInfo
{
public Type? Type { get; set; }
public string? Identifier { get; set; }
}
}

View File

@ -0,0 +1,32 @@
using CryptoExchange.Net.Converters.MessageParsing;
using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
using System;
using System.Net.WebSockets;
using System.Text.Json;
namespace CryptoExchange.Net.Converters.SystemTextJson
{
/// <summary>
/// JSON message converter
/// </summary>
public abstract class DynamicJsonConverter : IMessageConverter
{
/// <summary>
/// The serializer options to use
/// </summary>
public abstract JsonSerializerOptions Options { get; }
/// <inheritdoc />
public abstract MessageInfo GetMessageInfo(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
/// <inheritdoc />
public virtual object Deserialize(ReadOnlySpan<byte> data, Type type)
{
#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
return JsonSerializer.Deserialize(data, type, Options)!;
#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
}
}
}

View File

@ -25,7 +25,7 @@ namespace CryptoExchange.Net.Interfaces
/// <summary>
/// Handle a message
/// </summary>
Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink matchedHandler);
CallResult Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink matchedHandler);
/// <summary>
/// Deserialize a message into object of type
/// </summary>

View File

@ -165,7 +165,7 @@ namespace CryptoExchange.Net.Sockets
_sendBuffer = new ConcurrentQueue<SendItem>();
_ctsSource = new CancellationTokenSource();
if (websocketParameters.UseNewMessageDeserialization)
_receiveBufferSize = 1024;
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? 65536;
else
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize;

View File

@ -0,0 +1,29 @@
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Text.Json;
namespace CryptoExchange.Net.Sockets.HighPerf
{
public class HighPerfJsonConnectionFactory : IHighPerfConnectionFactory
{
private readonly JsonSerializerOptions _options;
public HighPerfJsonConnectionFactory(JsonSerializerOptions options)
{
_options = options;
}
public HighPerfSocketConnection<T> CreateHighPerfConnection<T>(
ILogger logger, IWebsocketFactory factory, WebSocketParameters parameters, SocketApiClient client, string address)
{
return new HighPerfJsonSocketConnection<T>(logger, factory, parameters, client, _options, address);
}
}
}

View File

@ -28,6 +28,9 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public bool Authenticated { get; set; } = false;
/// <inheritdoc />
public bool HasAuthenticatedSubscription => false;
/// <summary>
/// The amount of subscriptions on this connection
/// </summary>
@ -95,10 +98,6 @@ namespace CryptoExchange.Net.Sockets
private SocketStatus _status;
private Task? _processTask;
/// <summary>
/// Serializer options
/// </summary>
protected readonly JsonSerializerOptions _serializerOptions;
/// <summary>
/// The pipe the websocket will write to
/// </summary>
@ -126,11 +125,10 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// New socket connection
/// </summary>
public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag)
public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, string tag)
{
_logger = logger;
_pipe = new Pipe();
_serializerOptions = serializerOptions;
ApiClient = apiClient;
Tag = tag;
Properties = new Dictionary<string, object>();
@ -147,7 +145,7 @@ namespace CryptoExchange.Net.Sockets
}
/// <summary>
/// Process message from the pipe
/// Process messages from the pipe
/// </summary>
protected abstract Task ProcessAsync(CancellationToken ct);
@ -418,15 +416,15 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public class HighPerfSocketConnection<T> : HighPerfSocketConnection
public abstract class HighPerfSocketConnection<T> : HighPerfSocketConnection
{
#if NET9_0_OR_GREATER
private readonly Lock _listenersLock = new Lock();
protected readonly Lock _listenersLock = new Lock();
#else
private readonly object _listenersLock = new object();
protected readonly object _listenersLock = new object();
#endif
private readonly List<HighPerfSubscription<T>> _typedSubscriptions;
protected readonly List<HighPerfSubscription<T>> _typedSubscriptions;
/// <inheritdoc />
public override HighPerfSubscription[] Subscriptions
@ -444,7 +442,8 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// ctor
/// </summary>
public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag) : base(logger, socketFactory, parameters, apiClient, serializerOptions, tag)
public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, string tag)
: base(logger, socketFactory, parameters, apiClient, tag)
{
_typedSubscriptions = new List<HighPerfSubscription<T>>();
}
@ -473,6 +472,40 @@ namespace CryptoExchange.Net.Sockets
_typedSubscriptions.Remove(subscription);
}
protected ValueTask DelegateToSubscription(HighPerfSubscription<T> sub, T update)
{
try
{
return sub.HandleAsync(update!);
}
catch (Exception ex)
{
sub.InvokeExceptionHandler(ex);
_logger.UserMessageProcessingFailed(SocketId, ex.Message, ex);
return new ValueTask();
}
}
}
public class HighPerfJsonSocketConnection<T> : HighPerfSocketConnection<T>
{
private JsonSerializerOptions _jsonOptions;
/// <summary>
/// ctor
/// </summary>
public HighPerfJsonSocketConnection(
ILogger logger,
IWebsocketFactory socketFactory,
WebSocketParameters parameters,
SocketApiClient apiClient,
JsonSerializerOptions serializerOptions,
string tag)
: base(logger, socketFactory, parameters, apiClient, tag)
{
_jsonOptions = serializerOptions;
}
/// <inheritdoc />
protected override async Task ProcessAsync(CancellationToken ct)
{
@ -480,7 +513,7 @@ namespace CryptoExchange.Net.Sockets
{
#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, true, _serializerOptions, ct).ConfigureAwait(false))
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, true, _jsonOptions, ct).ConfigureAwait(false))
#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
{
@ -498,19 +531,6 @@ namespace CryptoExchange.Net.Sockets
catch (OperationCanceledException) { }
}
private ValueTask DelegateToSubscription(HighPerfSubscription<T> sub, T update)
{
try
{
return sub.HandleAsync(update!);
}
catch (Exception ex)
{
sub.InvokeExceptionHandler(ex);
_logger.UserMessageProcessingFailed(SocketId, ex.Message, ex);
return new ValueTask();
}
}
}
}

View File

@ -9,6 +9,7 @@ using System.Buffers;
using System.IO.Pipelines;
using System.Net;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -333,6 +334,7 @@ namespace CryptoExchange.Net.Sockets
private async Task ReceiveLoopAsync()
{
Exception? exitException = null;
try
{
while (true)
@ -340,14 +342,14 @@ namespace CryptoExchange.Net.Sockets
if (_ctsSource.IsCancellationRequested)
break;
ValueWebSocketReceiveResult receiveResult = default;
ValueWebSocketReceiveResult receiveResult;
try
{
receiveResult = await _socket!.ReceiveAsync(_pipeWriter.GetMemory(_receiveBufferSize), _ctsSource.Token).ConfigureAwait(false);
// Advance the writer to communicate which part of the memory was written
_pipeWriter.Advance(receiveResult.Count);
_pipeWriter.Advance(receiveResult.Count);
}
catch (OperationCanceledException ex)
{

View File

@ -0,0 +1,16 @@
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Sockets.HighPerf
{
public interface IHighPerfConnectionFactory
{
HighPerfSocketConnection<T> CreateHighPerfConnection<T>(
ILogger logger, IWebsocketFactory factory, WebSocketParameters parameters, SocketApiClient client, string address);
}
}

View File

@ -19,6 +19,8 @@ namespace CryptoExchange.Net.Sockets
/// Whether the connection has been authenticated
/// </summary>
bool Authenticated { get; set; }
bool HasAuthenticatedSubscription { get; }
/// <summary>
/// Whether the connection is established
/// </summary>

View File

@ -60,11 +60,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public object? Response { get; set; }
/// <summary>
/// Wait event for the calling message processing thread
/// </summary>
public AsyncResetEvent? ContinueAwaiter { get; set; }
public HashSet<Type> DeserializationTypes { get; set; }
private MessageMatcher _matcher;
@ -171,8 +166,9 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handle a response message
/// </summary>
public abstract Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check);
public abstract CallResult Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check);
public Action OnComplete { get; set; }
}
/// <summary>
@ -192,12 +188,16 @@ namespace CryptoExchange.Net.Sockets
/// <param name="request"></param>
/// <param name="authenticated"></param>
/// <param name="weight"></param>
protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
protected Query(
object request,
bool authenticated,
int weight = 1)
: base(request, authenticated, weight)
{
}
/// <inheritdoc />
public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check)
public override CallResult Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check)
{
if (!PreCheckMessage(connection, message))
return CallResult.SuccessResult;
@ -214,8 +214,7 @@ namespace CryptoExchange.Net.Sockets
{
Completed = true;
_event.Set();
if (ContinueAwaiter != null)
await ContinueAwaiter.WaitAsync().ConfigureAwait(false);
OnComplete?.Invoke();
}
return Result;
@ -238,17 +237,20 @@ namespace CryptoExchange.Net.Sockets
else
Result = new CallResult<THandlerResponse>(default, null, default);
ContinueAwaiter?.Set();
_event.Set();
OnComplete?.Invoke();
}
/// <inheritdoc />
public override void Fail(Error error)
{
if (Completed)
return;
Result = new CallResult<THandlerResponse>(error);
Completed = true;
ContinueAwaiter?.Set();
_event.Set();
OnComplete?.Invoke();
}
}
}

View File

@ -12,6 +12,7 @@ using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@ -150,6 +151,8 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public bool Authenticated { get; set; }
public bool HasAuthenticatedSubscription => Subscriptions.Any(x => x.Authenticated);
/// <summary>
/// If connection is made
/// </summary>
@ -268,7 +271,8 @@ namespace CryptoExchange.Net.Sockets
private IByteMessageAccessor? _stringMessageAccessor;
private IByteMessageAccessor? _byteMessageAccessor;
private IMessageConverter? _messageConverter;
private IMessageConverter? _byteMessageConverter;
private IMessageConverter? _textMessageConverter;
/// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
@ -509,42 +513,97 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
protected internal virtual void HandleStreamMessage2(WebSocketMessageType type, ReadOnlySpan<byte> data)
{
//var sw = Stopwatch.StartNew();
var receiveTime = DateTime.UtcNow;
//// 1. Decrypt/Preprocess if necessary
// 1. Decrypt/Preprocess if necessary
//data = ApiClient.PreprocessStreamMessage(this, type, data);
_messageConverter ??= ApiClient.CreateMessageConverter();
IMessageConverter messageConverter;
if (type == WebSocketMessageType.Binary)
messageConverter = _byteMessageConverter ??= ApiClient.CreateMessageConverter(type);
else
messageConverter = _textMessageConverter ??= ApiClient.CreateMessageConverter(type);
var messageType = _messageConverter.GetMessageType(data, type);
if (messageType.Type == null)
string? originalData = null;
if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData)
{
// Failed to determine message type
#if NETSTANDARD2_0
originalData = Encoding.UTF8.GetString(data.ToArray());
#else
originalData = Encoding.UTF8.GetString(data);
#endif
}
List<IMessageProcessor>? processors = null;
var messageInfo = messageConverter.GetMessageInfo(data, type);
if (messageInfo.Type == null)
{
if (messageInfo.Identifier == null)
{
// Both deserialization type and identifier null, can't process
_logger.LogWarning("Failed to evaluate message. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
return;
}
// Couldn't determine deserialization type, try determine the type based on identifier
lock (_listenersLock)
processors = _listeners.ToList();
foreach (var subscription in processors)
{
var handler = subscription.MessageMatcher.GetHandlerLinks(messageInfo.Identifier)?.FirstOrDefault();
if (handler == null)
continue;
_logger.LogTrace("Message type determined based on identifier");
messageInfo.Type = handler.DeserializationType;
break;
}
if (messageInfo.Type == null)
{
// No handler found for identifier either, can't process
_logger.LogWarning("Failed to determine message type. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
return;
}
}
object result;
try
{
result = messageConverter.Deserialize(data, messageInfo.Type!);
}
catch(Exception ex)
{
_logger.LogWarning(ex, "Deserialization failed. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
return;
}
var result = _messageConverter.Deserialize(data, messageType.Type);
if (result == null)
{
// Deserialize error
_logger.LogWarning("Deserialization returned null. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
return;
}
var targetType = messageType.Type;
List<IMessageProcessor> listeners;
lock (_listenersLock)
listeners = _listeners.Where(x => x.DeserializationTypes.Contains(targetType)).ToList();
if (listeners.Count == 0)
var targetType = messageInfo.Type!;
if (processors == null)
{
lock (_listenersLock)
processors = _listeners.Where(x => x.DeserializationTypes.Contains(targetType)).ToList();
}
if (processors.Count == 0)
{
// No subscriptions found for type
_logger.LogWarning("No subscriptions found for message. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
return;
}
var dataEvent = new DataEvent<object>(result, null, null, null /*originalData*/, receiveTime, null);
foreach (var subscription in listeners)
var dataEvent = new DataEvent<object>(result, null, null, originalData, receiveTime, null);
foreach (var subscription in processors)
{
var links = subscription.MessageMatcher.GetHandlerLinks(messageType.Identifier);
var links = subscription.MessageMatcher.GetHandlerLinks(messageInfo.Identifier!);
foreach(var link in links)
subscription.Handle(this, dataEvent, link);
}
@ -649,7 +708,7 @@ namespace CryptoExchange.Net.Sockets
try
{
var innerSw = Stopwatch.StartNew();
await processor.Handle(this, new DataEvent<object>(deserialized, null, null, originalData, receiveTime, null), listener).ConfigureAwait(false);
processor.Handle(this, new DataEvent<object>(deserialized, null, null, originalData, receiveTime, null), listener);
if (processor is Query query && query.RequiredResponses != 1)
_logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}");
totalUserTime += (int)innerSw.ElapsedMilliseconds;
@ -873,12 +932,11 @@ namespace CryptoExchange.Net.Sockets
/// Send a query request and wait for an answer
/// </summary>
/// <param name="query">Query to send</param>
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default)
public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query, CancellationToken ct = default)
{
await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false);
await SendAndWaitIntAsync(query, ct).ConfigureAwait(false);
return query.Result ?? new CallResult(new TimeoutError());
}
@ -887,21 +945,19 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
/// <param name="query">Query to send</param>
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<THandlerResponse>(Query<THandlerResponse> query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default)
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<THandlerResponse>(Query<THandlerResponse> query, CancellationToken ct = default)
{
await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false);
await SendAndWaitIntAsync(query, ct).ConfigureAwait(false);
return query.TypedResult ?? new CallResult<THandlerResponse>(new TimeoutError());
}
private async Task SendAndWaitIntAsync(Query query, AsyncResetEvent? continueEvent, CancellationToken ct = default)
private async Task SendAndWaitIntAsync(Query query, CancellationToken ct = default)
{
lock (_listenersLock)
_listeners.Add(query);
query.ContinueAwaiter = continueEvent;
var sendResult = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false);
if (!sendResult)
{
@ -1116,15 +1172,13 @@ namespace CryptoExchange.Net.Sockets
subscription.Status = SubscriptionStatus.Subscribed;
continue;
}
var waitEvent = new AsyncResetEvent(false);
taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) =>
subQuery.OnComplete = () =>
{
subscription.Status = r.Result.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending;
subscription.HandleSubQueryResponse(subQuery.Response!);
waitEvent.Set();
return r.Result;
}));
subscription.Status = subQuery.Result!.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending;
subscription.HandleSubQueryResponse(subQuery.Response);
};
taskList.Add(SendAndWaitQueryAsync(subQuery));
}
await Task.WhenAll(taskList).ConfigureAwait(false);

View File

@ -152,7 +152,7 @@ namespace CryptoExchange.Net.Sockets
/// Handle a subscription query response
/// </summary>
/// <param name="message"></param>
public virtual void HandleSubQueryResponse(object message) { }
public virtual void HandleSubQueryResponse(object? message) { }
/// <summary>
/// Handle an unsubscription query response
@ -182,11 +182,11 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handle an update message
/// </summary>
public Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink matcher)
public CallResult Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink matcher)
{
ConnectionInvocations++;
TotalInvocations++;
return Task.FromResult(matcher.Handle(connection, message));
return matcher.Handle(connection, message);
}
/// <summary>

View File

@ -6,6 +6,7 @@ using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets;
#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
@ -45,8 +46,13 @@ namespace CryptoExchange.Net.Testing.Implementations
public static readonly object lastIdLock = new object();
#endif
public TestSocket(string address)
private bool _newDeserialization;
public SocketConnection Connection { get; set; }
public TestSocket(bool newDeserialization, string address)
{
_newDeserialization = newDeserialization;
Uri = new Uri(address);
lock (lastIdLock)
{
@ -101,12 +107,14 @@ namespace CryptoExchange.Net.Testing.Implementations
public void InvokeMessage(string data)
{
OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(data))).Wait();
}
public void InvokeMessage<T>(T data)
{
OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data)))).Wait();
if (!_newDeserialization)
{
OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(data))).Wait();
}
else
{
Connection.HandleStreamMessage2(WebSocketMessageType.Text, Encoding.UTF8.GetBytes(data));
}
}
public Task ReconnectAsync() => throw new NotImplementedException();

View File

@ -17,6 +17,10 @@ namespace CryptoExchange.Net.Testing.Implementations
}
public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter) => throw new NotImplementedException();
public IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters) => _socket;
public IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters)
{
_socket.Connection = connection;
return _socket;
}
}
}

View File

@ -63,7 +63,7 @@ namespace CryptoExchange.Net.Testing
internal static TestSocket ConfigureSocketClient<T>(T client, string address) where T : BaseSocketClient
{
var socket = new TestSocket(address);
var socket = new TestSocket(client.ClientOptions.EnabledNewDeserialization, address);
foreach (var apiClient in client.ApiClients.OfType<SocketApiClient>())
{
apiClient.SocketFactory = new TestWebsocketFactory(socket);