1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-07-23 09:55:48 +00:00

Compare commits

..

12 Commits

23 changed files with 457 additions and 356 deletions

View File

@ -1,16 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0;net9.0</TargetFrameworks>
</PropertyGroup>
<PropertyGroup>
<PackageId>CryptoExchange.Net.Protobuf</PackageId>
<Authors>JKorf</Authors>
<Description>Protobuf support for CryptoExchange.Net</Description>
<PackageVersion>9.1.0</PackageVersion>
<AssemblyVersion>9.1.0</AssemblyVersion>
<FileVersion>9.1.0</FileVersion>
<PackageVersion>9.3.0</PackageVersion>
<AssemblyVersion>9.3.0</AssemblyVersion>
<FileVersion>9.3.0</FileVersion>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<PackageTags>CryptoExchange;CryptoExchange.Net</PackageTags>
<RepositoryType>git</RepositoryType>
@ -43,11 +41,7 @@
<DocumentationFile>CryptoExchange.Net.Protobuf.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="CryptoExchange.Net" Version="9.3.0" />
<PackageReference Include="protobuf-net" Version="3.2.52" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CryptoExchange.Net\CryptoExchange.Net.csproj" />
</ItemGroup>
</Project>

View File

@ -53,7 +53,6 @@
</member>
<member name="M:CryptoExchange.Net.Converters.Protobuf.ProtobufMessageAccessor`1.Deserialize(System.Type,System.Nullable{CryptoExchange.Net.Converters.MessageParsing.MessagePath})">
<inheritdoc />
</member>
<member name="M:CryptoExchange.Net.Converters.Protobuf.ProtobufMessageAccessor`1.Deserialize``1(System.Nullable{CryptoExchange.Net.Converters.MessageParsing.MessagePath})">
<inheritdoc />

View File

@ -1,7 +1,12 @@
# ![.CryptoExchange.Net](https://github.com/JKorf/CryptoExchange.Net/blob/ffcb7db8ff597c2f14982d68464015a748815580/CryptoExchange.Net/Icon/icon.png) CryptoExchange.Net.Proto
[![.NET](https://img.shields.io/github/actions/workflow/status/JKorf/CryptoExchange.Net.Protobuf/dotnet.yml?style=for-the-badge)](https://github.com/JKorf/CryptoExchange.NetProtobuf/actions/workflows/dotnet.yml) [![Nuget downloads](https://img.shields.io/nuget/dt/CryptoExchange.NetProtobuf.svg?style=for-the-badge)](https://www.nuget.org/packages/CryptoExchange.NetProtobuf) ![License](https://img.shields.io/github/license/JKorf/CryptoExchange.Net?style=for-the-badge)
[![.NET](https://img.shields.io/github/actions/workflow/status/JKorf/CryptoExchange.Net/dotnet.yml?style=for-the-badge)](https://github.com/JKorf/CryptoExchange.Net/actions/workflows/dotnet.yml) [![Nuget downloads](https://img.shields.io/nuget/dt/CryptoExchange.Net.Protobuf.svg?style=for-the-badge)](https://www.nuget.org/packages/CryptoExchange.Net.Protobuf) ![License](https://img.shields.io/github/license/JKorf/CryptoExchange.Net?style=for-the-badge)
Protobuf support for CryptoExchange.Net.
## Release notes
* Version 9.3.0 - 23 Jul 2025
* Updated CryptoExchange.Net to version 9.3.0, see https://github.com/JKorf/CryptoExchange.Net/releases/
* Version 9.2.0 - 14 Jul 2025
* Initial release

View File

@ -224,13 +224,17 @@ namespace CryptoExchange.Net.UnitTests
[TestCase(null, null)]
[TestCase("", null)]
[TestCase("null", null)]
[TestCase("nan", null)]
[TestCase("1E+2", 100)]
[TestCase("1E-2", 0.01)]
[TestCase("80228162514264337593543950335", -999)] // -999 is workaround for not being able to specify decimal.MaxValue
[TestCase("Infinity", 999)] // 999 is workaround for not being able to specify decimal.MinValue
[TestCase("-Infinity", -999)] // -999 is workaround for not being able to specify decimal.MaxValue
[TestCase("80228162514264337593543950335", 999)] // 999 is workaround for not being able to specify decimal.MaxValue
[TestCase("-80228162514264337593543950335", -999)] // -999 is workaround for not being able to specify decimal.MaxValue
public void TestDecimalConverterString(string value, decimal? expected)
{
var result = JsonSerializer.Deserialize<STJDecimalObject>("{ \"test\": \""+ value + "\"}");
Assert.That(result.Test, Is.EqualTo(expected == -999 ? decimal.MaxValue : expected));
Assert.That(result.Test, Is.EqualTo(expected == -999 ? decimal.MinValue : expected == 999 ? decimal.MaxValue: expected));
}
[TestCase("1", 1)]

View File

@ -31,21 +31,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
internal class TestChannelQuery : Query<SubResponse>
{
public override HashSet<string> ListenerIdentifiers { get; set; }
public TestChannelQuery(string channel, string request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
{
ListenerIdentifiers = new HashSet<string> { request + "-" + channel };
MessageMatcher = MessageMatcher.Create<SubResponse>(request + "-" + channel, HandleMessage);
}
public override CallResult<SubResponse> HandleMessage(SocketConnection connection, DataEvent<SubResponse> message)
public CallResult<SubResponse> HandleMessage(SocketConnection connection, DataEvent<SubResponse> message)
{
if (!message.Data.Status.Equals("confirmed", StringComparison.OrdinalIgnoreCase))
{
return new CallResult<SubResponse>(new ServerError(message.Data.Status));
}
return base.HandleMessage(connection, message);
return message.ToCallResult();
}
}
}

View File

@ -9,11 +9,9 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
{
internal class TestQuery : Query<object>
{
public override HashSet<string> ListenerIdentifiers { get; set; }
public TestQuery(string identifier, object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
{
ListenerIdentifiers = new HashSet<string> { identifier };
MessageMatcher = MessageMatcher.Create<object>(identifier);
}
}
}

View File

@ -15,21 +15,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
{
private readonly Action<DataEvent<T>> _handler;
public override HashSet<string> ListenerIdentifiers { get; set; } = new HashSet<string> { "update-topic" };
public TestSubscription(ILogger logger, Action<DataEvent<T>> handler) : base(logger, false)
{
_handler = handler;
MessageMatcher = MessageMatcher.Create<T>("update-topic", DoHandleMessage);
}
public override CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message)
public CallResult DoHandleMessage(SocketConnection connection, DataEvent<T> message)
{
var data = (T)message.Data;
_handler.Invoke(message.As(data));
_handler.Invoke(message);
return new CallResult(null);
}
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
public override Query GetSubQuery(SocketConnection connection) => new TestQuery("sub", new object(), false, 1);
public override Query GetUnsubQuery() => new TestQuery("unsub", new object(), false, 1);
}

View File

@ -15,23 +15,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
private readonly Action<DataEvent<T>> _handler;
private readonly string _channel;
public override HashSet<string> ListenerIdentifiers { get; set; }
public TestSubscriptionWithResponseCheck(string channel, Action<DataEvent<T>> handler) : base(Mock.Of<ILogger>(), false)
{
ListenerIdentifiers = new HashSet<string>() { channel };
MessageMatcher = MessageMatcher.Create<T>(channel, DoHandleMessage);
_handler = handler;
_channel = channel;
}
public override CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message)
public CallResult DoHandleMessage(SocketConnection connection, DataEvent<T> message)
{
var data = (T)message.Data;
_handler.Invoke(message.As(data));
_handler.Invoke(message);
return new CallResult(null);
}
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
public override Query GetSubQuery(SocketConnection connection) => new TestChannelQuery(_channel, "subscribe", false, 1);
public override Query GetUnsubQuery() => new TestChannelQuery(_channel, "unsubscribe", false, 1);
}

View File

@ -15,6 +15,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConsoleClient", "Examples\C
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SharedClients", "Examples\SharedClients\SharedClients.csproj", "{988A87EF-EAEA-4313-A6CF-FA869813D5AB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CryptoExchange.Net.Protobuf", "CryptoExchange.Net.Protobuf\CryptoExchange.Net.Protobuf.csproj", "{CC6A807A-9183-6F41-8EF1-8A70172B0E83}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -41,6 +43,10 @@ Global
{988A87EF-EAEA-4313-A6CF-FA869813D5AB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{988A87EF-EAEA-4313-A6CF-FA869813D5AB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{988A87EF-EAEA-4313-A6CF-FA869813D5AB}.Release|Any CPU.Build.0 = Release|Any CPU
{CC6A807A-9183-6F41-8EF1-8A70172B0E83}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CC6A807A-9183-6F41-8EF1-8A70172B0E83}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CC6A807A-9183-6F41-8EF1-8A70172B0E83}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CC6A807A-9183-6F41-8EF1-8A70172B0E83}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View File

@ -82,6 +82,11 @@ namespace CryptoExchange.Net.Clients
/// </summary>
protected bool AllowTopicsOnTheSameConnection { get; set; } = true;
/// <summary>
/// Whether to continue processing and forward unparsable messages to handlers
/// </summary>
protected internal bool ProcessUnparsableMessages { get; set; } = false;
/// <inheritdoc />
public double IncomingKbps
{
@ -308,11 +313,10 @@ namespace CryptoExchange.Net.Clients
/// Send a query on a socket connection to the BaseAddress and wait for the response
/// </summary>
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
/// <param name="query">The query</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected virtual Task<CallResult<THandlerResponse>> QueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, CancellationToken ct = default)
protected virtual Task<CallResult<THandlerResponse>> QueryAsync<THandlerResponse>(Query<THandlerResponse> query, CancellationToken ct = default)
{
return QueryAsync(BaseAddress, query, ct);
}
@ -321,12 +325,11 @@ namespace CryptoExchange.Net.Clients
/// Send a query on a socket connection and wait for the response
/// </summary>
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
/// <param name="url">The url for the request</param>
/// <param name="query">The query</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected virtual async Task<CallResult<THandlerResponse>> QueryAsync<TServerResponse, THandlerResponse>(string url, Query<TServerResponse, THandlerResponse> query, CancellationToken ct = default)
protected virtual async Task<CallResult<THandlerResponse>> QueryAsync<THandlerResponse>(string url, Query<THandlerResponse> query, CancellationToken ct = default)
{
if (_disposing)
return new CallResult<THandlerResponse>(new InvalidOperationError("Client disposed, can't query"));
@ -811,7 +814,7 @@ namespace CryptoExchange.Net.Clients
sb.AppendLine($"\t\t\tId: {subState.Id}");
sb.AppendLine($"\t\t\tConfirmed: {subState.Confirmed}");
sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}");
sb.AppendLine($"\t\t\tIdentifiers: [{string.Join(",", subState.Identifiers)}]");
sb.AppendLine($"\t\t\tIdentifiers: [{subState.ListenMatcher.ToString()}]");
});
}
});

View File

@ -6,9 +6,9 @@
<PackageId>CryptoExchange.Net</PackageId>
<Authors>JKorf</Authors>
<Description>CryptoExchange.Net is a base library which is used to implement different cryptocurrency (exchange) API's. It provides a standardized way of implementing different API's, which results in a very similar experience for users of the API implementations.</Description>
<PackageVersion>9.2.0</PackageVersion>
<AssemblyVersion>9.2.0</AssemblyVersion>
<FileVersion>9.2.0</FileVersion>
<PackageVersion>9.3.0</PackageVersion>
<AssemblyVersion>9.3.0</AssemblyVersion>
<FileVersion>9.3.0</FileVersion>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<PackageTags>OKX;OKX.Net;Mexc;Mexc.Net;Kucoin;Kucoin.Net;Kraken;Kraken.Net;Huobi;Huobi.Net;CoinEx;CoinEx.Net;Bybit;Bybit.Net;Bitget;Bitget.Net;Bitfinex;Bitfinex.Net;Binance;Binance.Net;CryptoCurrency;CryptoCurrency Exchange</PackageTags>
<RepositoryType>git</RepositoryType>

View File

@ -348,22 +348,42 @@ namespace CryptoExchange.Net
/// </summary>
public static decimal? ParseDecimal(string? value)
{
if (string.IsNullOrEmpty(value) || string.Equals("null", value, StringComparison.OrdinalIgnoreCase))
// Value is null or empty is the most common case to return null so check before trying to parse
if (string.IsNullOrEmpty(value))
return null;
if (string.Equals("Infinity", value, StringComparison.Ordinal))
// Infinity returned by the server, default to max value
return decimal.MaxValue;
// Try parse, only fails for these reasons:
// 1. string is null or empty
// 2. value is larger or smaller than decimal max/min
// 3. unparsable format
if (decimal.TryParse(value, NumberStyles.Float, CultureInfo.InvariantCulture, out var decValue))
return decValue;
try
// Check for values which should be parsed to null
if (string.Equals("null", value, StringComparison.OrdinalIgnoreCase)
|| string.Equals("NaN", value, StringComparison.OrdinalIgnoreCase))
{
return decimal.Parse(value, NumberStyles.Float, CultureInfo.InvariantCulture);
return null;
}
catch (OverflowException)
// Infinity value should be parsed to min/max value
if (string.Equals("Infinity", value, StringComparison.OrdinalIgnoreCase))
return decimal.MaxValue;
else if(string.Equals("-Infinity", value, StringComparison.OrdinalIgnoreCase))
return decimal.MinValue;
if (value!.Length > 27 && decimal.TryParse(value.Substring(0, 27), out var overflowValue))
{
// Value doesn't fit decimal, default to max value
// Not a valid decimal value and more than 27 chars, from which the first part can be parsed correctly.
// assume overflow
if (overflowValue < 0)
return decimal.MinValue;
else
return decimal.MaxValue;
}
// Unknown decimal format, return null
return null;
}
}
}

View File

@ -17,22 +17,13 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
public int Id { get; }
/// <summary>
/// The identifiers for this processor
/// The matcher for this listener
/// </summary>
public HashSet<string> ListenerIdentifiers { get; }
public MessageMatcher MessageMatcher { get; }
/// <summary>
/// Handle a message
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message);
/// <summary>
/// Get the type the message should be deserialized to
/// </summary>
/// <param name="messageAccessor"></param>
/// <returns></returns>
Type? GetMessageType(IMessageAccessor messageAccessor);
Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink matchedHandler);
/// <summary>
/// Deserialize a message into object of type
/// </summary>

View File

@ -18,7 +18,7 @@ namespace CryptoExchange.Net.Logging.Extensions
private static readonly Action<ILogger, int, string, Exception?> _failedToParse;
private static readonly Action<ILogger, int, string, Exception?> _failedToEvaluateMessage;
private static readonly Action<ILogger, int, Exception?> _errorProcessingMessage;
private static readonly Action<ILogger, int, int, string, Exception?> _processorMatched;
private static readonly Action<ILogger, int, string, string, Exception?> _processorMatched;
private static readonly Action<ILogger, int, int, Exception?> _receivedMessageNotRecognized;
private static readonly Action<ILogger, int, string?, Exception?> _failedToDeserializeMessage;
private static readonly Action<ILogger, int, string, Exception?> _userMessageProcessingFailed;
@ -92,11 +92,6 @@ namespace CryptoExchange.Net.Logging.Extensions
new EventId(2009, "ErrorProcessingMessage"),
"[Sckt {SocketId}] error processing message");
_processorMatched = LoggerMessage.Define<int, int, string>(
LogLevel.Trace,
new EventId(2010, "ProcessorMatched"),
"[Sckt {SocketId}] {Count} processor(s) matched to message with listener identifier {ListenerId}");
_receivedMessageNotRecognized = LoggerMessage.Define<int, int>(
LogLevel.Warning,
new EventId(2011, "ReceivedMessageNotRecognized"),
@ -190,7 +185,7 @@ namespace CryptoExchange.Net.Logging.Extensions
_receivedMessageNotMatchedToAnyListener = LoggerMessage.Define<int, string, string>(
LogLevel.Warning,
new EventId(2029, "ReceivedMessageNotMatchedToAnyListener"),
"[Sckt {SocketId}] received message not matched to any listener. ListenId: {ListenId}, current listeners: {ListenIds}");
"[Sckt {SocketId}] received message not matched to any listener. ListenId: {ListenId}, current listeners: [{ListenIds}]");
_failedToParse = LoggerMessage.Define<int, string>(
LogLevel.Warning,
@ -201,6 +196,12 @@ namespace CryptoExchange.Net.Logging.Extensions
LogLevel.Trace,
new EventId(2031, "SendingByteData"),
"[Sckt {SocketId}] [Req {RequestId}] sending byte message of length: {Length}");
_processorMatched = LoggerMessage.Define<int, string, string>(
LogLevel.Trace,
new EventId(2032, "ProcessorMatched"),
"[Sckt {SocketId}] listener '{ListenId}' matched to message with listener identifier {ListenerId}");
}
public static void ActivityPaused(this ILogger logger, int socketId, bool paused)
@ -256,9 +257,9 @@ namespace CryptoExchange.Net.Logging.Extensions
{
_errorProcessingMessage(logger, socketId, e);
}
public static void ProcessorMatched(this ILogger logger, int socketId, int count, string listenerId)
public static void ProcessorMatched(this ILogger logger, int socketId, string listener, string listenerId)
{
_processorMatched(logger, socketId, count, listenerId, null);
_processorMatched(logger, socketId, listener, listenerId, null);
}
public static void ReceivedMessageNotRecognized(this ILogger logger, int socketId, int id)
{

View File

@ -0,0 +1,180 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Message link type
/// </summary>
public enum MessageLinkType
{
/// <summary>
/// Match when the listen id matches fully to the value
/// </summary>
Full,
/// <summary>
/// Match when the listen id starts with the value
/// </summary>
StartsWith
}
/// <summary>
/// Matches a message listen id to a specific listener
/// </summary>
public class MessageMatcher
{
/// <summary>
/// Linkers in this matcher
/// </summary>
public MessageHandlerLink[] HandlerLinks { get; }
/// <summary>
/// ctor
/// </summary>
private MessageMatcher(params MessageHandlerLink[] links)
{
HandlerLinks = links;
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(string value)
{
return new MessageMatcher(new MessageHandlerLink<T>(MessageLinkType.Full, value, (con, msg) => CallResult.SuccessResult));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(string value, Func<SocketConnection, DataEvent<T>, CallResult> handler)
{
return new MessageMatcher(new MessageHandlerLink<T>(MessageLinkType.Full, value, handler));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(IEnumerable<string> values, Func<SocketConnection, DataEvent<T>, CallResult> handler)
{
return new MessageMatcher(values.Select(x => new MessageHandlerLink<T>(MessageLinkType.Full, x, handler)).ToArray());
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(MessageLinkType type, string value, Func<SocketConnection, DataEvent<T>, CallResult> handler)
{
return new MessageMatcher(new MessageHandlerLink<T>(type, value, handler));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create(params MessageHandlerLink[] linkers)
{
return new MessageMatcher(linkers);
}
/// <summary>
/// Whether this matcher contains a specific link
/// </summary>
public bool ContainsCheck(MessageHandlerLink link) => HandlerLinks.Any(x => x.Type == link.Type && x.Value == link.Value);
/// <summary>
/// Get any handler links matching with the listen id
/// </summary>
public List<MessageHandlerLink> GetHandlerLinks(string listenId) => HandlerLinks.Where(x => x.Check(listenId)).ToList();
/// <inheritdoc />
public override string ToString() => string.Join(",", HandlerLinks.Select(x => x.ToString()));
}
/// <summary>
/// Message handler link
/// </summary>
public abstract class MessageHandlerLink
{
/// <summary>
/// Type of check
/// </summary>
public MessageLinkType Type { get; }
/// <summary>
/// String value of the check
/// </summary>
public string Value { get; }
/// <summary>
/// Deserialization type
/// </summary>
public abstract Type GetDeserializationType(IMessageAccessor accessor);
/// <summary>
/// ctor
/// </summary>
public MessageHandlerLink(MessageLinkType type, string value)
{
Type = type;
Value = value;
}
/// <summary>
/// Whether this listen id matches this link
/// </summary>
public bool Check(string listenId)
{
if (Type == MessageLinkType.Full)
return Value.Equals(listenId, StringComparison.Ordinal);
return listenId.StartsWith(Value, StringComparison.Ordinal);
}
/// <summary>
/// Message handler
/// </summary>
public abstract CallResult Handle(SocketConnection connection, DataEvent<object> message);
/// <inheritdoc />
public override string ToString() => $"{Type} match for \"{Value}\"";
}
/// <summary>
/// Message handler link
/// </summary>
public class MessageHandlerLink<TServer>: MessageHandlerLink
{
private Func<SocketConnection, DataEvent<TServer>, CallResult> _handler;
/// <inheritdoc />
public override Type GetDeserializationType(IMessageAccessor accessor) => typeof(TServer);
/// <summary>
/// ctor
/// </summary>
public MessageHandlerLink(string value, Func<SocketConnection, DataEvent<TServer>, CallResult> handler)
: this(MessageLinkType.Full, value, handler)
{
}
/// <summary>
/// ctor
/// </summary>
public MessageHandlerLink(MessageLinkType type, string value, Func<SocketConnection, DataEvent<TServer>, CallResult> handler)
: base(type, value)
{
_handler = handler;
}
/// <inheritdoc />
public override CallResult Handle(SocketConnection connection, DataEvent<object> message)
{
return _handler(connection, message.As((TServer)message.Data));
}
}
}

View File

@ -3,6 +3,7 @@ using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -60,9 +61,9 @@ namespace CryptoExchange.Net.Sockets
public AsyncResetEvent? ContinueAwaiter { get; set; }
/// <summary>
/// Strings to match this query to a received message
/// Matcher for this query
/// </summary>
public abstract HashSet<string> ListenerIdentifiers { get; set; }
public MessageMatcher MessageMatcher { get; set; } = null!;
/// <summary>
/// The query request object
@ -84,13 +85,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public bool ExpectsResponse { get; set; } = true;
/// <summary>
/// Get the type the message should be deserialized to
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Type? GetMessageType(IMessageAccessor message);
/// <summary>
/// Wait event for response
/// </summary>
@ -161,23 +155,16 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handle a response message
/// </summary>
/// <param name="message"></param>
/// <param name="connection"></param>
/// <returns></returns>
public abstract Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message);
public abstract Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check);
}
/// <summary>
/// Query
/// </summary>
/// <typeparam name="TServerResponse">The type returned from the server</typeparam>
/// <typeparam name="THandlerResponse">The type to be returned to the caller</typeparam>
public abstract class Query<TServerResponse, THandlerResponse> : Query
public abstract class Query<THandlerResponse> : Query
{
/// <inheritdoc />
public override Type? GetMessageType(IMessageAccessor message) => typeof(TServerResponse);
/// <summary>
/// The typed call result
/// </summary>
@ -194,10 +181,9 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message)
public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check)
{
var typedMessage = message.As((TServerResponse)message.Data);
if (!ValidateMessage(typedMessage))
if (!PreCheckMessage(message))
return CallResult.SuccessResult;
CurrentResponses++;
@ -209,7 +195,7 @@ namespace CryptoExchange.Net.Sockets
if (Result?.Success != false)
// If an error result is already set don't override that
Result = HandleMessage(connection, typedMessage);
Result = check.Handle(connection, message);
if (CurrentResponses == RequiredResponses)
{
@ -226,15 +212,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public virtual bool ValidateMessage(DataEvent<TServerResponse> message) => true;
/// <summary>
/// Handle the query response
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult<THandlerResponse> HandleMessage(SocketConnection connection, DataEvent<TServerResponse> message);
public virtual bool PreCheckMessage(DataEvent<object> message) => true;
/// <inheritdoc />
public override void Timeout()
@ -257,29 +235,4 @@ namespace CryptoExchange.Net.Sockets
_event.Set();
}
}
/// <summary>
/// Query
/// </summary>
/// <typeparam name="TResponse">Response object type</typeparam>
public abstract class Query<TResponse> : Query<TResponse, TResponse>
{
/// <summary>
/// ctor
/// </summary>
/// <param name="request"></param>
/// <param name="authenticated"></param>
/// <param name="weight"></param>
protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
{
}
/// <summary>
/// Handle the query response
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
public override CallResult<TResponse> HandleMessage(SocketConnection connection, DataEvent<TResponse> message) => message.ToCallResult();
}
}

View File

@ -11,8 +11,6 @@ using System.Diagnostics;
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Logging.Extensions;
using System.Threading;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Authentication;
namespace CryptoExchange.Net.Sockets
{
@ -229,6 +227,11 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
private readonly IWebsocket _socket;
/// <summary>
/// Cache for deserialization, only caches for a single message
/// </summary>
private readonly Dictionary<Type, object> _deserializationCache = new Dictionary<Type, object>();
/// <summary>
/// New socket connection
/// </summary>
@ -446,9 +449,6 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handle a message
/// </summary>
/// <param name="data"></param>
/// <param name="type"></param>
/// <returns></returns>
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory<byte> data)
{
var sw = Stopwatch.StartNew();
@ -475,7 +475,7 @@ namespace CryptoExchange.Net.Sockets
_logger.ReceivedData(SocketId, originalData);
}
if (!accessor.IsValid)
if (!accessor.IsValid && !ApiClient.ProcessUnparsableMessages)
{
_logger.FailedToParse(SocketId, result.Error!.Message);
return;
@ -485,7 +485,7 @@ namespace CryptoExchange.Net.Sockets
var listenId = ApiClient.GetListenerIdentifier(accessor);
if (listenId == null)
{
originalData = outputOriginalData ? accessor.GetOriginalString() : "[OutputOriginalData is false]";
originalData ??= "[OutputOriginalData is false]";
if (!ApiClient.UnhandledMessageExpected)
_logger.FailedToEvaluateMessage(SocketId, originalData);
@ -493,38 +493,22 @@ namespace CryptoExchange.Net.Sockets
return;
}
// 4. Get the listeners interested in this message
List<IMessageProcessor> processors;
lock (_listenersLock)
processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList();
if (processors.Count == 0)
{
if (!ApiClient.UnhandledMessageExpected)
{
List<string> listenerIds;
lock (_listenersLock)
listenerIds = _listeners.SelectMany(l => l.ListenerIdentifiers).ToList();
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
UnhandledMessage?.Invoke(accessor);
}
return;
}
_logger.ProcessorMatched(SocketId, processors.Count, listenId);
bool processed = false;
var totalUserTime = 0;
Dictionary<Type, object>? desCache = null;
if (processors.Count > 1)
{
// Only instantiate a cache if there are multiple processors
desCache = new Dictionary<Type, object>();
}
foreach (var processor in processors)
List<IMessageProcessor> localListeners;
lock(_listenersLock)
localListeners = _listeners.ToList();
foreach(var processor in localListeners)
{
// 5. Determine the type to deserialize to for this processor
var messageType = processor.GetMessageType(accessor);
foreach(var listener in processor.MessageMatcher.GetHandlerLinks(listenId))
{
processed = true;
_logger.ProcessorMatched(SocketId, listener.ToString(), listenId);
// 4. Determine the type to deserialize to for this processor
var messageType = listener.GetDeserializationType(accessor);
if (messageType == null)
{
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
@ -538,9 +522,8 @@ namespace CryptoExchange.Net.Sockets
// This doesn't trigger a waiting subscribe query, should probably also somehow set the wait event for that
}
// 6. Deserialize the message
object? deserialized = null;
desCache?.TryGetValue(messageType, out deserialized);
// 5. Deserialize the message
_deserializationCache.TryGetValue(messageType, out var deserialized);
if (deserialized == null)
{
@ -550,15 +533,16 @@ namespace CryptoExchange.Net.Sockets
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
continue;
}
deserialized = desResult.Data;
desCache?.Add(messageType, deserialized);
_deserializationCache.Add(messageType, deserialized);
}
// 7. Hand of the message to the subscription
// 6. Pass the message to the handler
try
{
var innerSw = Stopwatch.StartNew();
await processor.Handle(this, new DataEvent<object>(deserialized, null, null, originalData, receiveTime, null)).ConfigureAwait(false);
await processor.Handle(this, new DataEvent<object>(deserialized, null, null, originalData, receiveTime, null), listener).ConfigureAwait(false);
if (processor is Query query && query.RequiredResponses != 1)
_logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}");
totalUserTime += (int)innerSw.ElapsedMilliseconds;
@ -569,12 +553,30 @@ namespace CryptoExchange.Net.Sockets
if (processor is Subscription subscription)
subscription.InvokeExceptionHandler(ex);
}
}
}
if (!processed)
{
if (!ApiClient.UnhandledMessageExpected)
{
List<string> listenerIds;
lock (_listenersLock)
listenerIds = _listeners.Select(l => l.MessageMatcher.ToString()).ToList();
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
UnhandledMessage?.Invoke(accessor);
}
return;
}
_logger.MessageProcessed(SocketId, sw.ElapsedMilliseconds, sw.ElapsedMilliseconds - totalUserTime);
}
finally
{
_deserializationCache.Clear();
accessor.Clear();
}
}
@ -654,7 +656,7 @@ namespace CryptoExchange.Net.Sockets
bool anyDuplicateSubscription;
lock (_listenersLock)
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.ListenerIdentifiers.All(l => subscription.ListenerIdentifiers.Contains(l)));
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageMatcher.HandlerLinks.All(l => subscription.MessageMatcher.ContainsCheck(l)));
bool shouldCloseConnection;
lock (_listenersLock)
@ -770,12 +772,11 @@ namespace CryptoExchange.Net.Sockets
/// Send a query request and wait for an answer
/// </summary>
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
/// <param name="query">Query to send</param>
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default)
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<THandlerResponse>(Query<THandlerResponse> query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default)
{
await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false);
return query.TypedResult ?? new CallResult<THandlerResponse>(new ServerError("Timeout"));

View File

@ -4,6 +4,7 @@ using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -60,9 +61,9 @@ namespace CryptoExchange.Net.Sockets
public bool Authenticated { get; }
/// <summary>
/// Strings to match this subscription to a received message
/// Matcher for this subscription
/// </summary>
public abstract HashSet<string> ListenerIdentifiers { get; set; }
public MessageMatcher MessageMatcher { get; set; } = null!;
/// <summary>
/// Cancellation token registration
@ -74,13 +75,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public event Action<Exception>? Exception;
/// <summary>
/// Get the deserialization type for this message
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Type? GetMessageType(IMessageAccessor message);
/// <summary>
/// Subscription topic
/// </summary>
@ -89,9 +83,6 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// ctor
/// </summary>
/// <param name="logger"></param>
/// <param name="authenticated"></param>
/// <param name="userSubscription"></param>
public Subscription(ILogger logger, bool authenticated, bool userSubscription = true)
{
_logger = logger;
@ -130,14 +121,11 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handle an update message
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
public Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message)
public Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink matcher)
{
ConnectionInvocations++;
TotalInvocations++;
return Task.FromResult(DoHandleMessage(connection, message));
return Task.FromResult(matcher.Handle(connection, message));
}
/// <summary>
@ -154,14 +142,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public virtual void DoHandleReset() { }
/// <summary>
/// Handle the update message
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message);
/// <summary>
/// Invoke the exception event
/// </summary>
@ -177,12 +157,12 @@ namespace CryptoExchange.Net.Sockets
/// <param name="Id">The id of the subscription</param>
/// <param name="Confirmed">True when the subscription query is handled (either accepted or rejected)</param>
/// <param name="Invocations">Number of times this subscription got a message</param>
/// <param name="Identifiers">Identifiers the subscription is listening to</param>
/// <param name="ListenMatcher">Matcher for this subscription</param>
public record SubscriptionState(
int Id,
bool Confirmed,
int Invocations,
HashSet<string> Identifiers
MessageMatcher ListenMatcher
);
/// <summary>
@ -191,7 +171,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
public SubscriptionState GetState()
{
return new SubscriptionState(Id, Confirmed, TotalInvocations, ListenerIdentifiers);
return new SubscriptionState(Id, Confirmed, TotalInvocations, MessageMatcher);
}
}

View File

@ -27,32 +27,4 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public override Query? GetUnsubQuery() => null;
}
/// <inheritdoc />
public abstract class SystemSubscription<T> : SystemSubscription
{
/// <inheritdoc />
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
/// <inheritdoc />
public override CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message)
=> HandleMessage(connection, message.As((T)message.Data));
/// <summary>
/// ctor
/// </summary>
/// <param name="logger"></param>
/// <param name="authenticated"></param>
protected SystemSubscription(ILogger logger, bool authenticated) : base(logger, authenticated)
{
}
/// <summary>
/// Handle an update message
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult HandleMessage(SocketConnection connection, DataEvent<T> message);
}
}

View File

@ -386,7 +386,7 @@ namespace CryptoExchange.Net.Testing.Comparers
var stringValue = jsonValue.GetString();
if (objectValue is decimal dec)
{
if (decimal.Parse(stringValue!, CultureInfo.InvariantCulture) != dec)
if (ExchangeHelpers.ParseDecimal(stringValue!) != dec)
throw new Exception($"{method}: {property} not equal: {stringValue} vs {dec}");
}
else if (objectValue is DateTime time)

View File

@ -176,18 +176,32 @@ namespace CryptoExchange.Net.Trackers.Klines
Status = SyncStatus.Syncing;
_logger.KlineTrackerStarting(SymbolName);
var subResult = await _socketClient.SubscribeToKlineUpdatesAsync(new SubscribeKlineRequest(Symbol, _interval),
update =>
{
AddOrUpdate(update.Data);
}).ConfigureAwait(false);
if (!subResult)
{
_logger.KlineTrackerStartFailed(SymbolName, subResult.Error!.Message, subResult.Error.Exception);
Status = SyncStatus.Disconnected;
return subResult;
}
_updateSubscription = subResult.Data;
_updateSubscription.ConnectionLost += HandleConnectionLost;
_updateSubscription.ConnectionClosed += HandleConnectionClosed;
_updateSubscription.ConnectionRestored += HandleConnectionRestored;
var startResult = await DoStartAsync().ConfigureAwait(false);
if (!startResult)
{
_logger.KlineTrackerStartFailed(SymbolName, startResult.Error!.Message, startResult.Error.Exception);
_ = subResult.Data.CloseAsync();
Status = SyncStatus.Disconnected;
return new CallResult(startResult.Error!);
}
_updateSubscription = startResult.Data;
_updateSubscription.ConnectionLost += HandleConnectionLost;
_updateSubscription.ConnectionClosed += HandleConnectionClosed;
_updateSubscription.ConnectionRestored += HandleConnectionRestored;
Status = SyncStatus.Synced;
_logger.KlineTrackerStarted(SymbolName);
return CallResult.SuccessResult;
@ -208,22 +222,10 @@ namespace CryptoExchange.Net.Trackers.Klines
/// The start procedure needed for kline syncing, generally subscribing to an update stream and requesting the snapshot
/// </summary>
/// <returns></returns>
protected virtual async Task<CallResult<UpdateSubscription>> DoStartAsync()
protected virtual async Task<CallResult> DoStartAsync()
{
var subResult = await _socketClient.SubscribeToKlineUpdatesAsync(new SubscribeKlineRequest(Symbol, _interval),
update =>
{
AddOrUpdate(update.Data);
}).ConfigureAwait(false);
if (!subResult)
{
Status = SyncStatus.Disconnected;
return subResult;
}
if (!_startWithSnapshot)
return subResult;
return CallResult.SuccessResult;
var startTime = Period == null ? (DateTime?)null : DateTime.UtcNow.Add(-Period.Value);
if (_restClient.GetKlinesOptions.MaxAge != null && DateTime.UtcNow.Add(-_restClient.GetKlinesOptions.MaxAge.Value) > startTime)
@ -236,11 +238,7 @@ namespace CryptoExchange.Net.Trackers.Klines
await foreach (var result in ExchangeHelpers.ExecutePages(_restClient.GetKlinesAsync, request).ConfigureAwait(false))
{
if (!result)
{
_ = subResult.Data.CloseAsync();
Status = SyncStatus.Disconnected;
return subResult.AsError<UpdateSubscription>(result.Error!);
}
return result;
if (Limit != null && data.Count > Limit)
break;
@ -249,7 +247,7 @@ namespace CryptoExchange.Net.Trackers.Klines
}
SetInitialData(data);
return subResult;
return CallResult.SuccessResult;
}
/// <summary>

View File

@ -199,7 +199,12 @@ namespace CryptoExchange.Net.Trackers.Trades
_startWithSnapshot = startWithSnapshot;
Status = SyncStatus.Syncing;
_logger.TradeTrackerStarting(SymbolName);
var subResult = await DoStartAsync().ConfigureAwait(false);
var subResult = await _socketClient.SubscribeToTradeUpdatesAsync(new SubscribeTradeRequest(Symbol),
update =>
{
AddData(update.Data);
}).ConfigureAwait(false);
if (!subResult)
{
_logger.TradeTrackerStartFailed(SymbolName, subResult.Error!.Message, subResult.Error.Exception);
@ -211,6 +216,15 @@ namespace CryptoExchange.Net.Trackers.Trades
_updateSubscription.ConnectionLost += HandleConnectionLost;
_updateSubscription.ConnectionClosed += HandleConnectionClosed;
_updateSubscription.ConnectionRestored += HandleConnectionRestored;
var result = await DoStartAsync().ConfigureAwait(false);
if (!result)
{
_ = subResult.Data.CloseAsync();
Status = SyncStatus.Disconnected;
return result;
}
SetSyncStatus();
_logger.TradeTrackerStarted(SymbolName);
return CallResult.SuccessResult;
@ -231,22 +245,10 @@ namespace CryptoExchange.Net.Trackers.Trades
/// The start procedure needed for trade syncing, generally subscribing to an update stream and requesting the snapshot
/// </summary>
/// <returns></returns>
protected virtual async Task<CallResult<UpdateSubscription>> DoStartAsync()
protected virtual async Task<CallResult> DoStartAsync()
{
var subResult = await _socketClient.SubscribeToTradeUpdatesAsync(new SubscribeTradeRequest(Symbol),
update =>
{
AddData(update.Data);
}).ConfigureAwait(false);
if (!subResult)
{
Status = SyncStatus.Disconnected;
return subResult;
}
if (!_startWithSnapshot)
return subResult;
return CallResult.SuccessResult;
if (_historyRestClient != null)
{
@ -256,11 +258,7 @@ namespace CryptoExchange.Net.Trackers.Trades
await foreach(var result in ExchangeHelpers.ExecutePages(_historyRestClient.GetTradeHistoryAsync, request).ConfigureAwait(false))
{
if (!result)
{
_ = subResult.Data.CloseAsync();
Status = SyncStatus.Disconnected;
return subResult.AsError<UpdateSubscription>(result.Error!);
}
return result;
if (Limit != null && data.Count > Limit)
break;
@ -279,15 +277,13 @@ namespace CryptoExchange.Net.Trackers.Trades
var snapshot = await _recentRestClient.GetRecentTradesAsync(new GetRecentTradesRequest(Symbol, limit)).ConfigureAwait(false);
if (!snapshot)
{
_ = subResult.Data.CloseAsync();
Status = SyncStatus.Disconnected;
return subResult.AsError<UpdateSubscription>(snapshot.Error!);
return snapshot;
}
SetInitialData(snapshot.Data);
}
return subResult;
return CallResult.SuccessResult;
}
/// <summary>

View File

@ -58,6 +58,14 @@ Make a one time donation in a crypto currency of your choice. If you prefer to d
Alternatively, sponsor me on Github using [Github Sponsors](https://github.com/sponsors/JKorf).
## Release notes
* Version 9.3.0 - 23 Jul 2025
* Updated websocket message to listener matching logic to be more flexible
* Updated decimal parser to support "NaN" and "-Infinity" strings, added check for negative overflow value, improved performance in most cases
* Version 9.2.1 - 16 Jul 2025
* Added setting for whether or not to process unparsable websocket messages
* Fixed issue causing duplicate subscriptions and data in the TradeTracker and KlineTracker when websocket connection was reconnected
* Version 9.2.0 - 14 Jul 2025
* Added support for sending byte data on websocket
* Added support for handling both string and byte data with different IMessageAccessor types