1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 09:51:50 +00:00
This commit is contained in:
Jkorf 2025-11-19 15:30:11 +01:00
parent 4bbc48a2d4
commit 477a747151
6 changed files with 418 additions and 113 deletions

View File

@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
@ -13,7 +12,16 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
public MessageFieldReference[] Fields { get; set; }
public Func<SearchResult, string> MessageIdentifier { get; set; }
public Func<SearchResult, string> IdentifyMessageCallback { get; set; }
public string? StaticIdentifier { get; set; }
public string? IdentifyMessage(SearchResult result)
{
if (StaticIdentifier != null)
return StaticIdentifier;
return IdentifyMessageCallback(result);
}
public bool Statisfied(SearchResult result)
{
@ -42,25 +50,28 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
public class PropertyFieldReference : MessageFieldReference
{
public string? PropertyName { get; set; }
public byte[] PropertyName { get; set; }
public PropertyFieldReference(string propertyName) : base(propertyName)
{
PropertyName = propertyName;
PropertyName = Encoding.UTF8.GetBytes(propertyName);
}
}
public class ArrayFieldReference : MessageFieldReference
{
public int? ArrayIndex { get; set; }
public int ArrayIndex { get; set; }
public ArrayFieldReference(string searchName) : base(searchName)
public ArrayFieldReference(string searchName, int depth, int index) : base(searchName)
{
Depth = depth;
ArrayIndex = index;
}
}
public class MessageEvalutorFieldReference
{
public bool SkipReading { get; set; }
public MessageFieldReference Field { get; set; }
public MessageEvaluator? ForceEvaluator { get; set; }
}
@ -69,13 +80,31 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
{
private List<SearchResultItem> _items = new List<SearchResultItem>();
public string FieldValue(string searchName) => _items.First(x => x.Field.SearchName == searchName).Value;
public string FieldValue(string searchName)
{
foreach(var item in _items)
{
if (item.Field.SearchName == searchName)
return item.Value;
}
throw new Exception(""); // TODO
}
public int Count => _items.Count;
public void Clear() => _items.Clear();
public bool Contains(MessageFieldReference field) => _items.Any(x => x.Field == field);
public bool Contains(MessageFieldReference field)
{
foreach(var item in _items)
{
if (item.Field == field)
return true;
}
return false;
}
public void Write(MessageFieldReference field, string? value) => _items.Add(new SearchResultItem
{

View File

@ -1,5 +1,4 @@
using CryptoExchange.Net.Converters.MessageParsing;
using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
using System;
using System.Collections.Generic;
using System.Linq;
@ -9,7 +8,7 @@ using System.Text.Json;
namespace CryptoExchange.Net.Converters.SystemTextJson
{
/// <summary>
/// JSON message converter
/// JSON message converter, sequentially read the json and looks for specific prefdefined fields to identify the message
/// </summary>
public abstract class DynamicJsonConverter : IMessageConverter
{
@ -18,14 +17,109 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
/// </summary>
public abstract JsonSerializerOptions Options { get; }
/// <summary>
/// Message evaluators
/// </summary>
protected abstract MessageEvaluator[] MessageEvaluators { get; }
private readonly SearchResult _searchResult = new();
private bool _hasArraySearches;
private bool _initialized;
private int _maxSearchDepth;
private bool _overlappingFields;
private MessageEvaluator? _topEvaluator;
private List<MessageEvalutorFieldReference>? _searchFields;
private void InitializeConverter()
{
if (_initialized)
return;
_maxSearchDepth = int.MinValue;
_searchFields = new List<MessageEvalutorFieldReference>();
foreach (var evaluator in MessageEvaluators.OrderBy(x => x.Priority))
{
_topEvaluator ??= evaluator;
foreach (var field in evaluator.Fields)
{
if (MessageEvaluators.Where(x => x != evaluator).SelectMany(x => x.Fields).Any(otherField =>
{
if (field is PropertyFieldReference propRef
&& otherField is PropertyFieldReference otherPropRef)
{
return field.Depth == otherPropRef.Depth && propRef.PropertyName == otherPropRef.PropertyName;
}
else if (field is ArrayFieldReference arrayRef
&& otherField is ArrayFieldReference otherArrayPropRef)
{
return field.Depth == otherArrayPropRef.Depth && arrayRef.ArrayIndex == otherArrayPropRef.ArrayIndex;
}
return false;
}))
{
_overlappingFields = true;
}
MessageEvalutorFieldReference? existing = null;
if (field is ArrayFieldReference arrayField)
{
_hasArraySearches = true;
existing = _searchFields.SingleOrDefault(x =>
x.Field is ArrayFieldReference arrayFieldRef
&& arrayFieldRef.ArrayIndex == arrayField.ArrayIndex
&& arrayFieldRef.Depth == arrayField.Depth
&& (arrayFieldRef.Constraint == null && arrayField.Constraint == null));
}
else if (field is PropertyFieldReference propField)
{
existing = _searchFields.SingleOrDefault(x =>
x.Field is PropertyFieldReference propFieldRef
&& propFieldRef.PropertyName == propField.PropertyName
&& propFieldRef.Depth == propField.Depth
&& (propFieldRef.Constraint == null && propFieldRef.Constraint == null));
}
if (existing != null)
{
if (existing.SkipReading == true
&& (evaluator.IdentifyMessageCallback != null
|| field.Constraint != null))
{
existing.SkipReading = false;
}
if (evaluator.ForceIfFound)
{
if (evaluator.Fields.Length > 1 || existing.ForceEvaluator != null)
throw new Exception("Invalid config");
existing.ForceEvaluator = evaluator;
}
}
else
{
_searchFields.Add(new MessageEvalutorFieldReference
{
SkipReading = evaluator.IdentifyMessageCallback == null && field.Constraint == null,
ForceEvaluator = evaluator.ForceIfFound ? evaluator : null,
Field = field
});
}
if (field.Depth > _maxSearchDepth)
_maxSearchDepth = field.Depth;
}
}
_initialized = true;
}
/// <inheritdoc />
public virtual string? GetMessageIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType)
{
InitializeSearch();
InitializeConverter();
int? arrayIndex = null;
@ -43,7 +137,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
}
if (reader.TokenType == JsonTokenType.StartArray)
arrayIndex = 0;
arrayIndex = -1;
else if (reader.TokenType == JsonTokenType.EndArray)
arrayIndex = null;
else if (arrayIndex != null)
@ -53,23 +147,37 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|| arrayIndex != null && _hasArraySearches)
{
bool written = false;
foreach (var field in _searchFields)
string? value = null;
byte[]? propName = null;
foreach (var field in _searchFields!)
{
if (field.Field.Depth != reader.CurrentDepth)
continue;
if (field.Field is PropertyFieldReference propFieldRef)
{
if (reader.TokenType != JsonTokenType.PropertyName)
continue;
if (propName == null)
{
if (reader.TokenType != JsonTokenType.PropertyName)
continue;
if (!reader.ValueTextEquals(propFieldRef.PropertyName))
continue;
if (!reader.ValueTextEquals(propFieldRef.PropertyName))
continue;
reader.Read();
propName = propFieldRef.PropertyName;
reader.Read();
}
else if (!propFieldRef.PropertyName.SequenceEqual(propName))
{
continue;
}
}
else if(field.Field is ArrayFieldReference arrayFieldRef)
else if (field.Field is ArrayFieldReference arrayFieldRef)
{
if (propName != null)
continue;
if (reader.TokenType == JsonTokenType.PropertyName)
continue;
@ -77,39 +185,48 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
continue;
}
string? value = null;
if (reader.TokenType == JsonTokenType.Number)
value = reader.GetDecimal().ToString();
else if (reader.TokenType == JsonTokenType.String)
value = reader.GetString()!;
else if (reader.TokenType == JsonTokenType.Null)
value = null;
else
continue;
if (field.Field.Constraint != null
&& !field.Field.Constraint(value))
if (!field.SkipReading)
{
continue;
if (value == null)
{
if (reader.TokenType == JsonTokenType.Number)
value = reader.GetDecimal().ToString();
else if (reader.TokenType == JsonTokenType.String)
value = reader.GetString()!;
else if (reader.TokenType == JsonTokenType.Null)
value = null;
else
continue;
}
if (field.Field.Constraint != null
&& !field.Field.Constraint(value))
{
continue;
}
}
_searchResult.Write(field.Field, value);
if (field.ForceEvaluator != null)
{
if (field.ForceEvaluator.StaticIdentifier != null)
return field.ForceEvaluator.StaticIdentifier;
// Force the immediate return upon encountering this field
return field.ForceEvaluator.MessageIdentifier(_searchResult);
return field.ForceEvaluator.IdentifyMessage(_searchResult);
}
written = true;
break;
if (!_overlappingFields)
break;
}
if (!written)
continue;
if (_topEvaluator.Statisfied(_searchResult))
return _topEvaluator.MessageIdentifier(_searchResult);
if (_topEvaluator!.Statisfied(_searchResult))
return _topEvaluator.IdentifyMessage(_searchResult);
if (_searchFields.Count == _searchResult.Count)
break;
@ -119,74 +236,12 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
foreach (var evaluator in MessageEvaluators)
{
if (evaluator.Statisfied(_searchResult))
return evaluator.MessageIdentifier(_searchResult);
return evaluator.IdentifyMessage(_searchResult);
}
return null;
}
protected bool _hasArraySearches;
protected bool _initialized;
protected List<MessageEvalutorFieldReference> _searchFields;
protected int _maxSearchDepth;
protected MessageEvaluator _topEvaluator;
protected void InitializeSearch()
{
if (_initialized)
return;
_maxSearchDepth = int.MinValue;
_searchFields = new List<MessageEvalutorFieldReference>();
foreach (var evaluator in MessageEvaluators.OrderBy(x => x.Priority))
{
_topEvaluator ??= evaluator;
foreach (var field in evaluator.Fields)
{
MessageEvalutorFieldReference? existing = null;
if (field is ArrayFieldReference arrayField)
{
_hasArraySearches = true;
existing = _searchFields.SingleOrDefault(x =>
x.Field is ArrayFieldReference arrayFieldRef
&& arrayFieldRef.ArrayIndex == arrayFieldRef.ArrayIndex
&& arrayFieldRef.Depth == arrayFieldRef.Depth);
}
else if(field is PropertyFieldReference propField)
{
existing = _searchFields.SingleOrDefault(x =>
x.Field is PropertyFieldReference propFieldRef
&& propFieldRef.PropertyName == propField.PropertyName
&& propFieldRef.Depth == propField.Depth);
}
if (existing != null)
{
if (evaluator.ForceIfFound)
{
if (existing.ForceEvaluator != null)
throw new Exception("Invalid config");
existing.ForceEvaluator = evaluator;
}
}
else
{
_searchFields.Add(new MessageEvalutorFieldReference
{
ForceEvaluator = evaluator.ForceIfFound ? evaluator : null,
Field = field
});
}
if (field.Depth > _maxSearchDepth)
_maxSearchDepth = field.Depth;
}
}
_initialized = true;
}
/// <inheritdoc />
public virtual object Deserialize(ReadOnlySpan<byte> data, Type type)
{

View File

@ -0,0 +1,58 @@
using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
using System;
using System.Net.WebSockets;
using System.Text.Json;
namespace CryptoExchange.Net.Converters.SystemTextJson
{
/// <summary>
/// JSON message converter, reads the json data info a JsonDocument after which the data can be inspected to identify the message
/// </summary>
public abstract class PreloadJsonConverter : IMessageConverter
{
/// <summary>
/// The serializer options to use
/// </summary>
public abstract JsonSerializerOptions Options { get; }
/// <inheritdoc />
public virtual string? GetMessageIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType)
{
var reader = new Utf8JsonReader(data);
var jsonDocument = JsonDocument.ParseValue(ref reader);
return GetMessageIdentifier(jsonDocument);
}
/// <summary>
/// Get the message identifier for this document
/// </summary>
protected abstract string? GetMessageIdentifier(JsonDocument docuement);
/// <inheritdoc />
public virtual object Deserialize(ReadOnlySpan<byte> data, Type type)
{
#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
return JsonSerializer.Deserialize(data, type, Options)!;
#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
}
/// <summary>
/// Get the string value for a path, or an emtpy string if not found
/// </summary>
protected string StringOrEmpty(JsonDocument document, string path)
{
if (!document.RootElement.TryGetProperty(path, out var element))
return string.Empty;
if (element.ValueKind == JsonValueKind.String)
return element.GetString() ?? string.Empty;
else if (element.ValueKind == JsonValueKind.Number)
return element.GetDecimal().ToString();
return string.Empty;
}
}
}

View File

@ -731,7 +731,7 @@ namespace CryptoExchange.Net.OrderBook
LastUpdateId = item.EndUpdateId,
});
_logger.OrderBookUpdateBuffered(Api, Symbol, item.StartUpdateId, item.EndUpdateId, item.Asks.Count(), item.Bids.Count());
_logger.OrderBookUpdateBuffered(Api, Symbol, item.StartUpdateId, item.EndUpdateId, item.Asks.Length, item.Bids.Length);
}
else
{

View File

@ -9,7 +9,6 @@ using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Common;
using System.IO;
using System.Linq;
using System.Net;
@ -299,7 +298,13 @@ namespace CryptoExchange.Net.Sockets
_logger.SocketStartingProcessing(Id);
SetProcessState(ProcessState.Processing);
var sendTask = SendLoopAsync();
var receiveTask = ReceiveLoopAsync();
Task receiveTask;
#if !NETSTANDARD2_0
if (Parameters.UseUpdatedDeserialization)
receiveTask = ReceiveLoopNewAsync();
else
#endif
receiveTask = ReceiveLoopAsync();
var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
_logger.SocketFinishedProcessing(Id);
@ -728,11 +733,161 @@ namespace CryptoExchange.Net.Sockets
{
_logger.SocketReassembledMessage(Id, multipartStream!.Length);
// Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
if (!Parameters.UseUpdatedDeserialization)
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false);
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
else
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length));
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(buffer.Array!, buffer.Offset, receiveResult.Count));
}
else
{
_logger.SocketDiscardIncompleteMessage(Id, multipartStream!.Length);
}
}
}
}
catch (Exception e)
{
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
// Make sure we at least let the owner know there was an error
_logger.SocketReceiveLoopStoppedWithException(Id, e);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
finally
{
_receiveBufferPool.Return(rentedBuffer, true);
_logger.SocketReceiveLoopFinished(Id);
}
}
#if !NETSTANDARD2_0
/// <summary>
/// Loop for receiving and reassembling data
/// </summary>
/// <returns></returns>
private async Task ReceiveLoopNewAsync()
{
byte[] rentedBuffer = _receiveBufferPool.Rent(_receiveBufferSize);
var buffer = new Memory<byte>(rentedBuffer);
try
{
while (true)
{
if (_ctsSource.IsCancellationRequested)
break;
MemoryStream? multipartStream = null;
ValueWebSocketReceiveResult receiveResult = new();
bool multiPartMessage = false;
while (true)
{
try
{
receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
lock (_receivedMessagesLock)
_receivedMessages.Add(new ReceiveItem(DateTime.UtcNow, receiveResult.Count));
}
catch (OperationCanceledException ex)
{
if (ex.InnerException?.InnerException?.Message.Contains("KeepAliveTimeout") == true)
{
// Specific case that the websocket connection got closed because of a ping frame timeout
// Unfortunately doesn't seem to be a nicer way to catch
_logger.SocketPingTimeout(Id);
}
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
// canceled
break;
}
catch (Exception wse)
{
if (!_ctsSource.Token.IsCancellationRequested && !_stopRequested)
// Connection closed unexpectedly
await (OnError?.Invoke(wse) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
break;
}
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed
if (_socket.State == WebSocketState.CloseReceived)
{
// Close received means it server initiated, we should send a confirmation and close the socket
_logger.SocketReceivedCloseMessage(Id, _socket.CloseStatus.ToString()!, _socket.CloseStatusDescription ?? string.Empty);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
else
{
// Means the socket is now closed and we were the one initiating it
_logger.SocketReceivedCloseConfirmation(Id, _socket.CloseStatus.ToString()!, _socket.CloseStatusDescription ?? string.Empty);
}
break;
}
if (!receiveResult.EndOfMessage)
{
// We received data, but it is not complete, write it to a memory stream for reassembling
multiPartMessage = true;
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
// Write the data to a memory stream to be reassembled later
multipartStream ??= new MemoryStream();
multipartStream.Write(buffer.Span.Slice(0, receiveResult.Count));
}
else
{
if (!multiPartMessage)
{
// Received a complete message and it's not multi part
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
ProcessDataNew(receiveResult.MessageType, buffer.Span.Slice(0, receiveResult.Count));
}
else
{
// Received the end of a multipart message, write to memory stream for reassembling
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
multipartStream!.Write(buffer.Span.Slice(0, receiveResult.Count));
}
break;
}
}
lock (_receivedMessagesLock)
UpdateReceivedMessages();
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Received close message
break;
}
if (_ctsSource.IsCancellationRequested)
{
// Error during receiving or cancellation requested, stop.
break;
}
if (multiPartMessage)
{
// When the connection gets interrupted we might not have received a full message
if (receiveResult.EndOfMessage == true)
{
_logger.SocketReassembledMessage(Id, multipartStream!.Length);
// Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length));
}
else
{
@ -757,6 +912,7 @@ namespace CryptoExchange.Net.Sockets
_logger.SocketReceiveLoopFinished(Id);
}
}
#endif
/// <summary>
/// Process a stream message

View File

@ -546,12 +546,17 @@ namespace CryptoExchange.Net.Sockets
{
foreach (var subscription in _listeners)
{
var handler = subscription.MessageMatcher.GetHandlerLinks(messageIdentifier)?.FirstOrDefault();
if (handler == null)
continue;
foreach (var link in subscription.MessageMatcher.HandlerLinks)
{
if (!link.Check(messageIdentifier!))
continue;
deserializationType = handler.DeserializationType;
break;
deserializationType = link.DeserializationType;
break;
}
if (deserializationType != null)
break;
}
}
@ -607,9 +612,11 @@ namespace CryptoExchange.Net.Sockets
}
var subscription = _listeners[i];
var links = subscription.MessageMatcher.GetHandlerLinks(messageIdentifier!);
foreach (var link in links)
foreach (var link in subscription.MessageMatcher.HandlerLinks)
{
if (!link.Check(messageIdentifier!))
continue;
processed = true;
subscription.Handle(this, dataEvent, link);
}