1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 09:51:50 +00:00
This commit is contained in:
Jkorf 2025-11-18 16:37:28 +01:00
parent 3fd69d6e1a
commit 2cf0afa353
6 changed files with 256 additions and 41 deletions

View File

@ -1062,10 +1062,10 @@ namespace CryptoExchange.Net.Clients
/// <summary>
/// Preprocess a stream message
/// </summary>
/// <param name="connection"></param>
/// <param name="type"></param>
/// <param name="data"></param>
/// <returns></returns>
public virtual ReadOnlySpan<byte> PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlySpan<byte> data) => data;
/// <summary>
/// Preprocess a stream message
/// </summary>
public virtual ReadOnlyMemory<byte> PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlyMemory<byte> data) => data;
/// <summary>

View File

@ -0,0 +1,87 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
{
public class MessageEvaluator
{
public int Priority { get; set; }
public bool ForceIfFound { get; set; }
public MessageFieldReference[] Fields { get; set; }
public Func<SearchResult, string> MessageIdentifier { get; set; }
public Func<SearchResult, Type?> TypeIdentifier { get; set; }
public bool Statisfied(SearchResult result)
{
foreach(var field in Fields)
{
if (!result.Contains(field.Name))
return false;
}
return true;
}
public MessageInfo ProduceMessageInfo(SearchResult result)
{
return new MessageInfo
{
DeserializationType = TypeIdentifier(result),
Identifier = MessageIdentifier(result)
};
}
}
public class MessageFieldReference
{
public int Level { get; set; }
public string Name { get; set; }
public Type Type { get; set; }
}
public class SearchResult
{
public Dictionary<string, string>? _stringValues;
public Dictionary<string, int>? _intValues;
public int GetInt(string name) => _intValues[name];
public string GetString(string name) => _stringValues[name];
public void WriteInt(string name, int value)
{
_intValues ??= new();
_intValues[name] = value;
}
public void WriteString(string name, string value)
{
_stringValues ??= new();
_stringValues[name] = value;
}
public bool Contains(string name)
{
if (_intValues?.ContainsKey(name) == true)
return true;
if (_stringValues?.ContainsKey(name) == true)
return true;
return false;
}
public void Reset()
{
_intValues?.Clear();
_stringValues?.Clear();
}
}
public class MessageEvalutorFieldReference
{
public MessageFieldReference Field { get; set; }
public MessageEvaluator? ForceEvaluator { get; set; }
}
}

View File

@ -1,6 +1,8 @@
using CryptoExchange.Net.Converters.MessageParsing;
using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Text.Json;
@ -16,8 +18,117 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
/// </summary>
public abstract JsonSerializerOptions Options { get; }
protected abstract MessageEvaluator[] MessageEvaluators { get; }
private readonly SearchResult _searchResult = new SearchResult();
/// <inheritdoc />
public abstract MessageInfo GetMessageInfo(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
public virtual MessageInfo GetMessageInfo(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType)
{
InitializeSearch();
_searchResult.Reset();
var reader = new Utf8JsonReader(data);
while (reader.Read())
{
if ((reader.TokenType == JsonTokenType.StartArray
|| reader.TokenType == JsonTokenType.StartObject)
&& reader.CurrentDepth == _maxSearchDepth)
{
reader.Skip();
continue;
}
if (reader.TokenType != JsonTokenType.PropertyName)
continue;
bool written = false;
foreach (var field in _searchFields)
{
if (reader.CurrentDepth == field.Field.Level
&& reader.ValueTextEquals(field.Field.Name))
{
reader.Read();
if (field.Field.Type == typeof(int))
_searchResult.WriteInt(field.Field.Name, reader.GetInt32());
else
_searchResult.WriteString(field.Field.Name, reader.GetString()!);
if (field.ForceEvaluator != null)
{
// Force the immediate return upon encountering this field
return field.ForceEvaluator.ProduceMessageInfo(_searchResult);
}
written = true;
break;
}
}
if (!written)
continue;
if (_topEvaluator.Statisfied(_searchResult))
return _topEvaluator.ProduceMessageInfo(_searchResult);
if (_searchFields.All(x => _searchResult.Contains(x.Field.Name)))
break;
}
foreach (var evaluator in MessageEvaluators)
{
if (evaluator.Statisfied(_searchResult))
return evaluator.ProduceMessageInfo(_searchResult);
}
return new MessageInfo();
}
protected bool _initialized;
protected List<MessageEvalutorFieldReference> _searchFields;
protected int _maxSearchDepth;
protected MessageEvaluator _topEvaluator;
protected void InitializeSearch()
{
if (_initialized)
return;
_maxSearchDepth = int.MinValue;
_searchFields = new List<MessageEvalutorFieldReference>();
foreach (var evaluator in MessageEvaluators.OrderBy(x => x.Priority))
{
_topEvaluator ??= evaluator;
foreach (var field in evaluator.Fields)
{
var existing = _searchFields.SingleOrDefault(x => x.Field.Name == field.Name && x.Field.Level == field.Level);
if (existing != null)
{
if (evaluator.ForceIfFound)
{
if (existing.ForceEvaluator != null)
throw new Exception("Invalid config");
existing.ForceEvaluator = evaluator;
}
}
else
{
_searchFields.Add(new MessageEvalutorFieldReference
{
ForceEvaluator = evaluator.ForceIfFound ? evaluator : null,
Field = field
});
}
if (field.Level > _maxSearchDepth)
_maxSearchDepth = field.Level;
}
}
_initialized = true;
}
/// <inheritdoc />
public virtual object Deserialize(ReadOnlySpan<byte> data, Type type)

View File

@ -363,15 +363,24 @@ namespace CryptoExchange.Net
/// <summary>
/// Decompress using GzipStream
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
public static ReadOnlySpan<byte> DecompressGzip(this ReadOnlySpan<byte> data)
{
using var decompressedStream = new MemoryStream();
using var deflateStream = new GZipStream(new MemoryStream(data.ToArray()), CompressionMode.Decompress);
deflateStream.CopyTo(decompressedStream);
return new ReadOnlySpan<byte>(decompressedStream.GetBuffer(), 0, (int)decompressedStream.Length);
}
/// <summary>
/// Decompress using GzipStream
/// </summary>
public static ReadOnlyMemory<byte> DecompressGzip(this ReadOnlyMemory<byte> data)
{
using var decompressedStream = new MemoryStream();
using var dataStream = MemoryMarshal.TryGetArray(data, out var arraySegment)
? new MemoryStream(arraySegment.Array!, arraySegment.Offset, arraySegment.Count)
: new MemoryStream(data.ToArray());
using var deflateStream = new GZipStream(new MemoryStream(data.ToArray()), CompressionMode.Decompress);
using var deflateStream = new GZipStream(dataStream, CompressionMode.Decompress);
deflateStream.CopyTo(decompressedStream);
return new ReadOnlyMemory<byte>(decompressedStream.GetBuffer(), 0, (int)decompressedStream.Length);
}
@ -381,7 +390,7 @@ namespace CryptoExchange.Net
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
public static ReadOnlyMemory<byte> Decompress(this ReadOnlyMemory<byte> input)
public static ReadOnlySpan<byte> Decompress(this ReadOnlyMemory<byte> input)
{
var output = new MemoryStream();
@ -390,7 +399,7 @@ namespace CryptoExchange.Net
decompressor.CopyTo(output);
output.Position = 0;
return new ReadOnlyMemory<byte>(output.GetBuffer(), 0, (int)output.Length);
return new ReadOnlySpan<byte>(output.GetBuffer(), 0, (int)output.Length);
}
/// <summary>

View File

@ -768,7 +768,6 @@ namespace CryptoExchange.Net.Sockets
{
LastActionTime = DateTime.UtcNow;
_connection.HandleStreamMessage2(type, data);
//await (OnStreamMessage?.Invoke(type, data) ?? Task.CompletedTask).ConfigureAwait(false);
}
/// <summary>

View File

@ -514,7 +514,7 @@ namespace CryptoExchange.Net.Sockets
var receiveTime = DateTime.UtcNow;
// 1. Decrypt/Preprocess if necessary
//data = ApiClient.PreprocessStreamMessage(this, type, data);
data = ApiClient.PreprocessStreamMessage(this, type, data);
IMessageConverter messageConverter;
if (type == WebSocketMessageType.Binary)
@ -530,9 +530,9 @@ namespace CryptoExchange.Net.Sockets
#else
originalData = Encoding.UTF8.GetString(data);
#endif
_logger.ReceivedData(SocketId, originalData);
}
List<IMessageProcessor>? processors = null;
var messageInfo = messageConverter.GetMessageInfo(data, type);
if (messageInfo.DeserializationType == null)
{
@ -545,17 +545,17 @@ namespace CryptoExchange.Net.Sockets
// Couldn't determine deserialization type, try determine the type based on identifier
lock (_listenersLock)
processors = _listeners.ToList();
foreach (var subscription in processors)
{
var handler = subscription.MessageMatcher.GetHandlerLinks(messageInfo.Identifier)?.FirstOrDefault();
if (handler == null)
continue;
foreach (var subscription in _listeners)
{
var handler = subscription.MessageMatcher.GetHandlerLinks(messageInfo.Identifier)?.FirstOrDefault();
if (handler == null)
continue;
_logger.LogTrace("Message type determined based on identifier");
messageInfo.DeserializationType = handler.DeserializationType;
break;
_logger.LogTrace("Message type determined based on identifier");
messageInfo.DeserializationType = handler.DeserializationType;
break;
}
}
if (messageInfo.DeserializationType == null)
@ -569,7 +569,18 @@ namespace CryptoExchange.Net.Sockets
object result;
try
{
result = messageConverter.Deserialize(data, messageInfo.DeserializationType!);
if (messageInfo.DeserializationType == typeof(string))
{
#if NETSTANDARD2_0
result = Encoding.UTF8.GetString(data.ToArray());
#else
result = Encoding.UTF8.GetString(data);
#endif
}
else
{
result = messageConverter.Deserialize(data, messageInfo.DeserializationType!);
}
}
catch(Exception ex)
{
@ -584,26 +595,24 @@ namespace CryptoExchange.Net.Sockets
return;
}
var targetType = messageInfo.DeserializationType!;
if (processors == null)
{
lock (_listenersLock)
processors = _listeners.Where(x => x.DeserializationTypes.Contains(targetType)).ToList();
}
if (processors.Count == 0)
{
// No subscriptions found for type
_logger.LogWarning("No subscriptions found for message. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
return;
}
bool processed = false;
var dataEvent = new DataEvent<object>(result, null, null, originalData, receiveTime, null);
foreach (var subscription in processors)
lock (_listenersLock)
{
var links = subscription.MessageMatcher.GetHandlerLinks(messageInfo.Identifier!);
foreach(var link in links)
subscription.Handle(this, dataEvent, link);
foreach (var subscription in _listeners)
{
var links = subscription.MessageMatcher.GetHandlerLinks(messageInfo.Identifier!);
foreach (var link in links)
{
processed = true;
subscription.Handle(this, dataEvent, link);
}
}
}
if (!processed)
{
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, messageInfo.Identifier!, string.Join(",", _listeners.Select(x => x.MessageMatcher.HandlerLinks.Select(x => x.ToString()))));
}
}