diff --git a/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/MessageConverterTypes.cs b/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/MessageConverterTypes.cs index ceadfd2..60b42f5 100644 --- a/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/MessageConverterTypes.cs +++ b/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/MessageConverterTypes.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Text; namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters @@ -13,7 +12,16 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters public MessageFieldReference[] Fields { get; set; } - public Func MessageIdentifier { get; set; } + public Func IdentifyMessageCallback { get; set; } + public string? StaticIdentifier { get; set; } + + public string? IdentifyMessage(SearchResult result) + { + if (StaticIdentifier != null) + return StaticIdentifier; + + return IdentifyMessageCallback(result); + } public bool Statisfied(SearchResult result) { @@ -42,25 +50,28 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters public class PropertyFieldReference : MessageFieldReference { - public string? PropertyName { get; set; } + public byte[] PropertyName { get; set; } public PropertyFieldReference(string propertyName) : base(propertyName) { - PropertyName = propertyName; + PropertyName = Encoding.UTF8.GetBytes(propertyName); } } public class ArrayFieldReference : MessageFieldReference { - public int? ArrayIndex { get; set; } + public int ArrayIndex { get; set; } - public ArrayFieldReference(string searchName) : base(searchName) + public ArrayFieldReference(string searchName, int depth, int index) : base(searchName) { + Depth = depth; + ArrayIndex = index; } } public class MessageEvalutorFieldReference { + public bool SkipReading { get; set; } public MessageFieldReference Field { get; set; } public MessageEvaluator? ForceEvaluator { get; set; } } @@ -69,13 +80,31 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters { private List _items = new List(); - public string FieldValue(string searchName) => _items.First(x => x.Field.SearchName == searchName).Value; + public string FieldValue(string searchName) + { + foreach(var item in _items) + { + if (item.Field.SearchName == searchName) + return item.Value; + } + + throw new Exception(""); // TODO + } public int Count => _items.Count; public void Clear() => _items.Clear(); - public bool Contains(MessageFieldReference field) => _items.Any(x => x.Field == field); + public bool Contains(MessageFieldReference field) + { + foreach(var item in _items) + { + if (item.Field == field) + return true; + } + + return false; + } public void Write(MessageFieldReference field, string? value) => _items.Add(new SearchResultItem { diff --git a/CryptoExchange.Net/Converters/SystemTextJson/DynamicJsonConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/MessageConverters/DynamicJsonConverter.cs similarity index 55% rename from CryptoExchange.Net/Converters/SystemTextJson/DynamicJsonConverter.cs rename to CryptoExchange.Net/Converters/SystemTextJson/MessageConverters/DynamicJsonConverter.cs index dca2fc3..2cb1c53 100644 --- a/CryptoExchange.Net/Converters/SystemTextJson/DynamicJsonConverter.cs +++ b/CryptoExchange.Net/Converters/SystemTextJson/MessageConverters/DynamicJsonConverter.cs @@ -1,5 +1,4 @@ -using CryptoExchange.Net.Converters.MessageParsing; -using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters; +using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters; using System; using System.Collections.Generic; using System.Linq; @@ -9,7 +8,7 @@ using System.Text.Json; namespace CryptoExchange.Net.Converters.SystemTextJson { /// - /// JSON message converter + /// JSON message converter, sequentially read the json and looks for specific prefdefined fields to identify the message /// public abstract class DynamicJsonConverter : IMessageConverter { @@ -18,14 +17,109 @@ namespace CryptoExchange.Net.Converters.SystemTextJson /// public abstract JsonSerializerOptions Options { get; } + /// + /// Message evaluators + /// protected abstract MessageEvaluator[] MessageEvaluators { get; } private readonly SearchResult _searchResult = new(); + private bool _hasArraySearches; + private bool _initialized; + private int _maxSearchDepth; + private bool _overlappingFields; + private MessageEvaluator? _topEvaluator; + private List? _searchFields; + + private void InitializeConverter() + { + if (_initialized) + return; + + _maxSearchDepth = int.MinValue; + _searchFields = new List(); + foreach (var evaluator in MessageEvaluators.OrderBy(x => x.Priority)) + { + _topEvaluator ??= evaluator; + foreach (var field in evaluator.Fields) + { + if (MessageEvaluators.Where(x => x != evaluator).SelectMany(x => x.Fields).Any(otherField => + { + if (field is PropertyFieldReference propRef + && otherField is PropertyFieldReference otherPropRef) + { + return field.Depth == otherPropRef.Depth && propRef.PropertyName == otherPropRef.PropertyName; + } + else if (field is ArrayFieldReference arrayRef + && otherField is ArrayFieldReference otherArrayPropRef) + { + return field.Depth == otherArrayPropRef.Depth && arrayRef.ArrayIndex == otherArrayPropRef.ArrayIndex; + } + + return false; + })) + { + _overlappingFields = true; + } + + MessageEvalutorFieldReference? existing = null; + if (field is ArrayFieldReference arrayField) + { + _hasArraySearches = true; + existing = _searchFields.SingleOrDefault(x => + x.Field is ArrayFieldReference arrayFieldRef + && arrayFieldRef.ArrayIndex == arrayField.ArrayIndex + && arrayFieldRef.Depth == arrayField.Depth + && (arrayFieldRef.Constraint == null && arrayField.Constraint == null)); + } + else if (field is PropertyFieldReference propField) + { + existing = _searchFields.SingleOrDefault(x => + x.Field is PropertyFieldReference propFieldRef + && propFieldRef.PropertyName == propField.PropertyName + && propFieldRef.Depth == propField.Depth + && (propFieldRef.Constraint == null && propFieldRef.Constraint == null)); + } + + if (existing != null) + { + if (existing.SkipReading == true + && (evaluator.IdentifyMessageCallback != null + || field.Constraint != null)) + { + existing.SkipReading = false; + } + + if (evaluator.ForceIfFound) + { + if (evaluator.Fields.Length > 1 || existing.ForceEvaluator != null) + throw new Exception("Invalid config"); + + existing.ForceEvaluator = evaluator; + } + } + else + { + _searchFields.Add(new MessageEvalutorFieldReference + { + SkipReading = evaluator.IdentifyMessageCallback == null && field.Constraint == null, + ForceEvaluator = evaluator.ForceIfFound ? evaluator : null, + Field = field + }); + } + + if (field.Depth > _maxSearchDepth) + _maxSearchDepth = field.Depth; + } + } + + _initialized = true; + } + /// public virtual string? GetMessageIdentifier(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType) { - InitializeSearch(); + InitializeConverter(); int? arrayIndex = null; @@ -43,7 +137,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson } if (reader.TokenType == JsonTokenType.StartArray) - arrayIndex = 0; + arrayIndex = -1; else if (reader.TokenType == JsonTokenType.EndArray) arrayIndex = null; else if (arrayIndex != null) @@ -53,23 +147,37 @@ namespace CryptoExchange.Net.Converters.SystemTextJson || arrayIndex != null && _hasArraySearches) { bool written = false; - foreach (var field in _searchFields) + + string? value = null; + byte[]? propName = null; + foreach (var field in _searchFields!) { if (field.Field.Depth != reader.CurrentDepth) continue; if (field.Field is PropertyFieldReference propFieldRef) { - if (reader.TokenType != JsonTokenType.PropertyName) - continue; + if (propName == null) + { + if (reader.TokenType != JsonTokenType.PropertyName) + continue; - if (!reader.ValueTextEquals(propFieldRef.PropertyName)) - continue; + if (!reader.ValueTextEquals(propFieldRef.PropertyName)) + continue; - reader.Read(); + propName = propFieldRef.PropertyName; + reader.Read(); + } + else if (!propFieldRef.PropertyName.SequenceEqual(propName)) + { + continue; + } } - else if(field.Field is ArrayFieldReference arrayFieldRef) + else if (field.Field is ArrayFieldReference arrayFieldRef) { + if (propName != null) + continue; + if (reader.TokenType == JsonTokenType.PropertyName) continue; @@ -77,39 +185,48 @@ namespace CryptoExchange.Net.Converters.SystemTextJson continue; } - string? value = null; - if (reader.TokenType == JsonTokenType.Number) - value = reader.GetDecimal().ToString(); - else if (reader.TokenType == JsonTokenType.String) - value = reader.GetString()!; - else if (reader.TokenType == JsonTokenType.Null) - value = null; - else - continue; - - if (field.Field.Constraint != null - && !field.Field.Constraint(value)) + if (!field.SkipReading) { - continue; + if (value == null) + { + if (reader.TokenType == JsonTokenType.Number) + value = reader.GetDecimal().ToString(); + else if (reader.TokenType == JsonTokenType.String) + value = reader.GetString()!; + else if (reader.TokenType == JsonTokenType.Null) + value = null; + else + continue; + } + + if (field.Field.Constraint != null + && !field.Field.Constraint(value)) + { + continue; + } } _searchResult.Write(field.Field, value); if (field.ForceEvaluator != null) { + if (field.ForceEvaluator.StaticIdentifier != null) + return field.ForceEvaluator.StaticIdentifier; + // Force the immediate return upon encountering this field - return field.ForceEvaluator.MessageIdentifier(_searchResult); + return field.ForceEvaluator.IdentifyMessage(_searchResult); } written = true; - break; + if (!_overlappingFields) + break; } if (!written) continue; - if (_topEvaluator.Statisfied(_searchResult)) - return _topEvaluator.MessageIdentifier(_searchResult); + if (_topEvaluator!.Statisfied(_searchResult)) + return _topEvaluator.IdentifyMessage(_searchResult); if (_searchFields.Count == _searchResult.Count) break; @@ -119,74 +236,12 @@ namespace CryptoExchange.Net.Converters.SystemTextJson foreach (var evaluator in MessageEvaluators) { if (evaluator.Statisfied(_searchResult)) - return evaluator.MessageIdentifier(_searchResult); + return evaluator.IdentifyMessage(_searchResult); } return null; } - protected bool _hasArraySearches; - protected bool _initialized; - protected List _searchFields; - protected int _maxSearchDepth; - protected MessageEvaluator _topEvaluator; - - protected void InitializeSearch() - { - if (_initialized) - return; - - _maxSearchDepth = int.MinValue; - _searchFields = new List(); - foreach (var evaluator in MessageEvaluators.OrderBy(x => x.Priority)) - { - _topEvaluator ??= evaluator; - foreach (var field in evaluator.Fields) - { - MessageEvalutorFieldReference? existing = null; - if (field is ArrayFieldReference arrayField) - { - _hasArraySearches = true; - existing = _searchFields.SingleOrDefault(x => - x.Field is ArrayFieldReference arrayFieldRef - && arrayFieldRef.ArrayIndex == arrayFieldRef.ArrayIndex - && arrayFieldRef.Depth == arrayFieldRef.Depth); - } - else if(field is PropertyFieldReference propField) - { - existing = _searchFields.SingleOrDefault(x => - x.Field is PropertyFieldReference propFieldRef - && propFieldRef.PropertyName == propField.PropertyName - && propFieldRef.Depth == propField.Depth); - } - - if (existing != null) - { - if (evaluator.ForceIfFound) - { - if (existing.ForceEvaluator != null) - throw new Exception("Invalid config"); - - existing.ForceEvaluator = evaluator; - } - } - else - { - _searchFields.Add(new MessageEvalutorFieldReference - { - ForceEvaluator = evaluator.ForceIfFound ? evaluator : null, - Field = field - }); - } - - if (field.Depth > _maxSearchDepth) - _maxSearchDepth = field.Depth; - } - } - - _initialized = true; - } - /// public virtual object Deserialize(ReadOnlySpan data, Type type) { diff --git a/CryptoExchange.Net/Converters/SystemTextJson/MessageConverters/PreloadJsonConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/MessageConverters/PreloadJsonConverter.cs new file mode 100644 index 0000000..b7109b8 --- /dev/null +++ b/CryptoExchange.Net/Converters/SystemTextJson/MessageConverters/PreloadJsonConverter.cs @@ -0,0 +1,58 @@ +using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters; +using System; +using System.Net.WebSockets; +using System.Text.Json; + +namespace CryptoExchange.Net.Converters.SystemTextJson +{ + /// + /// JSON message converter, reads the json data info a JsonDocument after which the data can be inspected to identify the message + /// + public abstract class PreloadJsonConverter : IMessageConverter + { + /// + /// The serializer options to use + /// + public abstract JsonSerializerOptions Options { get; } + + /// + public virtual string? GetMessageIdentifier(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType) + { + var reader = new Utf8JsonReader(data); + var jsonDocument = JsonDocument.ParseValue(ref reader); + + return GetMessageIdentifier(jsonDocument); + } + + /// + /// Get the message identifier for this document + /// + protected abstract string? GetMessageIdentifier(JsonDocument docuement); + + /// + public virtual object Deserialize(ReadOnlySpan data, Type type) + { +#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code +#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. + return JsonSerializer.Deserialize(data, type, Options)!; +#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. +#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code + } + + /// + /// Get the string value for a path, or an emtpy string if not found + /// + protected string StringOrEmpty(JsonDocument document, string path) + { + if (!document.RootElement.TryGetProperty(path, out var element)) + return string.Empty; + + if (element.ValueKind == JsonValueKind.String) + return element.GetString() ?? string.Empty; + else if (element.ValueKind == JsonValueKind.Number) + return element.GetDecimal().ToString(); + + return string.Empty; + } + } +} diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index b65938e..8fcece4 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -731,7 +731,7 @@ namespace CryptoExchange.Net.OrderBook LastUpdateId = item.EndUpdateId, }); - _logger.OrderBookUpdateBuffered(Api, Symbol, item.StartUpdateId, item.EndUpdateId, item.Asks.Count(), item.Bids.Count()); + _logger.OrderBookUpdateBuffered(Api, Symbol, item.StartUpdateId, item.EndUpdateId, item.Asks.Length, item.Bids.Length); } else { diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index d597ca0..6e25cd9 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -9,7 +9,6 @@ using System; using System.Buffers; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Data.Common; using System.IO; using System.Linq; using System.Net; @@ -299,7 +298,13 @@ namespace CryptoExchange.Net.Sockets _logger.SocketStartingProcessing(Id); SetProcessState(ProcessState.Processing); var sendTask = SendLoopAsync(); - var receiveTask = ReceiveLoopAsync(); + Task receiveTask; +#if !NETSTANDARD2_0 + if (Parameters.UseUpdatedDeserialization) + receiveTask = ReceiveLoopNewAsync(); + else +#endif + receiveTask = ReceiveLoopAsync(); var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask; await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); _logger.SocketFinishedProcessing(Id); @@ -728,11 +733,161 @@ namespace CryptoExchange.Net.Sockets { _logger.SocketReassembledMessage(Id, multipartStream!.Length); // Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part) - + if (!Parameters.UseUpdatedDeserialization) - await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false); + await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); else - ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)); + ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan(buffer.Array!, buffer.Offset, receiveResult.Count)); + } + else + { + _logger.SocketDiscardIncompleteMessage(Id, multipartStream!.Length); + } + } + } + } + catch (Exception e) + { + // Because this is running in a separate task and not awaited until the socket gets closed + // any exception here will crash the receive processing, but do so silently unless the socket gets stopped. + // Make sure we at least let the owner know there was an error + _logger.SocketReceiveLoopStoppedWithException(Id, e); + await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); + if (_closeTask?.IsCompleted != false) + _closeTask = CloseInternalAsync(); + } + finally + { + _receiveBufferPool.Return(rentedBuffer, true); + _logger.SocketReceiveLoopFinished(Id); + } + } + +#if !NETSTANDARD2_0 + /// + /// Loop for receiving and reassembling data + /// + /// + private async Task ReceiveLoopNewAsync() + { + byte[] rentedBuffer = _receiveBufferPool.Rent(_receiveBufferSize); + var buffer = new Memory(rentedBuffer); + try + { + while (true) + { + if (_ctsSource.IsCancellationRequested) + break; + + MemoryStream? multipartStream = null; + ValueWebSocketReceiveResult receiveResult = new(); + bool multiPartMessage = false; + while (true) + { + try + { + receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false); + lock (_receivedMessagesLock) + _receivedMessages.Add(new ReceiveItem(DateTime.UtcNow, receiveResult.Count)); + } + catch (OperationCanceledException ex) + { + if (ex.InnerException?.InnerException?.Message.Contains("KeepAliveTimeout") == true) + { + // Specific case that the websocket connection got closed because of a ping frame timeout + // Unfortunately doesn't seem to be a nicer way to catch + _logger.SocketPingTimeout(Id); + } + + if (_closeTask?.IsCompleted != false) + _closeTask = CloseInternalAsync(); + + // canceled + break; + } + catch (Exception wse) + { + if (!_ctsSource.Token.IsCancellationRequested && !_stopRequested) + // Connection closed unexpectedly + await (OnError?.Invoke(wse) ?? Task.CompletedTask).ConfigureAwait(false); + + if (_closeTask?.IsCompleted != false) + _closeTask = CloseInternalAsync(); + break; + } + + if (receiveResult.MessageType == WebSocketMessageType.Close) + { + // Connection closed + if (_socket.State == WebSocketState.CloseReceived) + { + // Close received means it server initiated, we should send a confirmation and close the socket + _logger.SocketReceivedCloseMessage(Id, _socket.CloseStatus.ToString()!, _socket.CloseStatusDescription ?? string.Empty); + if (_closeTask?.IsCompleted != false) + _closeTask = CloseInternalAsync(); + } + else + { + // Means the socket is now closed and we were the one initiating it + _logger.SocketReceivedCloseConfirmation(Id, _socket.CloseStatus.ToString()!, _socket.CloseStatusDescription ?? string.Empty); + } + + break; + } + + if (!receiveResult.EndOfMessage) + { + // We received data, but it is not complete, write it to a memory stream for reassembling + multiPartMessage = true; + _logger.SocketReceivedPartialMessage(Id, receiveResult.Count); + + // Write the data to a memory stream to be reassembled later + multipartStream ??= new MemoryStream(); + multipartStream.Write(buffer.Span.Slice(0, receiveResult.Count)); + } + else + { + if (!multiPartMessage) + { + // Received a complete message and it's not multi part + _logger.SocketReceivedSingleMessage(Id, receiveResult.Count); + ProcessDataNew(receiveResult.MessageType, buffer.Span.Slice(0, receiveResult.Count)); + } + else + { + // Received the end of a multipart message, write to memory stream for reassembling + _logger.SocketReceivedPartialMessage(Id, receiveResult.Count); + multipartStream!.Write(buffer.Span.Slice(0, receiveResult.Count)); + } + + break; + } + } + + lock (_receivedMessagesLock) + UpdateReceivedMessages(); + + if (receiveResult.MessageType == WebSocketMessageType.Close) + { + // Received close message + break; + } + + if (_ctsSource.IsCancellationRequested) + { + // Error during receiving or cancellation requested, stop. + break; + } + + if (multiPartMessage) + { + // When the connection gets interrupted we might not have received a full message + if (receiveResult.EndOfMessage == true) + { + _logger.SocketReassembledMessage(Id, multipartStream!.Length); + // Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part) + + ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)); } else { @@ -757,6 +912,7 @@ namespace CryptoExchange.Net.Sockets _logger.SocketReceiveLoopFinished(Id); } } +#endif /// /// Process a stream message diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index f7362f1..da388e9 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -546,12 +546,17 @@ namespace CryptoExchange.Net.Sockets { foreach (var subscription in _listeners) { - var handler = subscription.MessageMatcher.GetHandlerLinks(messageIdentifier)?.FirstOrDefault(); - if (handler == null) - continue; + foreach (var link in subscription.MessageMatcher.HandlerLinks) + { + if (!link.Check(messageIdentifier!)) + continue; - deserializationType = handler.DeserializationType; - break; + deserializationType = link.DeserializationType; + break; + } + + if (deserializationType != null) + break; } } @@ -607,9 +612,11 @@ namespace CryptoExchange.Net.Sockets } var subscription = _listeners[i]; - var links = subscription.MessageMatcher.GetHandlerLinks(messageIdentifier!); - foreach (var link in links) + foreach (var link in subscription.MessageMatcher.HandlerLinks) { + if (!link.Check(messageIdentifier!)) + continue; + processed = true; subscription.Handle(this, dataEvent, link); }