mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-06-26 17:26:24 +00:00
wip
This commit is contained in:
parent
0e7d49991a
commit
08eadc6aaa
@ -119,7 +119,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
|||||||
|
|
||||||
public override string GetListenerIdentifier(IMessageAccessor message)
|
public override string GetListenerIdentifier(IMessageAccessor message)
|
||||||
{
|
{
|
||||||
if (!message.IsJson)
|
if (!message.IsValid)
|
||||||
{
|
{
|
||||||
return "topic";
|
return "topic";
|
||||||
}
|
}
|
||||||
|
@ -465,10 +465,13 @@ namespace CryptoExchange.Net.Authentication
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected static string GetSerializedBody(IMessageSerializer serializer, IDictionary<string, object> parameters)
|
protected static string GetSerializedBody(IMessageSerializer serializer, IDictionary<string, object> parameters)
|
||||||
{
|
{
|
||||||
|
if (serializer is not IStringMessageSerializer stringSerializer)
|
||||||
|
throw new InvalidOperationException("Non-string message serializer can't get serialized request body");
|
||||||
|
|
||||||
if (parameters.Count == 1 && parameters.TryGetValue(Constants.BodyPlaceHolderKey, out object? value))
|
if (parameters.Count == 1 && parameters.TryGetValue(Constants.BodyPlaceHolderKey, out object? value))
|
||||||
return serializer.Serialize(value);
|
return stringSerializer.Serialize(value);
|
||||||
else
|
else
|
||||||
return serializer.Serialize(parameters);
|
return stringSerializer.Serialize(parameters);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ using CryptoExchange.Net.RateLimiting;
|
|||||||
using CryptoExchange.Net.RateLimiting.Interfaces;
|
using CryptoExchange.Net.RateLimiting.Interfaces;
|
||||||
using CryptoExchange.Net.Requests;
|
using CryptoExchange.Net.Requests;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
using ProtoBuf;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Clients
|
namespace CryptoExchange.Net.Clients
|
||||||
{
|
{
|
||||||
@ -603,12 +604,16 @@ namespace CryptoExchange.Net.Clients
|
|||||||
{
|
{
|
||||||
if (contentType == Constants.JsonContentHeader)
|
if (contentType == Constants.JsonContentHeader)
|
||||||
{
|
{
|
||||||
|
var serializer = CreateSerializer();
|
||||||
|
if (serializer is not IStringMessageSerializer stringSerializer)
|
||||||
|
throw new InvalidOperationException("Non-string message serializer can't get serialized request body");
|
||||||
|
|
||||||
// Write the parameters as json in the body
|
// Write the parameters as json in the body
|
||||||
string stringData;
|
string stringData;
|
||||||
if (parameters.Count == 1 && parameters.TryGetValue(Constants.BodyPlaceHolderKey, out object? value))
|
if (parameters.Count == 1 && parameters.TryGetValue(Constants.BodyPlaceHolderKey, out object? value))
|
||||||
stringData = CreateSerializer().Serialize(value);
|
stringData = stringSerializer.Serialize(value);
|
||||||
else
|
else
|
||||||
stringData = CreateSerializer().Serialize(parameters);
|
stringData = stringSerializer.Serialize(parameters);
|
||||||
request.SetContent(stringData, contentType);
|
request.SetContent(stringData, contentType);
|
||||||
}
|
}
|
||||||
else if (contentType == Constants.FormContentHeader)
|
else if (contentType == Constants.FormContentHeader)
|
||||||
|
@ -138,7 +138,7 @@ namespace CryptoExchange.Net.Clients
|
|||||||
/// Create a message accessor instance
|
/// Create a message accessor instance
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected internal abstract IByteMessageAccessor CreateAccessor();
|
protected internal abstract IByteMessageAccessor CreateAccessor(WebSocketMessageType messageType);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Create a serializer instance
|
/// Create a serializer instance
|
||||||
|
@ -0,0 +1,308 @@
|
|||||||
|
using CryptoExchange.Net.Converters.MessageParsing;
|
||||||
|
using CryptoExchange.Net.Interfaces;
|
||||||
|
using CryptoExchange.Net.Objects;
|
||||||
|
using ProtoBuf;
|
||||||
|
using System;
|
||||||
|
using System.IO;
|
||||||
|
using System.Runtime.InteropServices.ComTypes;
|
||||||
|
using System.Text;
|
||||||
|
using System.Text.Json;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Converters.Protobuf
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// System.Text.Json message accessor
|
||||||
|
/// </summary>
|
||||||
|
public abstract class ProtobufMessageAccessor<T> : IMessageAccessor
|
||||||
|
{
|
||||||
|
protected T? _intermediateType;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public bool IsValid { get; set; }
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public abstract bool OriginalDataAvailable { get; }
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public object? Underlying => throw new NotImplementedException();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ctor
|
||||||
|
/// </summary>
|
||||||
|
public ProtobufMessageAccessor()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public NodeType? GetNodeType()
|
||||||
|
{
|
||||||
|
throw new Exception("");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public NodeType? GetNodeType(MessagePath path)
|
||||||
|
{
|
||||||
|
object value = _intermediateType;
|
||||||
|
foreach (var step in path)
|
||||||
|
{
|
||||||
|
if (step.Type == 0)
|
||||||
|
{
|
||||||
|
// array index
|
||||||
|
}
|
||||||
|
else if (step.Type == 1)
|
||||||
|
{
|
||||||
|
// property value
|
||||||
|
value = value.GetType().GetProperty(step.Property).GetValue(value);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// property name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var valueType = value.GetType();
|
||||||
|
if (valueType.IsArray)
|
||||||
|
return NodeType.Array;
|
||||||
|
|
||||||
|
if (IsSimple(valueType))
|
||||||
|
return NodeType.Value;
|
||||||
|
|
||||||
|
return NodeType.Object;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static bool IsSimple(Type type)
|
||||||
|
{
|
||||||
|
if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>))
|
||||||
|
{
|
||||||
|
// nullable type, check if the nested type is simple.
|
||||||
|
return IsSimple(type.GetGenericArguments()[0]);
|
||||||
|
}
|
||||||
|
return type.IsPrimitive
|
||||||
|
|| type.IsEnum
|
||||||
|
|| type == typeof(string)
|
||||||
|
|| type == typeof(decimal);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public T? GetValue<T>(MessagePath path)
|
||||||
|
{
|
||||||
|
object value = _intermediateType;
|
||||||
|
foreach(var step in path)
|
||||||
|
{
|
||||||
|
if (step.Type == 0)
|
||||||
|
{
|
||||||
|
// array index
|
||||||
|
}
|
||||||
|
else if (step.Type == 1)
|
||||||
|
{
|
||||||
|
// property value
|
||||||
|
value = value.GetType().GetProperty(step.Property)?.GetValue(value);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// property name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return (T?)value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public T?[]? GetValues<T>(MessagePath path)
|
||||||
|
{
|
||||||
|
throw new Exception("");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public abstract string GetOriginalString();
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public abstract void Clear();
|
||||||
|
|
||||||
|
public abstract CallResult<object> Deserialize(Type type, MessagePath? path = null);
|
||||||
|
public abstract CallResult<T1> Deserialize<T1>(MessagePath? path = null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// System.Text.Json stream message accessor
|
||||||
|
/// </summary>
|
||||||
|
public class ProtobufStreamMessageAccessor<T> : ProtobufMessageAccessor<T>, IStreamMessageAccessor
|
||||||
|
{
|
||||||
|
private Stream? _stream;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override bool OriginalDataAvailable => _stream?.CanSeek == true;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ctor
|
||||||
|
/// </summary>
|
||||||
|
public ProtobufStreamMessageAccessor(): base()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override CallResult<object> Deserialize(Type type, MessagePath? path = null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var result = Serializer.Deserialize(type, _stream);
|
||||||
|
return new CallResult<object>(result);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new CallResult<object>(new DeserializeError(ex.Message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override CallResult<T> Deserialize<T>(MessagePath? path = null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var result = Serializer.Deserialize<T>(_stream);
|
||||||
|
return new CallResult<T>(result);
|
||||||
|
}
|
||||||
|
catch(Exception ex)
|
||||||
|
{
|
||||||
|
return new CallResult<T>(new DeserializeError(ex.Message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task<CallResult> Read(Stream stream, bool bufferStream)
|
||||||
|
{
|
||||||
|
if (bufferStream && stream is not MemoryStream)
|
||||||
|
{
|
||||||
|
// We need to be buffer the stream, and it's not currently a seekable stream, so copy it to a new memory stream
|
||||||
|
_stream = new MemoryStream();
|
||||||
|
stream.CopyTo(_stream);
|
||||||
|
_stream.Position = 0;
|
||||||
|
}
|
||||||
|
else if (bufferStream)
|
||||||
|
{
|
||||||
|
// We need to buffer the stream, and the current stream is seekable, store as is
|
||||||
|
_stream = stream;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// We don't need to buffer the stream, so don't bother keeping the reference
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_intermediateType = Serializer.Deserialize<T>(_stream);
|
||||||
|
IsValid = true;
|
||||||
|
return CallResult.SuccessResult;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
// Not a json message
|
||||||
|
IsValid = false;
|
||||||
|
return new CallResult(new DeserializeError("JsonError: " + ex.Message, ex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override string GetOriginalString()
|
||||||
|
{
|
||||||
|
if (_stream is null)
|
||||||
|
throw new NullReferenceException("Stream not initialized");
|
||||||
|
|
||||||
|
_stream.Position = 0;
|
||||||
|
using var textReader = new StreamReader(_stream, Encoding.UTF8, false, 1024, true);
|
||||||
|
return textReader.ReadToEnd();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override void Clear()
|
||||||
|
{
|
||||||
|
_stream?.Dispose();
|
||||||
|
_stream = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Protobuf byte message accessor
|
||||||
|
/// </summary>
|
||||||
|
public class ProtobufByteMessageAccessor<T> : ProtobufMessageAccessor<T>, IByteMessageAccessor
|
||||||
|
{
|
||||||
|
private ReadOnlyMemory<byte> _bytes;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ctor
|
||||||
|
/// </summary>
|
||||||
|
public ProtobufByteMessageAccessor() : base()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override CallResult<object> Deserialize(Type type, MessagePath? path = null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var stream = new MemoryStream(_bytes.ToArray());
|
||||||
|
var result = Serializer.Deserialize(type, stream);
|
||||||
|
return new CallResult<object>(result);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new CallResult<object>(new DeserializeError(ex.Message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override CallResult<T> Deserialize<T>(MessagePath? path = null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var result = Serializer.Deserialize<T>(_bytes);
|
||||||
|
return new CallResult<T>(result);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new CallResult<T>(new DeserializeError(ex.Message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public CallResult Read(ReadOnlyMemory<byte> data)
|
||||||
|
{
|
||||||
|
_bytes = data;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_intermediateType = Serializer.Deserialize<T>(data);
|
||||||
|
IsValid = true;
|
||||||
|
return CallResult.SuccessResult;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
// Not a json message
|
||||||
|
IsValid = false;
|
||||||
|
return new CallResult(new DeserializeError("JsonError: " + ex.Message, ex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override string GetOriginalString() =>
|
||||||
|
// NetStandard 2.0 doesn't support GetString from a ReadonlySpan<byte>, so use ToArray there instead
|
||||||
|
#if NETSTANDARD2_0
|
||||||
|
Encoding.UTF8.GetString(_bytes.ToArray());
|
||||||
|
#else
|
||||||
|
Encoding.UTF8.GetString(_bytes.Span);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override bool OriginalDataAvailable => true;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override void Clear()
|
||||||
|
{
|
||||||
|
_bytes = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,29 @@
|
|||||||
|
using CryptoExchange.Net.Interfaces;
|
||||||
|
using ProtoBuf.Meta;
|
||||||
|
using System.IO;
|
||||||
|
using System.Reflection;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Converters.Protobuf
|
||||||
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
public class ProtobufMessageSerializer : IByteMessageSerializer
|
||||||
|
{
|
||||||
|
private readonly RuntimeTypeModel _model = RuntimeTypeModel.Create("CryptoExchange");
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ctor
|
||||||
|
/// </summary>
|
||||||
|
public ProtobufMessageSerializer()
|
||||||
|
{
|
||||||
|
_model.UseImplicitZeroDefaults = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public byte[] Serialize<T>(T message)
|
||||||
|
{
|
||||||
|
using var memoryStream = new MemoryStream();
|
||||||
|
_model.Serialize(memoryStream, message);
|
||||||
|
return memoryStream.ToArray();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -24,7 +24,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
|||||||
private readonly JsonSerializerOptions? _customSerializerOptions;
|
private readonly JsonSerializerOptions? _customSerializerOptions;
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public bool IsJson { get; set; }
|
public bool IsValid { get; set; }
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public abstract bool OriginalDataAvailable { get; }
|
public abstract bool OriginalDataAvailable { get; }
|
||||||
@ -47,7 +47,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
|||||||
#endif
|
#endif
|
||||||
public CallResult<object> Deserialize(Type type, MessagePath? path = null)
|
public CallResult<object> Deserialize(Type type, MessagePath? path = null)
|
||||||
{
|
{
|
||||||
if (!IsJson)
|
if (!IsValid)
|
||||||
return new CallResult<object>(GetOriginalString());
|
return new CallResult<object>(GetOriginalString());
|
||||||
|
|
||||||
if (_document == null)
|
if (_document == null)
|
||||||
@ -100,7 +100,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public NodeType? GetNodeType()
|
public NodeType? GetNodeType()
|
||||||
{
|
{
|
||||||
if (!IsJson)
|
if (!IsValid)
|
||||||
throw new InvalidOperationException("Can't access json data on non-json message");
|
throw new InvalidOperationException("Can't access json data on non-json message");
|
||||||
|
|
||||||
if (_document == null)
|
if (_document == null)
|
||||||
@ -117,7 +117,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public NodeType? GetNodeType(MessagePath path)
|
public NodeType? GetNodeType(MessagePath path)
|
||||||
{
|
{
|
||||||
if (!IsJson)
|
if (!IsValid)
|
||||||
throw new InvalidOperationException("Can't access json data on non-json message");
|
throw new InvalidOperationException("Can't access json data on non-json message");
|
||||||
|
|
||||||
var node = GetPathNode(path);
|
var node = GetPathNode(path);
|
||||||
@ -139,7 +139,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
|||||||
#endif
|
#endif
|
||||||
public T? GetValue<T>(MessagePath path)
|
public T? GetValue<T>(MessagePath path)
|
||||||
{
|
{
|
||||||
if (!IsJson)
|
if (!IsValid)
|
||||||
throw new InvalidOperationException("Can't access json data on non-json message");
|
throw new InvalidOperationException("Can't access json data on non-json message");
|
||||||
|
|
||||||
var value = GetPathNode(path);
|
var value = GetPathNode(path);
|
||||||
@ -173,7 +173,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
|||||||
#endif
|
#endif
|
||||||
public T?[]? GetValues<T>(MessagePath path)
|
public T?[]? GetValues<T>(MessagePath path)
|
||||||
{
|
{
|
||||||
if (!IsJson)
|
if (!IsValid)
|
||||||
throw new InvalidOperationException("Can't access json data on non-json message");
|
throw new InvalidOperationException("Can't access json data on non-json message");
|
||||||
|
|
||||||
var value = GetPathNode(path);
|
var value = GetPathNode(path);
|
||||||
@ -188,7 +188,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
|||||||
|
|
||||||
private JsonElement? GetPathNode(MessagePath path)
|
private JsonElement? GetPathNode(MessagePath path)
|
||||||
{
|
{
|
||||||
if (!IsJson)
|
if (!IsValid)
|
||||||
throw new InvalidOperationException("Can't access json data on non-json message");
|
throw new InvalidOperationException("Can't access json data on non-json message");
|
||||||
|
|
||||||
if (_document == null)
|
if (_document == null)
|
||||||
@ -279,13 +279,13 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
_document = await JsonDocument.ParseAsync(_stream ?? stream).ConfigureAwait(false);
|
_document = await JsonDocument.ParseAsync(_stream ?? stream).ConfigureAwait(false);
|
||||||
IsJson = true;
|
IsValid = true;
|
||||||
return CallResult.SuccessResult;
|
return CallResult.SuccessResult;
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// Not a json message
|
// Not a json message
|
||||||
IsJson = false;
|
IsValid = false;
|
||||||
return new CallResult(new DeserializeError("JsonError: " + ex.Message, ex));
|
return new CallResult(new DeserializeError("JsonError: " + ex.Message, ex));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -337,18 +337,18 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
|||||||
if (firstByte != 0x7b && firstByte != 0x5b)
|
if (firstByte != 0x7b && firstByte != 0x5b)
|
||||||
{
|
{
|
||||||
// Value doesn't start with `{` or `[`, prevent deserialization attempt as it's slow
|
// Value doesn't start with `{` or `[`, prevent deserialization attempt as it's slow
|
||||||
IsJson = false;
|
IsValid = false;
|
||||||
return new CallResult(new ServerError("Not a json value"));
|
return new CallResult(new ServerError("Not a json value"));
|
||||||
}
|
}
|
||||||
|
|
||||||
_document = JsonDocument.Parse(data);
|
_document = JsonDocument.Parse(data);
|
||||||
IsJson = true;
|
IsValid = true;
|
||||||
return CallResult.SuccessResult;
|
return CallResult.SuccessResult;
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// Not a json message
|
// Not a json message
|
||||||
IsJson = false;
|
IsValid = false;
|
||||||
return new CallResult(new DeserializeError("JsonError: " + ex.Message, ex));
|
return new CallResult(new DeserializeError("JsonError: " + ex.Message, ex));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,7 @@ using System.Text.Json.Serialization.Metadata;
|
|||||||
namespace CryptoExchange.Net.Converters.SystemTextJson
|
namespace CryptoExchange.Net.Converters.SystemTextJson
|
||||||
{
|
{
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public class SystemTextJsonMessageSerializer : IMessageSerializer
|
public class SystemTextJsonMessageSerializer : IStringMessageSerializer
|
||||||
{
|
{
|
||||||
private readonly JsonSerializerOptions _options;
|
private readonly JsonSerializerOptions _options;
|
||||||
|
|
||||||
|
@ -52,6 +52,7 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="9.0.6" />
|
<PackageReference Include="Microsoft.Extensions.Logging" Version="9.0.6" />
|
||||||
|
<PackageReference Include="protobuf-net" Version="3.2.52" />
|
||||||
<PackageReference Include="System.Text.Json" Version="9.0.6" />
|
<PackageReference Include="System.Text.Json" Version="9.0.6" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup Label="Transitive Client Packages">
|
<ItemGroup Label="Transitive Client Packages">
|
||||||
|
@ -13,9 +13,9 @@ namespace CryptoExchange.Net.Interfaces
|
|||||||
public interface IMessageAccessor
|
public interface IMessageAccessor
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Is this a json message
|
/// Is this a valid message
|
||||||
/// </summary>
|
/// </summary>
|
||||||
bool IsJson { get; }
|
bool IsValid { get; }
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Is the original data available for retrieval
|
/// Is the original data available for retrieval
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -4,6 +4,21 @@
|
|||||||
/// Serializer interface
|
/// Serializer interface
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public interface IMessageSerializer
|
public interface IMessageSerializer
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public interface IByteMessageSerializer: IMessageSerializer
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Serialize an object to a string
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
byte[] Serialize<T>(T message);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public interface IStringMessageSerializer: IMessageSerializer
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Serialize an object to a string
|
/// Serialize an object to a string
|
||||||
|
@ -78,13 +78,20 @@ namespace CryptoExchange.Net.Interfaces
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<CallResult> ConnectAsync(CancellationToken ct);
|
Task<CallResult> ConnectAsync(CancellationToken ct);
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Send data
|
/// Send string data
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="id"></param>
|
/// <param name="id"></param>
|
||||||
/// <param name="data"></param>
|
/// <param name="data"></param>
|
||||||
/// <param name="weight"></param>
|
/// <param name="weight"></param>
|
||||||
bool Send(int id, string data, int weight);
|
bool Send(int id, string data, int weight);
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
/// Send byte data
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="id"></param>
|
||||||
|
/// <param name="data"></param>
|
||||||
|
/// <param name="weight"></param>
|
||||||
|
bool Send(int id, byte[] data, int weight);
|
||||||
|
/// <summary>
|
||||||
/// Reconnect the socket
|
/// Reconnect the socket
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
|
@ -377,7 +377,19 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
var bytes = Parameters.Encoding.GetBytes(data);
|
var bytes = Parameters.Encoding.GetBytes(data);
|
||||||
_logger.SocketAddingBytesToSendBuffer(Id, id, bytes);
|
_logger.SocketAddingBytesToSendBuffer(Id, id, bytes);
|
||||||
_sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes });
|
_sendBuffer.Enqueue(new SendItem { Id = id, Type = WebSocketMessageType.Text, Weight = weight, Bytes = bytes });
|
||||||
|
_sendEvent.Set();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public virtual bool Send(int id, byte[] data, int weight)
|
||||||
|
{
|
||||||
|
if (_ctsSource.IsCancellationRequested || _processState != ProcessState.Processing)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
_logger.SocketAddingBytesToSendBuffer(Id, id, data);
|
||||||
|
_sendBuffer.Enqueue(new SendItem { Id = id, Type = WebSocketMessageType.Binary, Weight = weight, Bytes = data });
|
||||||
_sendEvent.Set();
|
_sendEvent.Set();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -532,7 +544,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _socket.SendAsync(new ArraySegment<byte>(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
|
await _socket.SendAsync(new ArraySegment<byte>(data.Bytes, 0, data.Bytes.Length), data.Type, true, _ctsSource.Token).ConfigureAwait(false);
|
||||||
await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
|
await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
|
||||||
_logger.SocketSentBytes(Id, data.Id, data.Bytes.Length);
|
_logger.SocketSentBytes(Id, data.Id, data.Bytes.Length);
|
||||||
}
|
}
|
||||||
@ -858,6 +870,11 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public DateTime SendTime { get; set; }
|
public DateTime SendTime { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Message type
|
||||||
|
/// </summary>
|
||||||
|
public WebSocketMessageType Type { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The bytes to send
|
/// The bytes to send
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -211,7 +211,8 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
private SocketStatus _status;
|
private SocketStatus _status;
|
||||||
|
|
||||||
private readonly IMessageSerializer _serializer;
|
private readonly IMessageSerializer _serializer;
|
||||||
private readonly IByteMessageAccessor _accessor;
|
private IByteMessageAccessor _stringMessageAccessor;
|
||||||
|
private IByteMessageAccessor _byteMessageAccessor;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
|
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
|
||||||
@ -258,7 +259,6 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
_listeners = new List<IMessageProcessor>();
|
_listeners = new List<IMessageProcessor>();
|
||||||
|
|
||||||
_serializer = apiClient.CreateSerializer();
|
_serializer = apiClient.CreateSerializer();
|
||||||
_accessor = apiClient.CreateAccessor();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -459,25 +459,31 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
data = ApiClient.PreprocessStreamMessage(this, type, data);
|
data = ApiClient.PreprocessStreamMessage(this, type, data);
|
||||||
|
|
||||||
// 2. Read data into accessor
|
// 2. Read data into accessor
|
||||||
_accessor.Read(data);
|
IByteMessageAccessor accessor;
|
||||||
|
if (type == WebSocketMessageType.Binary)
|
||||||
|
accessor = _stringMessageAccessor ??= ApiClient.CreateAccessor(type);
|
||||||
|
else
|
||||||
|
accessor = _byteMessageAccessor ??= ApiClient.CreateAccessor(type);
|
||||||
|
|
||||||
|
accessor.Read(data);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
bool outputOriginalData = ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData;
|
bool outputOriginalData = ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData;
|
||||||
if (outputOriginalData)
|
if (outputOriginalData)
|
||||||
{
|
{
|
||||||
originalData = _accessor.GetOriginalString();
|
originalData = accessor.GetOriginalString();
|
||||||
_logger.ReceivedData(SocketId, originalData);
|
_logger.ReceivedData(SocketId, originalData);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Determine the identifying properties of this message
|
// 3. Determine the identifying properties of this message
|
||||||
var listenId = ApiClient.GetListenerIdentifier(_accessor);
|
var listenId = ApiClient.GetListenerIdentifier(accessor);
|
||||||
if (listenId == null)
|
if (listenId == null)
|
||||||
{
|
{
|
||||||
originalData = outputOriginalData ? _accessor.GetOriginalString() : "[OutputOriginalData is false]";
|
originalData = outputOriginalData ? accessor.GetOriginalString() : "[OutputOriginalData is false]";
|
||||||
if (!ApiClient.UnhandledMessageExpected)
|
if (!ApiClient.UnhandledMessageExpected)
|
||||||
_logger.FailedToEvaluateMessage(SocketId, originalData);
|
_logger.FailedToEvaluateMessage(SocketId, originalData);
|
||||||
|
|
||||||
UnhandledMessage?.Invoke(_accessor);
|
UnhandledMessage?.Invoke(accessor);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -494,7 +500,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
lock (_listenersLock)
|
lock (_listenersLock)
|
||||||
listenerIds = _listeners.SelectMany(l => l.ListenerIdentifiers).ToList();
|
listenerIds = _listeners.SelectMany(l => l.ListenerIdentifiers).ToList();
|
||||||
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
|
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
|
||||||
UnhandledMessage?.Invoke(_accessor);
|
UnhandledMessage?.Invoke(accessor);
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
@ -512,7 +518,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
foreach (var processor in processors)
|
foreach (var processor in processors)
|
||||||
{
|
{
|
||||||
// 5. Determine the type to deserialize to for this processor
|
// 5. Determine the type to deserialize to for this processor
|
||||||
var messageType = processor.GetMessageType(_accessor);
|
var messageType = processor.GetMessageType(accessor);
|
||||||
if (messageType == null)
|
if (messageType == null)
|
||||||
{
|
{
|
||||||
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
|
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
|
||||||
@ -532,7 +538,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
if (deserialized == null)
|
if (deserialized == null)
|
||||||
{
|
{
|
||||||
var desResult = processor.Deserialize(_accessor, messageType);
|
var desResult = processor.Deserialize(accessor, messageType);
|
||||||
if (!desResult)
|
if (!desResult)
|
||||||
{
|
{
|
||||||
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
|
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
|
||||||
@ -563,7 +569,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
_accessor.Clear();
|
accessor.Clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -825,8 +831,55 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <param name="weight">The weight of the message</param>
|
/// <param name="weight">The weight of the message</param>
|
||||||
public virtual CallResult Send<T>(int requestId, T obj, int weight)
|
public virtual CallResult Send<T>(int requestId, T obj, int weight)
|
||||||
{
|
{
|
||||||
var data = obj is string str ? str : _serializer.Serialize(obj!);
|
if (_serializer is IByteMessageSerializer byteSerializer)
|
||||||
return Send(requestId, data, weight);
|
{
|
||||||
|
return SendBytes(requestId, byteSerializer.Serialize(obj), weight);
|
||||||
|
}
|
||||||
|
else if (_serializer is IStringMessageSerializer stringSerializer)
|
||||||
|
{
|
||||||
|
if (obj is string str)
|
||||||
|
return Send(requestId, str, weight);
|
||||||
|
|
||||||
|
str = stringSerializer.Serialize(obj);
|
||||||
|
return Send(requestId, str, weight);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new Exception("");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Send byte data over the websocket connection
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="data">The data to send</param>
|
||||||
|
/// <param name="weight">The weight of the message</param>
|
||||||
|
/// <param name="requestId">The id of the request</param>
|
||||||
|
public virtual CallResult SendBytes(int requestId, byte[] data, int weight)
|
||||||
|
{
|
||||||
|
if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
|
||||||
|
{
|
||||||
|
var info = $"Message to send exceeds the max server message size ({ApiClient.MessageSendSizeLimit.Value} bytes). Split the request into batches to keep below this limit";
|
||||||
|
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
|
||||||
|
return new CallResult(new InvalidOperationError(info));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_socket.IsOpen)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
|
||||||
|
return new CallResult(new WebError("Failed to send message, socket no longer open"));
|
||||||
|
}
|
||||||
|
|
||||||
|
//_logger.SendingData(SocketId, requestId, data);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!_socket.Send(requestId, data, weight))
|
||||||
|
return new CallResult(new WebError("Failed to send message, connection not open"));
|
||||||
|
|
||||||
|
return CallResult.SuccessResult;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new CallResult(new WebError("Failed to send message: " + ex.Message, exception: ex));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
@ -67,6 +67,17 @@ namespace CryptoExchange.Net.Testing.Implementations
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public bool Send(int requestId, byte[] data, int weight)
|
||||||
|
{
|
||||||
|
if (!Connected)
|
||||||
|
throw new Exception("Socket not connected");
|
||||||
|
|
||||||
|
OnRequestSent?.Invoke(requestId);
|
||||||
|
OnMessageSend?.Invoke(Encoding.UTF8.GetString(data));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task CloseAsync()
|
public Task CloseAsync()
|
||||||
{
|
{
|
||||||
Connected = false;
|
Connected = false;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user