diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs
index 3225fbb..561b88a 100644
--- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs
+++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs
@@ -119,7 +119,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
public override string GetListenerIdentifier(IMessageAccessor message)
{
- if (!message.IsJson)
+ if (!message.IsValid)
{
return "topic";
}
diff --git a/CryptoExchange.Net/Authentication/AuthenticationProvider.cs b/CryptoExchange.Net/Authentication/AuthenticationProvider.cs
index ede3cf0..a092078 100644
--- a/CryptoExchange.Net/Authentication/AuthenticationProvider.cs
+++ b/CryptoExchange.Net/Authentication/AuthenticationProvider.cs
@@ -465,10 +465,13 @@ namespace CryptoExchange.Net.Authentication
///
protected static string GetSerializedBody(IMessageSerializer serializer, IDictionary parameters)
{
+ if (serializer is not IStringMessageSerializer stringSerializer)
+ throw new InvalidOperationException("Non-string message serializer can't get serialized request body");
+
if (parameters.Count == 1 && parameters.TryGetValue(Constants.BodyPlaceHolderKey, out object? value))
- return serializer.Serialize(value);
+ return stringSerializer.Serialize(value);
else
- return serializer.Serialize(parameters);
+ return stringSerializer.Serialize(parameters);
}
}
diff --git a/CryptoExchange.Net/Clients/RestApiClient.cs b/CryptoExchange.Net/Clients/RestApiClient.cs
index 0c1b199..9ba63af 100644
--- a/CryptoExchange.Net/Clients/RestApiClient.cs
+++ b/CryptoExchange.Net/Clients/RestApiClient.cs
@@ -16,6 +16,7 @@ using CryptoExchange.Net.RateLimiting;
using CryptoExchange.Net.RateLimiting.Interfaces;
using CryptoExchange.Net.Requests;
using Microsoft.Extensions.Logging;
+using ProtoBuf;
namespace CryptoExchange.Net.Clients
{
@@ -603,12 +604,16 @@ namespace CryptoExchange.Net.Clients
{
if (contentType == Constants.JsonContentHeader)
{
+ var serializer = CreateSerializer();
+ if (serializer is not IStringMessageSerializer stringSerializer)
+ throw new InvalidOperationException("Non-string message serializer can't get serialized request body");
+
// Write the parameters as json in the body
string stringData;
if (parameters.Count == 1 && parameters.TryGetValue(Constants.BodyPlaceHolderKey, out object? value))
- stringData = CreateSerializer().Serialize(value);
+ stringData = stringSerializer.Serialize(value);
else
- stringData = CreateSerializer().Serialize(parameters);
+ stringData = stringSerializer.Serialize(parameters);
request.SetContent(stringData, contentType);
}
else if (contentType == Constants.FormContentHeader)
diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs
index d605818..b1a9833 100644
--- a/CryptoExchange.Net/Clients/SocketApiClient.cs
+++ b/CryptoExchange.Net/Clients/SocketApiClient.cs
@@ -138,7 +138,7 @@ namespace CryptoExchange.Net.Clients
/// Create a message accessor instance
///
///
- protected internal abstract IByteMessageAccessor CreateAccessor();
+ protected internal abstract IByteMessageAccessor CreateAccessor(WebSocketMessageType messageType);
///
/// Create a serializer instance
diff --git a/CryptoExchange.Net/Converters/Protobuf/ProtobufMessageAccessor.cs b/CryptoExchange.Net/Converters/Protobuf/ProtobufMessageAccessor.cs
new file mode 100644
index 0000000..8dc2bc2
--- /dev/null
+++ b/CryptoExchange.Net/Converters/Protobuf/ProtobufMessageAccessor.cs
@@ -0,0 +1,308 @@
+using CryptoExchange.Net.Converters.MessageParsing;
+using CryptoExchange.Net.Interfaces;
+using CryptoExchange.Net.Objects;
+using ProtoBuf;
+using System;
+using System.IO;
+using System.Runtime.InteropServices.ComTypes;
+using System.Text;
+using System.Text.Json;
+using System.Threading.Tasks;
+
+namespace CryptoExchange.Net.Converters.Protobuf
+{
+ ///
+ /// System.Text.Json message accessor
+ ///
+ public abstract class ProtobufMessageAccessor : IMessageAccessor
+ {
+ protected T? _intermediateType;
+
+ ///
+ public bool IsValid { get; set; }
+
+ ///
+ public abstract bool OriginalDataAvailable { get; }
+
+ ///
+ public object? Underlying => throw new NotImplementedException();
+
+ ///
+ /// ctor
+ ///
+ public ProtobufMessageAccessor()
+ {
+ }
+
+ ///
+ public NodeType? GetNodeType()
+ {
+ throw new Exception("");
+ }
+
+ ///
+ public NodeType? GetNodeType(MessagePath path)
+ {
+ object value = _intermediateType;
+ foreach (var step in path)
+ {
+ if (step.Type == 0)
+ {
+ // array index
+ }
+ else if (step.Type == 1)
+ {
+ // property value
+ value = value.GetType().GetProperty(step.Property).GetValue(value);
+ }
+ else
+ {
+ // property name
+ }
+ }
+
+ var valueType = value.GetType();
+ if (valueType.IsArray)
+ return NodeType.Array;
+
+ if (IsSimple(valueType))
+ return NodeType.Value;
+
+ return NodeType.Object;
+ }
+
+ private static bool IsSimple(Type type)
+ {
+ if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>))
+ {
+ // nullable type, check if the nested type is simple.
+ return IsSimple(type.GetGenericArguments()[0]);
+ }
+ return type.IsPrimitive
+ || type.IsEnum
+ || type == typeof(string)
+ || type == typeof(decimal);
+ }
+
+
+ ///
+ public T? GetValue(MessagePath path)
+ {
+ object value = _intermediateType;
+ foreach(var step in path)
+ {
+ if (step.Type == 0)
+ {
+ // array index
+ }
+ else if (step.Type == 1)
+ {
+ // property value
+ value = value.GetType().GetProperty(step.Property)?.GetValue(value);
+ }
+ else
+ {
+ // property name
+ }
+ }
+
+ return (T?)value;
+ }
+
+ ///
+ public T?[]? GetValues(MessagePath path)
+ {
+ throw new Exception("");
+
+ }
+
+ ///
+ public abstract string GetOriginalString();
+
+ ///
+ public abstract void Clear();
+
+ public abstract CallResult
public interface IMessageSerializer
+ {
+ }
+
+ public interface IByteMessageSerializer: IMessageSerializer
+ {
+ ///
+ /// Serialize an object to a string
+ ///
+ ///
+ ///
+ byte[] Serialize(T message);
+ }
+
+
+ public interface IStringMessageSerializer: IMessageSerializer
{
///
/// Serialize an object to a string
diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs
index 1d4c2b9..bc360d0 100644
--- a/CryptoExchange.Net/Interfaces/IWebsocket.cs
+++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs
@@ -78,13 +78,20 @@ namespace CryptoExchange.Net.Interfaces
///
Task ConnectAsync(CancellationToken ct);
///
- /// Send data
+ /// Send string data
///
///
///
///
bool Send(int id, string data, int weight);
///
+ /// Send byte data
+ ///
+ ///
+ ///
+ ///
+ bool Send(int id, byte[] data, int weight);
+ ///
/// Reconnect the socket
///
///
diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
index 908a25a..f71004f 100644
--- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
+++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
@@ -377,7 +377,19 @@ namespace CryptoExchange.Net.Sockets
var bytes = Parameters.Encoding.GetBytes(data);
_logger.SocketAddingBytesToSendBuffer(Id, id, bytes);
- _sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes });
+ _sendBuffer.Enqueue(new SendItem { Id = id, Type = WebSocketMessageType.Text, Weight = weight, Bytes = bytes });
+ _sendEvent.Set();
+ return true;
+ }
+
+ ///
+ public virtual bool Send(int id, byte[] data, int weight)
+ {
+ if (_ctsSource.IsCancellationRequested || _processState != ProcessState.Processing)
+ return false;
+
+ _logger.SocketAddingBytesToSendBuffer(Id, id, data);
+ _sendBuffer.Enqueue(new SendItem { Id = id, Type = WebSocketMessageType.Binary, Weight = weight, Bytes = data });
_sendEvent.Set();
return true;
}
@@ -532,7 +544,7 @@ namespace CryptoExchange.Net.Sockets
try
{
- await _socket.SendAsync(new ArraySegment(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
+ await _socket.SendAsync(new ArraySegment(data.Bytes, 0, data.Bytes.Length), data.Type, true, _ctsSource.Token).ConfigureAwait(false);
await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
_logger.SocketSentBytes(Id, data.Id, data.Bytes.Length);
}
@@ -858,6 +870,11 @@ namespace CryptoExchange.Net.Sockets
///
public DateTime SendTime { get; set; }
+ ///
+ /// Message type
+ ///
+ public WebSocketMessageType Type { get; set; }
+
///
/// The bytes to send
///
diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs
index 72680f6..993cabd 100644
--- a/CryptoExchange.Net/Sockets/SocketConnection.cs
+++ b/CryptoExchange.Net/Sockets/SocketConnection.cs
@@ -211,7 +211,8 @@ namespace CryptoExchange.Net.Sockets
private SocketStatus _status;
private readonly IMessageSerializer _serializer;
- private readonly IByteMessageAccessor _accessor;
+ private IByteMessageAccessor _stringMessageAccessor;
+ private IByteMessageAccessor _byteMessageAccessor;
///
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
@@ -258,7 +259,6 @@ namespace CryptoExchange.Net.Sockets
_listeners = new List();
_serializer = apiClient.CreateSerializer();
- _accessor = apiClient.CreateAccessor();
}
///
@@ -459,25 +459,31 @@ namespace CryptoExchange.Net.Sockets
data = ApiClient.PreprocessStreamMessage(this, type, data);
// 2. Read data into accessor
- _accessor.Read(data);
+ IByteMessageAccessor accessor;
+ if (type == WebSocketMessageType.Binary)
+ accessor = _stringMessageAccessor ??= ApiClient.CreateAccessor(type);
+ else
+ accessor = _byteMessageAccessor ??= ApiClient.CreateAccessor(type);
+
+ accessor.Read(data);
try
{
bool outputOriginalData = ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData;
if (outputOriginalData)
{
- originalData = _accessor.GetOriginalString();
+ originalData = accessor.GetOriginalString();
_logger.ReceivedData(SocketId, originalData);
}
// 3. Determine the identifying properties of this message
- var listenId = ApiClient.GetListenerIdentifier(_accessor);
+ var listenId = ApiClient.GetListenerIdentifier(accessor);
if (listenId == null)
{
- originalData = outputOriginalData ? _accessor.GetOriginalString() : "[OutputOriginalData is false]";
+ originalData = outputOriginalData ? accessor.GetOriginalString() : "[OutputOriginalData is false]";
if (!ApiClient.UnhandledMessageExpected)
_logger.FailedToEvaluateMessage(SocketId, originalData);
- UnhandledMessage?.Invoke(_accessor);
+ UnhandledMessage?.Invoke(accessor);
return;
}
@@ -494,7 +500,7 @@ namespace CryptoExchange.Net.Sockets
lock (_listenersLock)
listenerIds = _listeners.SelectMany(l => l.ListenerIdentifiers).ToList();
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
- UnhandledMessage?.Invoke(_accessor);
+ UnhandledMessage?.Invoke(accessor);
}
return;
@@ -512,7 +518,7 @@ namespace CryptoExchange.Net.Sockets
foreach (var processor in processors)
{
// 5. Determine the type to deserialize to for this processor
- var messageType = processor.GetMessageType(_accessor);
+ var messageType = processor.GetMessageType(accessor);
if (messageType == null)
{
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
@@ -532,7 +538,7 @@ namespace CryptoExchange.Net.Sockets
if (deserialized == null)
{
- var desResult = processor.Deserialize(_accessor, messageType);
+ var desResult = processor.Deserialize(accessor, messageType);
if (!desResult)
{
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
@@ -563,7 +569,7 @@ namespace CryptoExchange.Net.Sockets
}
finally
{
- _accessor.Clear();
+ accessor.Clear();
}
}
@@ -825,8 +831,55 @@ namespace CryptoExchange.Net.Sockets
/// The weight of the message
public virtual CallResult Send(int requestId, T obj, int weight)
{
- var data = obj is string str ? str : _serializer.Serialize(obj!);
- return Send(requestId, data, weight);
+ if (_serializer is IByteMessageSerializer byteSerializer)
+ {
+ return SendBytes(requestId, byteSerializer.Serialize(obj), weight);
+ }
+ else if (_serializer is IStringMessageSerializer stringSerializer)
+ {
+ if (obj is string str)
+ return Send(requestId, str, weight);
+
+ str = stringSerializer.Serialize(obj);
+ return Send(requestId, str, weight);
+ }
+
+ throw new Exception("");
+ }
+
+ ///
+ /// Send byte data over the websocket connection
+ ///
+ /// The data to send
+ /// The weight of the message
+ /// The id of the request
+ public virtual CallResult SendBytes(int requestId, byte[] data, int weight)
+ {
+ if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
+ {
+ var info = $"Message to send exceeds the max server message size ({ApiClient.MessageSendSizeLimit.Value} bytes). Split the request into batches to keep below this limit";
+ _logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
+ return new CallResult(new InvalidOperationError(info));
+ }
+
+ if (!_socket.IsOpen)
+ {
+ _logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
+ return new CallResult(new WebError("Failed to send message, socket no longer open"));
+ }
+
+ //_logger.SendingData(SocketId, requestId, data);
+ try
+ {
+ if (!_socket.Send(requestId, data, weight))
+ return new CallResult(new WebError("Failed to send message, connection not open"));
+
+ return CallResult.SuccessResult;
+ }
+ catch (Exception ex)
+ {
+ return new CallResult(new WebError("Failed to send message: " + ex.Message, exception: ex));
+ }
}
///
diff --git a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs
index b2d2f3b..5df0dae 100644
--- a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs
+++ b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs
@@ -67,6 +67,17 @@ namespace CryptoExchange.Net.Testing.Implementations
return true;
}
+ public bool Send(int requestId, byte[] data, int weight)
+ {
+ if (!Connected)
+ throw new Exception("Socket not connected");
+
+ OnRequestSent?.Invoke(requestId);
+ OnMessageSend?.Invoke(Encoding.UTF8.GetString(data));
+ return true;
+ }
+
+
public Task CloseAsync()
{
Connected = false;