diff --git a/CryptoExchange.Net.Protobuf/Converters/Protobuf/DynamicProtobufConverter.cs b/CryptoExchange.Net.Protobuf/Converters/Protobuf/DynamicProtobufConverter.cs new file mode 100644 index 0000000..39529df --- /dev/null +++ b/CryptoExchange.Net.Protobuf/Converters/Protobuf/DynamicProtobufConverter.cs @@ -0,0 +1,32 @@ +using CryptoExchange.Net.Converters.MessageParsing; +using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters; +using CryptoExchange.Net.Objects; +using ProtoBuf.Meta; +using System; +using System.Collections.Generic; +using System.Net.WebSockets; +using System.Text; + +namespace CryptoExchange.Net.Protobuf.Converters.Protobuf +{ + public abstract class DynamicProtobufConverter : IMessageConverter + { + /// + /// Runtime type model + /// + protected RuntimeTypeModel _model; + + public DynamicProtobufConverter(RuntimeTypeModel model) + { + _model = model; + } + + public object Deserialize(ReadOnlySpan data, Type type) + { + var result = _model.Deserialize(type, data); + return result; + } + + public abstract MessageInfo GetMessageInfo(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); + } +} diff --git a/CryptoExchange.Net.Protobuf/CryptoExchange.Net.Protobuf.csproj b/CryptoExchange.Net.Protobuf/CryptoExchange.Net.Protobuf.csproj index 41b82fe..8c31189 100644 --- a/CryptoExchange.Net.Protobuf/CryptoExchange.Net.Protobuf.csproj +++ b/CryptoExchange.Net.Protobuf/CryptoExchange.Net.Protobuf.csproj @@ -41,7 +41,9 @@ CryptoExchange.Net.Protobuf.xml - + + + \ No newline at end of file diff --git a/CryptoExchange.Net.Protobuf/CryptoExchange.Net.Protobuf.xml b/CryptoExchange.Net.Protobuf/CryptoExchange.Net.Protobuf.xml index 0b7b110..69080ea 100644 --- a/CryptoExchange.Net.Protobuf/CryptoExchange.Net.Protobuf.xml +++ b/CryptoExchange.Net.Protobuf/CryptoExchange.Net.Protobuf.xml @@ -4,6 +4,11 @@ CryptoExchange.Net.Protobuf + + + Runtime type model + + System.Text.Json message accessor diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs index 3f36513..f5054e2 100644 --- a/CryptoExchange.Net/Clients/BaseSocketClient.cs +++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs @@ -1,14 +1,15 @@ -using System; +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Logging.Extensions; +using CryptoExchange.Net.Objects.Options; +using CryptoExchange.Net.Objects.Sockets; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Xml.Linq; -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Logging.Extensions; -using CryptoExchange.Net.Objects.Sockets; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; namespace CryptoExchange.Net.Clients { @@ -30,6 +31,9 @@ namespace CryptoExchange.Net.Clients public int CurrentSubscriptions => ApiClients.OfType().Sum(s => s.CurrentSubscriptions); /// public double IncomingKbps => ApiClients.OfType().Sum(s => s.IncomingKbps); + + /// + public new SocketExchangeOptions ClientOptions => (SocketExchangeOptions)base.ClientOptions; #endregion /// diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index b57f9c0..1010af5 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -127,11 +127,6 @@ namespace CryptoExchange.Net.Clients } } - /// - /// Serializer options to be used for high performance socket deserialization - /// - public abstract JsonSerializerOptions JsonSerializerOptions { get; } - /// public new SocketExchangeOptions ClientOptions => (SocketExchangeOptions)base.ClientOptions; @@ -186,7 +181,7 @@ namespace CryptoExchange.Net.Clients /// /// /// - protected virtual void RegisterPeriodicQuery(string identifier, TimeSpan interval, Func queryDelegate, Action? callback) + protected virtual void RegisterPeriodicQuery(string identifier, TimeSpan interval, Func queryDelegate, Action? callback) { PeriodicTaskRegistrations.Add(new PeriodicTaskRegistration { @@ -195,6 +190,13 @@ namespace CryptoExchange.Net.Clients Interval = interval, QueryDelegate = queryDelegate }); + + HighPerfPeriodicTaskRegistrations.Add(new HighPerfPeriodicTaskRegistration + { + Identifier = identifier, + Interval = interval, + GetRequestDelegate = (con) => queryDelegate(con).Request + }); } /// @@ -286,16 +288,33 @@ namespace CryptoExchange.Net.Clients return new CallResult(new ServerError(new ErrorInfo(ErrorType.WebsocketPaused, "Socket is paused"))); } + void HandleSubscriptionComplete(bool success, object? response) + { + if (!success) + return; + + subscription.HandleSubQueryResponse(response); + subscription.Status = SubscriptionStatus.Subscribed; + if (ct != default) + { + subscription.CancellationTokenRegistration = ct.Register(async () => + { + _logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id); + await socketConnection.CloseAsync(subscription).ConfigureAwait(false); + }, false); + } + } + subscription.Status = SubscriptionStatus.Subscribing; - var waitEvent = new AsyncResetEvent(false); var subQuery = subscription.CreateSubscriptionQuery(socketConnection); if (subQuery != null) { + subQuery.OnComplete = () => HandleSubscriptionComplete(subQuery.Result?.Success ?? false, subQuery.Response); + // Send the request and wait for answer - var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery, waitEvent, ct).ConfigureAwait(false); + var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery, ct).ConfigureAwait(false); if (!subResult) { - waitEvent?.Set(); var isTimeout = subResult.Error is CancellationRequestedError; if (isTimeout && subscription.Status == SubscriptionStatus.Subscribed) { @@ -304,27 +323,18 @@ namespace CryptoExchange.Net.Clients else { _logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString()); - // If this was a timeout we still need to send an unsubscribe to prevent messages coming in later + // If this was a server process error we still might need to send an unsubscribe to prevent messages coming in later subscription.Status = SubscriptionStatus.Pending; await socketConnection.CloseAsync(subscription).ConfigureAwait(false); return new CallResult(subResult.Error!); } } - - subscription.HandleSubQueryResponse(subQuery.Response!); } - - subscription.Status = SubscriptionStatus.Subscribed; - if (ct != default) + else { - subscription.CancellationTokenRegistration = ct.Register(async () => - { - _logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id); - await socketConnection.CloseAsync(subscription).ConfigureAwait(false); - }, false); + HandleSubscriptionComplete(true, null); } - waitEvent?.Set(); _logger.SubscriptionCompletedSuccessfully(socketConnection.SocketId, subscription.Id); return new CallResult(new UpdateSubscription(socketConnection, subscription)); } @@ -336,7 +346,11 @@ namespace CryptoExchange.Net.Clients /// The subscription /// Cancellation token for closing this subscription /// - protected virtual async Task> SubscribeHighPerfAsync(string url, HighPerfSubscription subscription, CancellationToken ct) + protected virtual async Task> SubscribeHighPerfAsync( + string url, + HighPerfSubscription subscription, + IHighPerfConnectionFactory connectionFactory, + CancellationToken ct) { if (_disposing) return new CallResult(new InvalidOperationError("Client disposed, can't subscribe")); @@ -359,7 +373,7 @@ namespace CryptoExchange.Net.Clients while (true) { // Get a new or existing socket connection - var socketResult = await GetHighPerfSocketConnection(url, ct).ConfigureAwait(false); + var socketResult = await GetHighPerfSocketConnection(url, connectionFactory, ct).ConfigureAwait(false); if (!socketResult) return socketResult.As(null); @@ -402,7 +416,6 @@ namespace CryptoExchange.Net.Clients var sendResult = await socketConnection.SendAsync(subRequest).ConfigureAwait(false); if (!sendResult) { - // Needed? await socketConnection.CloseAsync(subscription).ConfigureAwait(false); return new CallResult(sendResult.Error!); } @@ -486,7 +499,7 @@ namespace CryptoExchange.Net.Clients if (ct.IsCancellationRequested) return new CallResult(new CancellationRequestedError()); - return await socketConnection.SendAndWaitQueryAsync(query, null, ct).ConfigureAwait(false); + return await socketConnection.SendAndWaitQueryAsync(query, ct).ConfigureAwait(false); } /// @@ -712,7 +725,10 @@ namespace CryptoExchange.Net.Clients /// The address the socket is for /// Cancellation token /// - protected virtual async Task>> GetHighPerfSocketConnection(string address, CancellationToken ct) + protected virtual async Task>> GetHighPerfSocketConnection( + string address, + IHighPerfConnectionFactory connectionFactory, + CancellationToken ct) { var socketQuery = highPerfSocketConnections.Where(s => s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') && s.Value.ApiClient.GetType() == GetType() @@ -746,7 +762,8 @@ namespace CryptoExchange.Net.Clients _logger.ConnectionAddressSetTo(connectionAddress.Data!); // Create new socket connection - var socketConnection = new HighPerfSocketConnection(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, JsonSerializerOptions, address); + var socketConnection = connectionFactory.CreateHighPerfConnection(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, address); + //var socketConnection = new HighPerfJsonSocketConnection(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, JsonSerializerOptions, address); foreach (var ptg in HighPerfPeriodicTaskRegistrations) socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.GetRequestDelegate); @@ -1062,6 +1079,10 @@ namespace CryptoExchange.Net.Clients /// public virtual ReadOnlyMemory PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlyMemory data) => data; - public abstract IMessageConverter CreateMessageConverter(); + /// + /// Create a new message converter instance + /// + /// + public abstract IMessageConverter CreateMessageConverter(WebSocketMessageType messageType); } } diff --git a/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/DynamicConverter.cs b/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/DynamicConverter.cs deleted file mode 100644 index 6b2df3e..0000000 --- a/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/DynamicConverter.cs +++ /dev/null @@ -1,55 +0,0 @@ -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Net.WebSockets; -using System.Text; -using System.Text.Json; - -namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters -{ - - public ref struct MessageType - { - public Type Type { get; set; } - public string? Identifier { get; set; } - } - - public interface IMessageConverter - { - MessageType GetMessageType(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); - - object Deserialize(ReadOnlySpan data, Type type); - } - - public abstract class DynamicConverter : IMessageConverter - { - public abstract JsonSerializerOptions Options { get; } - - public abstract MessageType GetMessageType(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); - - public virtual object Deserialize(ReadOnlySpan data, Type type) - { - return JsonSerializer.Deserialize(data, type, Options); - } - } - - public abstract class StaticConverter : IMessageConverter - { - public abstract JsonSerializerOptions Options { get; } - public abstract MessageType GetMessageType(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); - - public object? Deserialize(ReadOnlySpan data, Type type) - { - return JsonSerializer.Deserialize(data, type, Options); - } - - } - - public abstract class StaticConverter : StaticConverter - { - public override MessageType GetMessageType(ReadOnlySpan data,, WebSocketMessageType? webSocketMessageType) => - new MessageType { Type = typeof(T), Identifier = GetMessageListenId(data, webSocketMessageType) }; - - public abstract string GetMessageListenId(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); - } -} diff --git a/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/IMessageConverter.cs b/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/IMessageConverter.cs new file mode 100644 index 0000000..6030002 --- /dev/null +++ b/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/IMessageConverter.cs @@ -0,0 +1,22 @@ +using System; +using System.Net.WebSockets; + +namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters +{ + /// + /// Message converter + /// + public interface IMessageConverter + { + /// + /// Get message info + /// + MessageInfo GetMessageInfo(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); + + /// + /// Deserialize to the provided type + /// + object Deserialize(ReadOnlySpan data, Type type); + } + +} diff --git a/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/MessageInfo.cs b/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/MessageInfo.cs new file mode 100644 index 0000000..22a67bd --- /dev/null +++ b/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/MessageInfo.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Converters.MessageParsing +{ + public ref struct MessageInfo + { + public Type? Type { get; set; } + public string? Identifier { get; set; } + } + +} diff --git a/CryptoExchange.Net/Converters/SystemTextJson/DynamicJsonConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/DynamicJsonConverter.cs new file mode 100644 index 0000000..3b55219 --- /dev/null +++ b/CryptoExchange.Net/Converters/SystemTextJson/DynamicJsonConverter.cs @@ -0,0 +1,32 @@ +using CryptoExchange.Net.Converters.MessageParsing; +using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters; +using System; +using System.Net.WebSockets; +using System.Text.Json; + +namespace CryptoExchange.Net.Converters.SystemTextJson +{ + /// + /// JSON message converter + /// + public abstract class DynamicJsonConverter : IMessageConverter + { + /// + /// The serializer options to use + /// + public abstract JsonSerializerOptions Options { get; } + + /// + public abstract MessageInfo GetMessageInfo(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); + + /// + public virtual object Deserialize(ReadOnlySpan data, Type type) + { +#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code +#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. + return JsonSerializer.Deserialize(data, type, Options)!; +#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. +#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code + } + } +} diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs index a9632ef..0a72d9f 100644 --- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs @@ -25,7 +25,7 @@ namespace CryptoExchange.Net.Interfaces /// /// Handle a message /// - Task Handle(SocketConnection connection, DataEvent message, MessageHandlerLink matchedHandler); + CallResult Handle(SocketConnection connection, DataEvent message, MessageHandlerLink matchedHandler); /// /// Deserialize a message into object of type /// diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 3df7d91..a34bfbe 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -165,7 +165,7 @@ namespace CryptoExchange.Net.Sockets _sendBuffer = new ConcurrentQueue(); _ctsSource = new CancellationTokenSource(); if (websocketParameters.UseNewMessageDeserialization) - _receiveBufferSize = 1024; + _receiveBufferSize = websocketParameters.ReceiveBufferSize ?? 65536; else _receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize; diff --git a/CryptoExchange.Net/Sockets/HighPerf/HighPerfJsonConnectionFactory.cs b/CryptoExchange.Net/Sockets/HighPerf/HighPerfJsonConnectionFactory.cs new file mode 100644 index 0000000..1712205 --- /dev/null +++ b/CryptoExchange.Net/Sockets/HighPerf/HighPerfJsonConnectionFactory.cs @@ -0,0 +1,29 @@ +using CryptoExchange.Net.Clients; +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Sockets; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Text.Json; + +namespace CryptoExchange.Net.Sockets.HighPerf +{ + public class HighPerfJsonConnectionFactory : IHighPerfConnectionFactory + { + private readonly JsonSerializerOptions _options; + + public HighPerfJsonConnectionFactory(JsonSerializerOptions options) + { + _options = options; + } + + public HighPerfSocketConnection CreateHighPerfConnection( + ILogger logger, IWebsocketFactory factory, WebSocketParameters parameters, SocketApiClient client, string address) + { + return new HighPerfJsonSocketConnection(logger, factory, parameters, client, _options, address); + } + } +} diff --git a/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs b/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs index 1313fba..bbff275 100644 --- a/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs +++ b/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs @@ -28,6 +28,9 @@ namespace CryptoExchange.Net.Sockets /// public bool Authenticated { get; set; } = false; + /// + public bool HasAuthenticatedSubscription => false; + /// /// The amount of subscriptions on this connection /// @@ -95,10 +98,6 @@ namespace CryptoExchange.Net.Sockets private SocketStatus _status; private Task? _processTask; - /// - /// Serializer options - /// - protected readonly JsonSerializerOptions _serializerOptions; /// /// The pipe the websocket will write to /// @@ -126,11 +125,10 @@ namespace CryptoExchange.Net.Sockets /// /// New socket connection /// - public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag) + public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, string tag) { _logger = logger; _pipe = new Pipe(); - _serializerOptions = serializerOptions; ApiClient = apiClient; Tag = tag; Properties = new Dictionary(); @@ -147,7 +145,7 @@ namespace CryptoExchange.Net.Sockets } /// - /// Process message from the pipe + /// Process messages from the pipe /// protected abstract Task ProcessAsync(CancellationToken ct); @@ -418,15 +416,15 @@ namespace CryptoExchange.Net.Sockets } /// - public class HighPerfSocketConnection : HighPerfSocketConnection + public abstract class HighPerfSocketConnection : HighPerfSocketConnection { #if NET9_0_OR_GREATER - private readonly Lock _listenersLock = new Lock(); + protected readonly Lock _listenersLock = new Lock(); #else - private readonly object _listenersLock = new object(); + protected readonly object _listenersLock = new object(); #endif - private readonly List> _typedSubscriptions; + protected readonly List> _typedSubscriptions; /// public override HighPerfSubscription[] Subscriptions @@ -444,7 +442,8 @@ namespace CryptoExchange.Net.Sockets /// /// ctor /// - public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag) : base(logger, socketFactory, parameters, apiClient, serializerOptions, tag) + public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, string tag) + : base(logger, socketFactory, parameters, apiClient, tag) { _typedSubscriptions = new List>(); } @@ -473,6 +472,40 @@ namespace CryptoExchange.Net.Sockets _typedSubscriptions.Remove(subscription); } + protected ValueTask DelegateToSubscription(HighPerfSubscription sub, T update) + { + try + { + return sub.HandleAsync(update!); + } + catch (Exception ex) + { + sub.InvokeExceptionHandler(ex); + _logger.UserMessageProcessingFailed(SocketId, ex.Message, ex); + return new ValueTask(); + } + } + } + + public class HighPerfJsonSocketConnection : HighPerfSocketConnection + { + private JsonSerializerOptions _jsonOptions; + + /// + /// ctor + /// + public HighPerfJsonSocketConnection( + ILogger logger, + IWebsocketFactory socketFactory, + WebSocketParameters parameters, + SocketApiClient apiClient, + JsonSerializerOptions serializerOptions, + string tag) + : base(logger, socketFactory, parameters, apiClient, tag) + { + _jsonOptions = serializerOptions; + } + /// protected override async Task ProcessAsync(CancellationToken ct) { @@ -480,7 +513,7 @@ namespace CryptoExchange.Net.Sockets { #pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code #pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. - await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable(_pipe.Reader, true, _serializerOptions, ct).ConfigureAwait(false)) + await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable(_pipe.Reader, true, _jsonOptions, ct).ConfigureAwait(false)) #pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. #pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code { @@ -498,19 +531,6 @@ namespace CryptoExchange.Net.Sockets catch (OperationCanceledException) { } } - private ValueTask DelegateToSubscription(HighPerfSubscription sub, T update) - { - try - { - return sub.HandleAsync(update!); - } - catch (Exception ex) - { - sub.InvokeExceptionHandler(ex); - _logger.UserMessageProcessingFailed(SocketId, ex.Message, ex); - return new ValueTask(); - } - } } } diff --git a/CryptoExchange.Net/Sockets/HighPerf/HighPerfWebSocketClient.cs b/CryptoExchange.Net/Sockets/HighPerf/HighPerfWebSocketClient.cs index 6d8050a..58020c5 100644 --- a/CryptoExchange.Net/Sockets/HighPerf/HighPerfWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/HighPerf/HighPerfWebSocketClient.cs @@ -9,6 +9,7 @@ using System.Buffers; using System.IO.Pipelines; using System.Net; using System.Net.WebSockets; +using System.Text; using System.Threading; using System.Threading.Tasks; @@ -333,6 +334,7 @@ namespace CryptoExchange.Net.Sockets private async Task ReceiveLoopAsync() { Exception? exitException = null; + try { while (true) @@ -340,14 +342,14 @@ namespace CryptoExchange.Net.Sockets if (_ctsSource.IsCancellationRequested) break; - ValueWebSocketReceiveResult receiveResult = default; + ValueWebSocketReceiveResult receiveResult; try { receiveResult = await _socket!.ReceiveAsync(_pipeWriter.GetMemory(_receiveBufferSize), _ctsSource.Token).ConfigureAwait(false); // Advance the writer to communicate which part of the memory was written - _pipeWriter.Advance(receiveResult.Count); + _pipeWriter.Advance(receiveResult.Count); } catch (OperationCanceledException ex) { diff --git a/CryptoExchange.Net/Sockets/HighPerf/IHighPerfConnectionFactory.cs b/CryptoExchange.Net/Sockets/HighPerf/IHighPerfConnectionFactory.cs new file mode 100644 index 0000000..15cad5f --- /dev/null +++ b/CryptoExchange.Net/Sockets/HighPerf/IHighPerfConnectionFactory.cs @@ -0,0 +1,16 @@ +using CryptoExchange.Net.Clients; +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects.Sockets; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Sockets.HighPerf +{ + public interface IHighPerfConnectionFactory + { + HighPerfSocketConnection CreateHighPerfConnection( + ILogger logger, IWebsocketFactory factory, WebSocketParameters parameters, SocketApiClient client, string address); + } +} diff --git a/CryptoExchange.Net/Sockets/ISocketConnection.cs b/CryptoExchange.Net/Sockets/ISocketConnection.cs index e4568f5..a7f9139 100644 --- a/CryptoExchange.Net/Sockets/ISocketConnection.cs +++ b/CryptoExchange.Net/Sockets/ISocketConnection.cs @@ -19,6 +19,8 @@ namespace CryptoExchange.Net.Sockets /// Whether the connection has been authenticated /// bool Authenticated { get; set; } + + bool HasAuthenticatedSubscription { get; } /// /// Whether the connection is established /// diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 0cf1cff..ac6f7e3 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -60,11 +60,6 @@ namespace CryptoExchange.Net.Sockets /// public object? Response { get; set; } - /// - /// Wait event for the calling message processing thread - /// - public AsyncResetEvent? ContinueAwaiter { get; set; } - public HashSet DeserializationTypes { get; set; } private MessageMatcher _matcher; @@ -171,8 +166,9 @@ namespace CryptoExchange.Net.Sockets /// /// Handle a response message /// - public abstract Task Handle(SocketConnection connection, DataEvent message, MessageHandlerLink check); + public abstract CallResult Handle(SocketConnection connection, DataEvent message, MessageHandlerLink check); + public Action OnComplete { get; set; } } /// @@ -192,12 +188,16 @@ namespace CryptoExchange.Net.Sockets /// /// /// - protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight) + protected Query( + object request, + bool authenticated, + int weight = 1) + : base(request, authenticated, weight) { } /// - public override async Task Handle(SocketConnection connection, DataEvent message, MessageHandlerLink check) + public override CallResult Handle(SocketConnection connection, DataEvent message, MessageHandlerLink check) { if (!PreCheckMessage(connection, message)) return CallResult.SuccessResult; @@ -214,8 +214,7 @@ namespace CryptoExchange.Net.Sockets { Completed = true; _event.Set(); - if (ContinueAwaiter != null) - await ContinueAwaiter.WaitAsync().ConfigureAwait(false); + OnComplete?.Invoke(); } return Result; @@ -238,17 +237,20 @@ namespace CryptoExchange.Net.Sockets else Result = new CallResult(default, null, default); - ContinueAwaiter?.Set(); _event.Set(); + OnComplete?.Invoke(); } /// public override void Fail(Error error) { + if (Completed) + return; + Result = new CallResult(error); Completed = true; - ContinueAwaiter?.Set(); _event.Set(); + OnComplete?.Invoke(); } } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index d80522a..370ebb6 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -12,6 +12,7 @@ using System.Linq; using System.Net; using System.Net.Sockets; using System.Net.WebSockets; +using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -150,6 +151,8 @@ namespace CryptoExchange.Net.Sockets /// public bool Authenticated { get; set; } + public bool HasAuthenticatedSubscription => Subscriptions.Any(x => x.Authenticated); + /// /// If connection is made /// @@ -268,7 +271,8 @@ namespace CryptoExchange.Net.Sockets private IByteMessageAccessor? _stringMessageAccessor; private IByteMessageAccessor? _byteMessageAccessor; - private IMessageConverter? _messageConverter; + private IMessageConverter? _byteMessageConverter; + private IMessageConverter? _textMessageConverter; /// /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary. @@ -509,42 +513,97 @@ namespace CryptoExchange.Net.Sockets /// protected internal virtual void HandleStreamMessage2(WebSocketMessageType type, ReadOnlySpan data) { - //var sw = Stopwatch.StartNew(); var receiveTime = DateTime.UtcNow; - //// 1. Decrypt/Preprocess if necessary + // 1. Decrypt/Preprocess if necessary //data = ApiClient.PreprocessStreamMessage(this, type, data); - _messageConverter ??= ApiClient.CreateMessageConverter(); + IMessageConverter messageConverter; + if (type == WebSocketMessageType.Binary) + messageConverter = _byteMessageConverter ??= ApiClient.CreateMessageConverter(type); + else + messageConverter = _textMessageConverter ??= ApiClient.CreateMessageConverter(type); - var messageType = _messageConverter.GetMessageType(data, type); - if (messageType.Type == null) + string? originalData = null; + if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData) { - // Failed to determine message type +#if NETSTANDARD2_0 + originalData = Encoding.UTF8.GetString(data.ToArray()); +#else + originalData = Encoding.UTF8.GetString(data); +#endif + } + + List? processors = null; + var messageInfo = messageConverter.GetMessageInfo(data, type); + if (messageInfo.Type == null) + { + if (messageInfo.Identifier == null) + { + // Both deserialization type and identifier null, can't process + _logger.LogWarning("Failed to evaluate message. Data: {Message}", Encoding.UTF8.GetString(data.ToArray())); + return; + } + + // Couldn't determine deserialization type, try determine the type based on identifier + lock (_listenersLock) + processors = _listeners.ToList(); + + foreach (var subscription in processors) + { + var handler = subscription.MessageMatcher.GetHandlerLinks(messageInfo.Identifier)?.FirstOrDefault(); + if (handler == null) + continue; + + _logger.LogTrace("Message type determined based on identifier"); + messageInfo.Type = handler.DeserializationType; + break; + } + + if (messageInfo.Type == null) + { + // No handler found for identifier either, can't process + _logger.LogWarning("Failed to determine message type. Data: {Message}", Encoding.UTF8.GetString(data.ToArray())); + return; + } + } + + object result; + try + { + result = messageConverter.Deserialize(data, messageInfo.Type!); + } + catch(Exception ex) + { + _logger.LogWarning(ex, "Deserialization failed. Data: {Message}", Encoding.UTF8.GetString(data.ToArray())); return; } - var result = _messageConverter.Deserialize(data, messageType.Type); if (result == null) { // Deserialize error + _logger.LogWarning("Deserialization returned null. Data: {Message}", Encoding.UTF8.GetString(data.ToArray())); return; } - var targetType = messageType.Type; - List listeners; - lock (_listenersLock) - listeners = _listeners.Where(x => x.DeserializationTypes.Contains(targetType)).ToList(); - if (listeners.Count == 0) + var targetType = messageInfo.Type!; + if (processors == null) + { + lock (_listenersLock) + processors = _listeners.Where(x => x.DeserializationTypes.Contains(targetType)).ToList(); + } + + if (processors.Count == 0) { // No subscriptions found for type + _logger.LogWarning("No subscriptions found for message. Data: {Message}", Encoding.UTF8.GetString(data.ToArray())); return; } - var dataEvent = new DataEvent(result, null, null, null /*originalData*/, receiveTime, null); - foreach (var subscription in listeners) + var dataEvent = new DataEvent(result, null, null, originalData, receiveTime, null); + foreach (var subscription in processors) { - var links = subscription.MessageMatcher.GetHandlerLinks(messageType.Identifier); + var links = subscription.MessageMatcher.GetHandlerLinks(messageInfo.Identifier!); foreach(var link in links) subscription.Handle(this, dataEvent, link); } @@ -649,7 +708,7 @@ namespace CryptoExchange.Net.Sockets try { var innerSw = Stopwatch.StartNew(); - await processor.Handle(this, new DataEvent(deserialized, null, null, originalData, receiveTime, null), listener).ConfigureAwait(false); + processor.Handle(this, new DataEvent(deserialized, null, null, originalData, receiveTime, null), listener); if (processor is Query query && query.RequiredResponses != 1) _logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}"); totalUserTime += (int)innerSw.ElapsedMilliseconds; @@ -873,12 +932,11 @@ namespace CryptoExchange.Net.Sockets /// Send a query request and wait for an answer /// /// Query to send - /// Wait event for when the socket message handler can continue /// Cancellation token /// - public virtual async Task SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default) + public virtual async Task SendAndWaitQueryAsync(Query query, CancellationToken ct = default) { - await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false); + await SendAndWaitIntAsync(query, ct).ConfigureAwait(false); return query.Result ?? new CallResult(new TimeoutError()); } @@ -887,21 +945,19 @@ namespace CryptoExchange.Net.Sockets /// /// Expected result type /// Query to send - /// Wait event for when the socket message handler can continue /// Cancellation token /// - public virtual async Task> SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default) + public virtual async Task> SendAndWaitQueryAsync(Query query, CancellationToken ct = default) { - await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false); + await SendAndWaitIntAsync(query, ct).ConfigureAwait(false); return query.TypedResult ?? new CallResult(new TimeoutError()); } - private async Task SendAndWaitIntAsync(Query query, AsyncResetEvent? continueEvent, CancellationToken ct = default) + private async Task SendAndWaitIntAsync(Query query, CancellationToken ct = default) { lock (_listenersLock) _listeners.Add(query); - query.ContinueAwaiter = continueEvent; var sendResult = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false); if (!sendResult) { @@ -1116,15 +1172,13 @@ namespace CryptoExchange.Net.Sockets subscription.Status = SubscriptionStatus.Subscribed; continue; } - - var waitEvent = new AsyncResetEvent(false); - taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) => + subQuery.OnComplete = () => { - subscription.Status = r.Result.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending; - subscription.HandleSubQueryResponse(subQuery.Response!); - waitEvent.Set(); - return r.Result; - })); + subscription.Status = subQuery.Result!.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending; + subscription.HandleSubQueryResponse(subQuery.Response); + }; + + taskList.Add(SendAndWaitQueryAsync(subQuery)); } await Task.WhenAll(taskList).ConfigureAwait(false); diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index e4d8643..c2def2b 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -152,7 +152,7 @@ namespace CryptoExchange.Net.Sockets /// Handle a subscription query response /// /// - public virtual void HandleSubQueryResponse(object message) { } + public virtual void HandleSubQueryResponse(object? message) { } /// /// Handle an unsubscription query response @@ -182,11 +182,11 @@ namespace CryptoExchange.Net.Sockets /// /// Handle an update message /// - public Task Handle(SocketConnection connection, DataEvent message, MessageHandlerLink matcher) + public CallResult Handle(SocketConnection connection, DataEvent message, MessageHandlerLink matcher) { ConnectionInvocations++; TotalInvocations++; - return Task.FromResult(matcher.Handle(connection, message)); + return matcher.Handle(connection, message); } /// diff --git a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs index 49eb2e5..a0b74cf 100644 --- a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs +++ b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Sockets; #pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code #pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. @@ -45,8 +46,13 @@ namespace CryptoExchange.Net.Testing.Implementations public static readonly object lastIdLock = new object(); #endif - public TestSocket(string address) + private bool _newDeserialization; + + public SocketConnection Connection { get; set; } + + public TestSocket(bool newDeserialization, string address) { + _newDeserialization = newDeserialization; Uri = new Uri(address); lock (lastIdLock) { @@ -101,12 +107,14 @@ namespace CryptoExchange.Net.Testing.Implementations public void InvokeMessage(string data) { - OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory(Encoding.UTF8.GetBytes(data))).Wait(); - } - - public void InvokeMessage(T data) - { - OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data)))).Wait(); + if (!_newDeserialization) + { + OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory(Encoding.UTF8.GetBytes(data))).Wait(); + } + else + { + Connection.HandleStreamMessage2(WebSocketMessageType.Text, Encoding.UTF8.GetBytes(data)); + } } public Task ReconnectAsync() => throw new NotImplementedException(); diff --git a/CryptoExchange.Net/Testing/Implementations/TestWebsocketFactory.cs b/CryptoExchange.Net/Testing/Implementations/TestWebsocketFactory.cs index 18d2abd..1745a60 100644 --- a/CryptoExchange.Net/Testing/Implementations/TestWebsocketFactory.cs +++ b/CryptoExchange.Net/Testing/Implementations/TestWebsocketFactory.cs @@ -17,6 +17,10 @@ namespace CryptoExchange.Net.Testing.Implementations } public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter) => throw new NotImplementedException(); - public IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters) => _socket; + public IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters) + { + _socket.Connection = connection; + return _socket; + } } } diff --git a/CryptoExchange.Net/Testing/TestHelpers.cs b/CryptoExchange.Net/Testing/TestHelpers.cs index 169d186..709dce4 100644 --- a/CryptoExchange.Net/Testing/TestHelpers.cs +++ b/CryptoExchange.Net/Testing/TestHelpers.cs @@ -63,7 +63,7 @@ namespace CryptoExchange.Net.Testing internal static TestSocket ConfigureSocketClient(T client, string address) where T : BaseSocketClient { - var socket = new TestSocket(address); + var socket = new TestSocket(client.ClientOptions.EnabledNewDeserialization, address); foreach (var apiClient in client.ApiClients.OfType()) { apiClient.SocketFactory = new TestWebsocketFactory(socket);