1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 09:51:50 +00:00
This commit is contained in:
JKorf 2025-11-17 21:37:59 +01:00 committed by Jkorf
parent b12e69d13a
commit 3fd69d6e1a
29 changed files with 158 additions and 170 deletions

View File

@ -1,7 +1,4 @@
using System;
using System.IO;
using CryptoExchange.Net.Converters.SystemTextJson;
using CryptoExchange.Net.Converters.MessageParsing;
namespace CryptoExchange.Net.Authentication
{

View File

@ -389,6 +389,7 @@ namespace CryptoExchange.Net.Clients
var uri = new Uri(baseAddress.AppendPath(definition.Path) + queryString);
var request = RequestFactory.Create(ClientOptions.HttpVersion, definition.Method, uri, requestId);
#warning Should be configurable
request.Accept = Constants.JsonContentHeader;
foreach (var header in requestConfiguration.Headers)
@ -482,11 +483,11 @@ namespace CryptoExchange.Net.Clients
if (!valid)
{
// Invalid json
// Invalid data
return new WebCallResult<T>(response.StatusCode, response.HttpVersion, response.ResponseHeaders, sw.Elapsed, response.ContentLength, OutputOriginalData ? accessor.GetOriginalString() : null, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, valid.Error);
}
// Json response received
// Data response received
var parsedError = TryParseError(requestDefinition, response.ResponseHeaders, accessor);
if (parsedError != null)
{

View File

@ -16,7 +16,6 @@ using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@ -80,11 +79,7 @@ namespace CryptoExchange.Net.Clients
/// Periodic task registrations
/// </summary>
protected List<PeriodicTaskRegistration> PeriodicTaskRegistrations { get; set; } = new List<PeriodicTaskRegistration>();
/// <summary>
/// Periodic task registrations
/// </summary>
protected List<HighPerfPeriodicTaskRegistration> HighPerfPeriodicTaskRegistrations { get; set; } = new List<HighPerfPeriodicTaskRegistration>();
/// <summary>
/// List of address to keep an alive connection to
/// </summary>
@ -190,13 +185,6 @@ namespace CryptoExchange.Net.Clients
Interval = interval,
QueryDelegate = queryDelegate
});
HighPerfPeriodicTaskRegistrations.Add(new HighPerfPeriodicTaskRegistration
{
Identifier = identifier,
Interval = interval,
GetRequestDelegate = (con) => queryDelegate(con).Request
});
}
/// <summary>
@ -344,6 +332,7 @@ namespace CryptoExchange.Net.Clients
/// </summary>
/// <param name="url">The URL to connect to</param>
/// <param name="subscription">The subscription</param>
/// <param name="connectionFactory">The factory for creating a socket connection</param>
/// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns>
protected virtual async Task<CallResult<HighPerfUpdateSubscription>> SubscribeHighPerfAsync<TUpdateType>(
@ -723,6 +712,7 @@ namespace CryptoExchange.Net.Clients
/// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one.
/// </summary>
/// <param name="address">The address the socket is for</param>
/// <param name="connectionFactory">The factory for creating a socket connection</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected virtual async Task<CallResult<HighPerfSocketConnection<TUpdateType>>> GetHighPerfSocketConnection<TUpdateType>(
@ -763,9 +753,8 @@ namespace CryptoExchange.Net.Clients
// Create new socket connection
var socketConnection = connectionFactory.CreateHighPerfConnection<TUpdateType>(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, address);
//var socketConnection = new HighPerfJsonSocketConnection<TUpdateType>(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, JsonSerializerOptions, address);
foreach (var ptg in HighPerfPeriodicTaskRegistrations)
socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.GetRequestDelegate);
foreach (var ptg in PeriodicTaskRegistrations)
socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, (con) => ptg.QueryDelegate(con).Request);
return new CallResult<HighPerfSocketConnection<TUpdateType>>(socketConnection);
}
@ -831,7 +820,7 @@ namespace CryptoExchange.Net.Clients
Proxy = ClientOptions.Proxy,
Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout,
ReceiveBufferSize = ClientOptions.ReceiveBufferSize,
UseNewMessageDeserialization = ClientOptions.EnabledNewDeserialization
UseUpdatedDeserialization = ClientOptions.UseUpdatedDeserialization
};
///// <summary>

View File

@ -1,12 +1,19 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Converters.MessageParsing
{
/// <summary>
/// Message info
/// </summary>
public ref struct MessageInfo
{
public Type? Type { get; set; }
/// <summary>
/// The deserialization type
/// </summary>
public Type? DeserializationType { get; set; }
/// <summary>
/// The listen identifier
/// </summary>
public string? Identifier { get; set; }
}

View File

@ -2,15 +2,11 @@
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.SharedApis;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@ -435,46 +431,13 @@ namespace CryptoExchange.Net
processor.Exception += result.Data._subscription.InvokeExceptionHandler;
result.Data.SubscriptionStatusChanged += (upd) =>
{
if (upd == CryptoExchange.Net.Objects.SubscriptionStatus.Closed)
if (upd == SubscriptionStatus.Closed)
_ = processor.StopAsync(true);
};
return result;
}
// public static async Task SubscribeHighPerformance<T>(ILogger logger, string url, JsonSerializerOptions jsonOptions, Action<T> callback, CancellationToken ct)
// {
// var pipe = new Pipe(new PipeOptions());
// var ws = new HighPerformanceWebSocketClient(
// logger,
// new WebSocketParameters(new Uri(url), ReconnectPolicy.Disabled)
// {
// PipeWriter = pipe.Writer
// });
// try
// {
// await ws.ConnectAsync(ct).ConfigureAwait(false);
// ct.Register(() => _ = ws.CloseAsync());
//#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
//#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
//#if NET10_0
// await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<T>(pipe.Reader, jsonOptions, ct).ConfigureAwait(false))
// callback(item!);
//#else
// await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<T>(pipe.Reader.AsStream(), jsonOptions, ct).ConfigureAwait(false))
// callback(item!);
//#endif
//#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
//#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
// }
// catch (OperationCanceledException) { }
// }
/// <summary>
/// Parse a decimal value from a string
/// </summary>

View File

@ -9,10 +9,6 @@ using System.IO.Compression;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Metadata;
using System.Threading.Tasks;
using System.Web;
namespace CryptoExchange.Net

View File

@ -20,7 +20,9 @@ namespace CryptoExchange.Net.Interfaces
/// The matcher for this listener
/// </summary>
public MessageMatcher MessageMatcher { get; }
/// <summary>
/// The types the message processor deserializes to
/// </summary>
public HashSet<Type> DeserializationTypes { get; set; }
/// <summary>
/// Handle a message

View File

@ -14,6 +14,7 @@ namespace CryptoExchange.Net.Interfaces
/// Create a websocket for an url
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="connection">The socket connection</param>
/// <param name="parameters">The parameters to use for the connection</param>
/// <returns></returns>
IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters);

View File

@ -8,7 +8,7 @@ namespace CryptoExchange.Net.Objects.Options
public class ApiOptions
{
/// <summary>
/// If true, the CallResult and DataEvent objects will also include the originally received json data in the OriginalData property
/// If true, the CallResult and DataEvent objects will also include the originally received data in the OriginalData property
/// </summary>
public bool? OutputOriginalData { get; set; }

View File

@ -14,7 +14,7 @@ namespace CryptoExchange.Net.Objects.Options
public ApiProxy? Proxy { get; set; }
/// <summary>
/// If true, the CallResult and DataEvent objects will also include the originally received json data in the OriginalData property
/// If true, the CallResult and DataEvent objects will also include the originally received data in the OriginalData property
/// </summary>
public bool OutputOriginalData { get; set; } = false;

View File

@ -61,7 +61,10 @@ namespace CryptoExchange.Net.Objects.Options
/// </remarks>
public int? ReceiveBufferSize { get; set; }
public bool EnabledNewDeserialization { get; set; }
/// <summary>
/// Whether or not to use the updated deserialization logic
/// </summary>
public bool UseUpdatedDeserialization { get; set; }
/// <summary>
/// Create a copy of this options
@ -84,7 +87,7 @@ namespace CryptoExchange.Net.Objects.Options
item.RateLimitingBehaviour = RateLimitingBehaviour;
item.RateLimiterEnabled = RateLimiterEnabled;
item.ReceiveBufferSize = ReceiveBufferSize;
item.EnabledNewDeserialization = EnabledNewDeserialization;
item.UseUpdatedDeserialization = UseUpdatedDeserialization;
return item;
}
}

View File

@ -74,8 +74,11 @@ namespace CryptoExchange.Net.Objects.Sockets
/// The buffer size to use for receiving data
/// </summary>
public int? ReceiveBufferSize { get; set; } = null;
public bool UseNewMessageDeserialization { get; set; }
/// <summary>
/// Whether or not to use the updated deserialization logic
/// </summary>
public bool UseUpdatedDeserialization { get; set; }
/// <summary>
/// ctor

View File

@ -1,7 +1,4 @@
using CryptoExchange.Net.Converters.SystemTextJson;
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json.Serialization;
namespace CryptoExchange.Net.SharedApis

View File

@ -1,7 +1,5 @@
using CryptoExchange.Net.Converters.SystemTextJson;
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Text.Json.Serialization;
namespace CryptoExchange.Net.SharedApis

View File

@ -152,6 +152,7 @@ namespace CryptoExchange.Net.Sockets
/// ctor
/// </summary>
/// <param name="logger">The log object to use</param>
/// <param name="connection">The socket connection</param>
/// <param name="websocketParameters">The parameters for this socket</param>
public CryptoExchangeWebSocketClient(ILogger logger, SocketConnection connection, WebSocketParameters websocketParameters)
{
@ -164,7 +165,7 @@ namespace CryptoExchange.Net.Sockets
_sendEvent = new AsyncResetEvent();
_sendBuffer = new ConcurrentQueue<SendItem>();
_ctsSource = new CancellationTokenSource();
if (websocketParameters.UseNewMessageDeserialization)
if (websocketParameters.UseUpdatedDeserialization)
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? 65536;
else
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize;
@ -689,7 +690,7 @@ namespace CryptoExchange.Net.Sockets
{
// Received a complete message and it's not multi part
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
if (!Parameters.UseNewMessageDeserialization)
if (!Parameters.UseUpdatedDeserialization)
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
else
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(buffer.Array!, buffer.Offset, receiveResult.Count));
@ -728,7 +729,7 @@ namespace CryptoExchange.Net.Sockets
_logger.SocketReassembledMessage(Id, multipartStream!.Length);
// Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
if (!Parameters.UseNewMessageDeserialization)
if (!Parameters.UseUpdatedDeserialization)
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false);
else
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length));

View File

@ -0,0 +1,62 @@
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets.HighPerf
{
/// <summary>
/// A single socket connection focused on performance expecting JSON data
/// </summary>
/// <typeparam name="T">The type of updates this connection produces</typeparam>
public class HighPerfJsonSocketConnection<T> : HighPerfSocketConnection<T>
{
private JsonSerializerOptions _jsonOptions;
/// <summary>
/// ctor
/// </summary>
public HighPerfJsonSocketConnection(
ILogger logger,
IWebsocketFactory socketFactory,
WebSocketParameters parameters,
SocketApiClient apiClient,
JsonSerializerOptions serializerOptions,
string tag)
: base(logger, socketFactory, parameters, apiClient, tag)
{
_jsonOptions = serializerOptions;
}
/// <inheritdoc />
protected override async Task ProcessAsync(CancellationToken ct)
{
try
{
#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, true, _jsonOptions, ct).ConfigureAwait(false))
#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
{
if (_typedSubscriptions.Count == 1)
{
// If there is only one listener we can prevent the overhead of the await which will call a `ToList`
await DelegateToSubscription(_typedSubscriptions[0], update!).ConfigureAwait(false);
continue;
}
var tasks = _typedSubscriptions.Select(sub => DelegateToSubscription(sub, update!));
await LibraryHelpers.WhenAll(tasks).ConfigureAwait(false);
}
}
catch (OperationCanceledException) { }
}
}
}

View File

@ -1,25 +1,25 @@
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Text.Json;
namespace CryptoExchange.Net.Sockets.HighPerf
{
public class HighPerfJsonConnectionFactory : IHighPerfConnectionFactory
/// <inheritdoc />
public class HighPerfJsonSocketConnectionFactory : IHighPerfConnectionFactory
{
private readonly JsonSerializerOptions _options;
public HighPerfJsonConnectionFactory(JsonSerializerOptions options)
/// <summary>
/// ctor
/// </summary>
public HighPerfJsonSocketConnectionFactory(JsonSerializerOptions options)
{
_options = options;
}
/// <inheritdoc />
public HighPerfSocketConnection<T> CreateHighPerfConnection<T>(
ILogger logger, IWebsocketFactory factory, WebSocketParameters parameters, SocketApiClient client, string address)
{

View File

@ -11,12 +11,11 @@ using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Logging.Extensions;
using System.Threading;
using System.IO.Pipelines;
using System.Text.Json;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// A single socket connection to the server
/// A single socket connection focused on performance
/// </summary>
public abstract class HighPerfSocketConnection : ISocketConnection
{
@ -418,12 +417,18 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public abstract class HighPerfSocketConnection<T> : HighPerfSocketConnection
{
/// <summary>
/// Lock for listener access
/// </summary>
#if NET9_0_OR_GREATER
protected readonly Lock _listenersLock = new Lock();
#else
protected readonly object _listenersLock = new object();
#endif
/// <summary>
/// Subscriptions
/// </summary>
protected readonly List<HighPerfSubscription<T>> _typedSubscriptions;
/// <inheritdoc />
@ -472,65 +477,22 @@ namespace CryptoExchange.Net.Sockets
_typedSubscriptions.Remove(subscription);
}
protected ValueTask DelegateToSubscription(HighPerfSubscription<T> sub, T update)
/// <summary>
/// Delegate the update to the listeners
/// </summary>
protected ValueTask DelegateToSubscription(HighPerfSubscription<T> subscription, T update)
{
try
{
return sub.HandleAsync(update!);
return subscription.HandleAsync(update!);
}
catch (Exception ex)
{
sub.InvokeExceptionHandler(ex);
subscription.InvokeExceptionHandler(ex);
_logger.UserMessageProcessingFailed(SocketId, ex.Message, ex);
return new ValueTask();
}
}
}
public class HighPerfJsonSocketConnection<T> : HighPerfSocketConnection<T>
{
private JsonSerializerOptions _jsonOptions;
/// <summary>
/// ctor
/// </summary>
public HighPerfJsonSocketConnection(
ILogger logger,
IWebsocketFactory socketFactory,
WebSocketParameters parameters,
SocketApiClient apiClient,
JsonSerializerOptions serializerOptions,
string tag)
: base(logger, socketFactory, parameters, apiClient, tag)
{
_jsonOptions = serializerOptions;
}
/// <inheritdoc />
protected override async Task ProcessAsync(CancellationToken ct)
{
try
{
#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, true, _jsonOptions, ct).ConfigureAwait(false))
#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
{
if (_typedSubscriptions.Count == 1)
{
// If there is only one listener we can prevent the overhead of the await which will call a `ToList`
await DelegateToSubscription(_typedSubscriptions[0], update!).ConfigureAwait(false);
continue;
}
var tasks = _typedSubscriptions.Select(sub => DelegateToSubscription(sub, update!));
await LibraryHelpers.WhenAll(tasks).ConfigureAwait(false);
}
}
catch (OperationCanceledException) { }
}
}
}

View File

@ -2,14 +2,17 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Sockets.HighPerf
{
/// <summary>
/// Factory for creating connections
/// </summary>
public interface IHighPerfConnectionFactory
{
/// <summary>
/// Create a new websocket connection
/// </summary>
HighPerfSocketConnection<T> CreateHighPerfConnection<T>(
ILogger logger, IWebsocketFactory factory, WebSocketParameters parameters, SocketApiClient client, string address);
}

View File

@ -19,7 +19,9 @@ namespace CryptoExchange.Net.Sockets
/// Whether the connection has been authenticated
/// </summary>
bool Authenticated { get; set; }
/// <summary>
/// Is there a subscription which requires authentication on this connection
/// </summary>
bool HasAuthenticatedSubscription { get; }
/// <summary>
/// Whether the connection is established

View File

@ -112,7 +112,6 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Deserialization type
/// </summary>
public abstract Type GetDeserializationType(IMessageAccessor accessor);
public abstract Type DeserializationType { get; }
/// <summary>
@ -151,10 +150,8 @@ namespace CryptoExchange.Net.Sockets
{
private Func<SocketConnection, DataEvent<TServer>, CallResult> _handler;
public override Type DeserializationType => typeof(TServer);
/// <inheritdoc />
public override Type GetDeserializationType(IMessageAccessor accessor) => typeof(TServer);
public override Type DeserializationType => typeof(TServer);
/// <summary>
/// ctor

View File

@ -19,7 +19,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Delegate for getting the query
/// </summary>
public Func<SocketConnection, Query> QueryDelegate { get; set; } = null!;
public Func<ISocketConnection, Query> QueryDelegate { get; set; } = null!;
/// <summary>
/// Callback after query
/// </summary>

View File

@ -59,7 +59,7 @@ namespace CryptoExchange.Net.Sockets
/// Response
/// </summary>
public object? Response { get; set; }
#warning check if there is a better solution for this in combination with the MessageMatcher
public HashSet<Type> DeserializationTypes { get; set; }
private MessageMatcher _matcher;
@ -106,12 +106,14 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
protected CancellationTokenSource? _cts;
/// <summary>
/// On complete callback
/// </summary>
public Action? OnComplete { get; set; }
/// <summary>
/// ctor
/// </summary>
/// <param name="request"></param>
/// <param name="authenticated"></param>
/// <param name="weight"></param>
public Query(object request, bool authenticated, int weight = 1)
{
_event = new AsyncResetEvent(false, false);
@ -168,7 +170,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public abstract CallResult Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check);
public Action OnComplete { get; set; }
}
/// <summary>

View File

@ -9,11 +9,8 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@ -151,6 +148,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public bool Authenticated { get; set; }
/// <inheritdoc />
public bool HasAuthenticatedSubscription => Subscriptions.Any(x => x.Authenticated);
/// <summary>
@ -536,7 +534,7 @@ namespace CryptoExchange.Net.Sockets
List<IMessageProcessor>? processors = null;
var messageInfo = messageConverter.GetMessageInfo(data, type);
if (messageInfo.Type == null)
if (messageInfo.DeserializationType == null)
{
if (messageInfo.Identifier == null)
{
@ -556,11 +554,11 @@ namespace CryptoExchange.Net.Sockets
continue;
_logger.LogTrace("Message type determined based on identifier");
messageInfo.Type = handler.DeserializationType;
messageInfo.DeserializationType = handler.DeserializationType;
break;
}
if (messageInfo.Type == null)
if (messageInfo.DeserializationType == null)
{
// No handler found for identifier either, can't process
_logger.LogWarning("Failed to determine message type. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
@ -571,7 +569,7 @@ namespace CryptoExchange.Net.Sockets
object result;
try
{
result = messageConverter.Deserialize(data, messageInfo.Type!);
result = messageConverter.Deserialize(data, messageInfo.DeserializationType!);
}
catch(Exception ex)
{
@ -586,7 +584,7 @@ namespace CryptoExchange.Net.Sockets
return;
}
var targetType = messageInfo.Type!;
var targetType = messageInfo.DeserializationType!;
if (processors == null)
{
lock (_listenersLock)
@ -671,7 +669,7 @@ namespace CryptoExchange.Net.Sockets
_logger.ProcessorMatched(SocketId, listener.ToString(), listenId);
// 4. Determine the type to deserialize to for this processor
var messageType = listener.GetDeserializationType(accessor);
var messageType = listener.DeserializationType;
if (messageType == null)
{
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);

View File

@ -249,14 +249,14 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public override void HandleSubQueryResponse(object message)
=> HandleSubQueryResponse((TSubResponse)message);
public override void HandleSubQueryResponse(object? message)
=> HandleSubQueryResponse((TSubResponse?)message);
/// <summary>
/// Handle a subscription query response
/// </summary>
/// <param name="message"></param>
public virtual void HandleSubQueryResponse(TSubResponse message) { }
public virtual void HandleSubQueryResponse(TSubResponse? message) { }
/// <inheritdoc />
public override void HandleUnsubQueryResponse(object message)

View File

@ -5,7 +5,6 @@ using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Converters.SystemTextJson;

View File

@ -48,11 +48,12 @@ namespace CryptoExchange.Net.Testing.Implementations
private bool _newDeserialization;
public SocketConnection Connection { get; set; }
public SocketConnection? Connection { get; set; }
public TestSocket(bool newDeserialization, string address)
{
_newDeserialization = newDeserialization;
Uri = new Uri(address);
lock (lastIdLock)
{
@ -113,6 +114,9 @@ namespace CryptoExchange.Net.Testing.Implementations
}
else
{
if (Connection == null)
throw new ArgumentNullException(nameof(Connection));
Connection.HandleStreamMessage2(WebSocketMessageType.Text, Encoding.UTF8.GetBytes(data));
}
}

View File

@ -16,7 +16,9 @@ namespace CryptoExchange.Net.Testing.Implementations
_socket = socket;
}
public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter) => throw new NotImplementedException();
public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter)
=> throw new NotImplementedException();
public IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters)
{
_socket.Connection = connection;

View File

@ -63,7 +63,7 @@ namespace CryptoExchange.Net.Testing
internal static TestSocket ConfigureSocketClient<T>(T client, string address) where T : BaseSocketClient
{
var socket = new TestSocket(client.ClientOptions.EnabledNewDeserialization, address);
var socket = new TestSocket(client.ClientOptions.UseUpdatedDeserialization, address);
foreach (var apiClient in client.ApiClients.OfType<SocketApiClient>())
{
apiClient.SocketFactory = new TestWebsocketFactory(socket);