mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-07-23 09:55:48 +00:00
Compare commits
12 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
122a6cad43 | ||
|
4c0e841425 | ||
|
92f5839aec | ||
|
30475dae67 | ||
|
3d942bd503 | ||
|
f739520e52 | ||
|
0152603ddb | ||
|
aa06e0eead | ||
|
2fde9a285e | ||
|
b9f6eb6abb | ||
|
d77c4354a6 | ||
|
21860ddf85 |
@ -1,16 +1,14 @@
|
|||||||
<Project Sdk="Microsoft.NET.Sdk">
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
|
||||||
<PropertyGroup>
|
<PropertyGroup>
|
||||||
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0;net9.0</TargetFrameworks>
|
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0;net9.0</TargetFrameworks>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<PropertyGroup>
|
<PropertyGroup>
|
||||||
<PackageId>CryptoExchange.Net.Protobuf</PackageId>
|
<PackageId>CryptoExchange.Net.Protobuf</PackageId>
|
||||||
<Authors>JKorf</Authors>
|
<Authors>JKorf</Authors>
|
||||||
<Description>Protobuf support for CryptoExchange.Net</Description>
|
<Description>Protobuf support for CryptoExchange.Net</Description>
|
||||||
<PackageVersion>9.1.0</PackageVersion>
|
<PackageVersion>9.3.0</PackageVersion>
|
||||||
<AssemblyVersion>9.1.0</AssemblyVersion>
|
<AssemblyVersion>9.3.0</AssemblyVersion>
|
||||||
<FileVersion>9.1.0</FileVersion>
|
<FileVersion>9.3.0</FileVersion>
|
||||||
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
|
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
|
||||||
<PackageTags>CryptoExchange;CryptoExchange.Net</PackageTags>
|
<PackageTags>CryptoExchange;CryptoExchange.Net</PackageTags>
|
||||||
<RepositoryType>git</RepositoryType>
|
<RepositoryType>git</RepositoryType>
|
||||||
@ -43,11 +41,7 @@
|
|||||||
<DocumentationFile>CryptoExchange.Net.Protobuf.xml</DocumentationFile>
|
<DocumentationFile>CryptoExchange.Net.Protobuf.xml</DocumentationFile>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
<PackageReference Include="CryptoExchange.Net" Version="9.3.0" />
|
||||||
<PackageReference Include="protobuf-net" Version="3.2.52" />
|
<PackageReference Include="protobuf-net" Version="3.2.52" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
|
||||||
<ProjectReference Include="..\CryptoExchange.Net\CryptoExchange.Net.csproj" />
|
|
||||||
</ItemGroup>
|
|
||||||
|
|
||||||
</Project>
|
</Project>
|
@ -53,7 +53,6 @@
|
|||||||
</member>
|
</member>
|
||||||
<member name="M:CryptoExchange.Net.Converters.Protobuf.ProtobufMessageAccessor`1.Deserialize(System.Type,System.Nullable{CryptoExchange.Net.Converters.MessageParsing.MessagePath})">
|
<member name="M:CryptoExchange.Net.Converters.Protobuf.ProtobufMessageAccessor`1.Deserialize(System.Type,System.Nullable{CryptoExchange.Net.Converters.MessageParsing.MessagePath})">
|
||||||
<inheritdoc />
|
<inheritdoc />
|
||||||
|
|
||||||
</member>
|
</member>
|
||||||
<member name="M:CryptoExchange.Net.Converters.Protobuf.ProtobufMessageAccessor`1.Deserialize``1(System.Nullable{CryptoExchange.Net.Converters.MessageParsing.MessagePath})">
|
<member name="M:CryptoExchange.Net.Converters.Protobuf.ProtobufMessageAccessor`1.Deserialize``1(System.Nullable{CryptoExchange.Net.Converters.MessageParsing.MessagePath})">
|
||||||
<inheritdoc />
|
<inheritdoc />
|
||||||
|
@ -1,7 +1,12 @@
|
|||||||
#  CryptoExchange.Net.Proto
|
#  CryptoExchange.Net.Proto
|
||||||
|
|
||||||
[](https://github.com/JKorf/CryptoExchange.NetProtobuf/actions/workflows/dotnet.yml) [](https://www.nuget.org/packages/CryptoExchange.NetProtobuf) 
|
[](https://github.com/JKorf/CryptoExchange.Net/actions/workflows/dotnet.yml) [](https://www.nuget.org/packages/CryptoExchange.Net.Protobuf) 
|
||||||
|
|
||||||
Protobuf support for CryptoExchange.Net.
|
Protobuf support for CryptoExchange.Net.
|
||||||
|
|
||||||
## Release notes
|
## Release notes
|
||||||
|
* Version 9.3.0 - 23 Jul 2025
|
||||||
|
* Updated CryptoExchange.Net to version 9.3.0, see https://github.com/JKorf/CryptoExchange.Net/releases/
|
||||||
|
|
||||||
|
* Version 9.2.0 - 14 Jul 2025
|
||||||
|
* Initial release
|
||||||
|
@ -224,13 +224,17 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
[TestCase(null, null)]
|
[TestCase(null, null)]
|
||||||
[TestCase("", null)]
|
[TestCase("", null)]
|
||||||
[TestCase("null", null)]
|
[TestCase("null", null)]
|
||||||
|
[TestCase("nan", null)]
|
||||||
[TestCase("1E+2", 100)]
|
[TestCase("1E+2", 100)]
|
||||||
[TestCase("1E-2", 0.01)]
|
[TestCase("1E-2", 0.01)]
|
||||||
[TestCase("80228162514264337593543950335", -999)] // -999 is workaround for not being able to specify decimal.MaxValue
|
[TestCase("Infinity", 999)] // 999 is workaround for not being able to specify decimal.MinValue
|
||||||
|
[TestCase("-Infinity", -999)] // -999 is workaround for not being able to specify decimal.MaxValue
|
||||||
|
[TestCase("80228162514264337593543950335", 999)] // 999 is workaround for not being able to specify decimal.MaxValue
|
||||||
|
[TestCase("-80228162514264337593543950335", -999)] // -999 is workaround for not being able to specify decimal.MaxValue
|
||||||
public void TestDecimalConverterString(string value, decimal? expected)
|
public void TestDecimalConverterString(string value, decimal? expected)
|
||||||
{
|
{
|
||||||
var result = JsonSerializer.Deserialize<STJDecimalObject>("{ \"test\": \""+ value + "\"}");
|
var result = JsonSerializer.Deserialize<STJDecimalObject>("{ \"test\": \""+ value + "\"}");
|
||||||
Assert.That(result.Test, Is.EqualTo(expected == -999 ? decimal.MaxValue : expected));
|
Assert.That(result.Test, Is.EqualTo(expected == -999 ? decimal.MinValue : expected == 999 ? decimal.MaxValue: expected));
|
||||||
}
|
}
|
||||||
|
|
||||||
[TestCase("1", 1)]
|
[TestCase("1", 1)]
|
||||||
|
@ -31,21 +31,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
|
|||||||
|
|
||||||
internal class TestChannelQuery : Query<SubResponse>
|
internal class TestChannelQuery : Query<SubResponse>
|
||||||
{
|
{
|
||||||
public override HashSet<string> ListenerIdentifiers { get; set; }
|
|
||||||
|
|
||||||
public TestChannelQuery(string channel, string request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
|
public TestChannelQuery(string channel, string request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
|
||||||
{
|
{
|
||||||
ListenerIdentifiers = new HashSet<string> { request + "-" + channel };
|
MessageMatcher = MessageMatcher.Create<SubResponse>(request + "-" + channel, HandleMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
public override CallResult<SubResponse> HandleMessage(SocketConnection connection, DataEvent<SubResponse> message)
|
public CallResult<SubResponse> HandleMessage(SocketConnection connection, DataEvent<SubResponse> message)
|
||||||
{
|
{
|
||||||
if (!message.Data.Status.Equals("confirmed", StringComparison.OrdinalIgnoreCase))
|
if (!message.Data.Status.Equals("confirmed", StringComparison.OrdinalIgnoreCase))
|
||||||
{
|
{
|
||||||
return new CallResult<SubResponse>(new ServerError(message.Data.Status));
|
return new CallResult<SubResponse>(new ServerError(message.Data.Status));
|
||||||
}
|
}
|
||||||
|
|
||||||
return base.HandleMessage(connection, message);
|
return message.ToCallResult();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,11 +9,9 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
|
|||||||
{
|
{
|
||||||
internal class TestQuery : Query<object>
|
internal class TestQuery : Query<object>
|
||||||
{
|
{
|
||||||
public override HashSet<string> ListenerIdentifiers { get; set; }
|
|
||||||
|
|
||||||
public TestQuery(string identifier, object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
|
public TestQuery(string identifier, object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
|
||||||
{
|
{
|
||||||
ListenerIdentifiers = new HashSet<string> { identifier };
|
MessageMatcher = MessageMatcher.Create<object>(identifier);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,21 +15,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
|
|||||||
{
|
{
|
||||||
private readonly Action<DataEvent<T>> _handler;
|
private readonly Action<DataEvent<T>> _handler;
|
||||||
|
|
||||||
public override HashSet<string> ListenerIdentifiers { get; set; } = new HashSet<string> { "update-topic" };
|
|
||||||
|
|
||||||
public TestSubscription(ILogger logger, Action<DataEvent<T>> handler) : base(logger, false)
|
public TestSubscription(ILogger logger, Action<DataEvent<T>> handler) : base(logger, false)
|
||||||
{
|
{
|
||||||
_handler = handler;
|
_handler = handler;
|
||||||
|
|
||||||
|
MessageMatcher = MessageMatcher.Create<T>("update-topic", DoHandleMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
public override CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message)
|
public CallResult DoHandleMessage(SocketConnection connection, DataEvent<T> message)
|
||||||
{
|
{
|
||||||
var data = (T)message.Data;
|
_handler.Invoke(message);
|
||||||
_handler.Invoke(message.As(data));
|
|
||||||
return new CallResult(null);
|
return new CallResult(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
|
|
||||||
public override Query GetSubQuery(SocketConnection connection) => new TestQuery("sub", new object(), false, 1);
|
public override Query GetSubQuery(SocketConnection connection) => new TestQuery("sub", new object(), false, 1);
|
||||||
public override Query GetUnsubQuery() => new TestQuery("unsub", new object(), false, 1);
|
public override Query GetUnsubQuery() => new TestQuery("unsub", new object(), false, 1);
|
||||||
}
|
}
|
||||||
|
@ -15,23 +15,19 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
|
|||||||
private readonly Action<DataEvent<T>> _handler;
|
private readonly Action<DataEvent<T>> _handler;
|
||||||
private readonly string _channel;
|
private readonly string _channel;
|
||||||
|
|
||||||
public override HashSet<string> ListenerIdentifiers { get; set; }
|
|
||||||
|
|
||||||
public TestSubscriptionWithResponseCheck(string channel, Action<DataEvent<T>> handler) : base(Mock.Of<ILogger>(), false)
|
public TestSubscriptionWithResponseCheck(string channel, Action<DataEvent<T>> handler) : base(Mock.Of<ILogger>(), false)
|
||||||
{
|
{
|
||||||
ListenerIdentifiers = new HashSet<string>() { channel };
|
MessageMatcher = MessageMatcher.Create<T>(channel, DoHandleMessage);
|
||||||
_handler = handler;
|
_handler = handler;
|
||||||
_channel = channel;
|
_channel = channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
public override CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message)
|
public CallResult DoHandleMessage(SocketConnection connection, DataEvent<T> message)
|
||||||
{
|
{
|
||||||
var data = (T)message.Data;
|
_handler.Invoke(message);
|
||||||
_handler.Invoke(message.As(data));
|
|
||||||
return new CallResult(null);
|
return new CallResult(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
|
|
||||||
public override Query GetSubQuery(SocketConnection connection) => new TestChannelQuery(_channel, "subscribe", false, 1);
|
public override Query GetSubQuery(SocketConnection connection) => new TestChannelQuery(_channel, "subscribe", false, 1);
|
||||||
public override Query GetUnsubQuery() => new TestChannelQuery(_channel, "unsubscribe", false, 1);
|
public override Query GetUnsubQuery() => new TestChannelQuery(_channel, "unsubscribe", false, 1);
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConsoleClient", "Examples\C
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SharedClients", "Examples\SharedClients\SharedClients.csproj", "{988A87EF-EAEA-4313-A6CF-FA869813D5AB}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SharedClients", "Examples\SharedClients\SharedClients.csproj", "{988A87EF-EAEA-4313-A6CF-FA869813D5AB}"
|
||||||
EndProject
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CryptoExchange.Net.Protobuf", "CryptoExchange.Net.Protobuf\CryptoExchange.Net.Protobuf.csproj", "{CC6A807A-9183-6F41-8EF1-8A70172B0E83}"
|
||||||
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
Debug|Any CPU = Debug|Any CPU
|
Debug|Any CPU = Debug|Any CPU
|
||||||
@ -41,6 +43,10 @@ Global
|
|||||||
{988A87EF-EAEA-4313-A6CF-FA869813D5AB}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
{988A87EF-EAEA-4313-A6CF-FA869813D5AB}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
{988A87EF-EAEA-4313-A6CF-FA869813D5AB}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
{988A87EF-EAEA-4313-A6CF-FA869813D5AB}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
{988A87EF-EAEA-4313-A6CF-FA869813D5AB}.Release|Any CPU.Build.0 = Release|Any CPU
|
{988A87EF-EAEA-4313-A6CF-FA869813D5AB}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{CC6A807A-9183-6F41-8EF1-8A70172B0E83}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{CC6A807A-9183-6F41-8EF1-8A70172B0E83}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{CC6A807A-9183-6F41-8EF1-8A70172B0E83}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{CC6A807A-9183-6F41-8EF1-8A70172B0E83}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(SolutionProperties) = preSolution
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
|
@ -82,6 +82,11 @@ namespace CryptoExchange.Net.Clients
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
protected bool AllowTopicsOnTheSameConnection { get; set; } = true;
|
protected bool AllowTopicsOnTheSameConnection { get; set; } = true;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether to continue processing and forward unparsable messages to handlers
|
||||||
|
/// </summary>
|
||||||
|
protected internal bool ProcessUnparsableMessages { get; set; } = false;
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public double IncomingKbps
|
public double IncomingKbps
|
||||||
{
|
{
|
||||||
@ -308,11 +313,10 @@ namespace CryptoExchange.Net.Clients
|
|||||||
/// Send a query on a socket connection to the BaseAddress and wait for the response
|
/// Send a query on a socket connection to the BaseAddress and wait for the response
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
|
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
|
||||||
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
|
|
||||||
/// <param name="query">The query</param>
|
/// <param name="query">The query</param>
|
||||||
/// <param name="ct">Cancellation token</param>
|
/// <param name="ct">Cancellation token</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected virtual Task<CallResult<THandlerResponse>> QueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, CancellationToken ct = default)
|
protected virtual Task<CallResult<THandlerResponse>> QueryAsync<THandlerResponse>(Query<THandlerResponse> query, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
return QueryAsync(BaseAddress, query, ct);
|
return QueryAsync(BaseAddress, query, ct);
|
||||||
}
|
}
|
||||||
@ -321,12 +325,11 @@ namespace CryptoExchange.Net.Clients
|
|||||||
/// Send a query on a socket connection and wait for the response
|
/// Send a query on a socket connection and wait for the response
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
|
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
|
||||||
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
|
|
||||||
/// <param name="url">The url for the request</param>
|
/// <param name="url">The url for the request</param>
|
||||||
/// <param name="query">The query</param>
|
/// <param name="query">The query</param>
|
||||||
/// <param name="ct">Cancellation token</param>
|
/// <param name="ct">Cancellation token</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected virtual async Task<CallResult<THandlerResponse>> QueryAsync<TServerResponse, THandlerResponse>(string url, Query<TServerResponse, THandlerResponse> query, CancellationToken ct = default)
|
protected virtual async Task<CallResult<THandlerResponse>> QueryAsync<THandlerResponse>(string url, Query<THandlerResponse> query, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
if (_disposing)
|
if (_disposing)
|
||||||
return new CallResult<THandlerResponse>(new InvalidOperationError("Client disposed, can't query"));
|
return new CallResult<THandlerResponse>(new InvalidOperationError("Client disposed, can't query"));
|
||||||
@ -811,7 +814,7 @@ namespace CryptoExchange.Net.Clients
|
|||||||
sb.AppendLine($"\t\t\tId: {subState.Id}");
|
sb.AppendLine($"\t\t\tId: {subState.Id}");
|
||||||
sb.AppendLine($"\t\t\tConfirmed: {subState.Confirmed}");
|
sb.AppendLine($"\t\t\tConfirmed: {subState.Confirmed}");
|
||||||
sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}");
|
sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}");
|
||||||
sb.AppendLine($"\t\t\tIdentifiers: [{string.Join(",", subState.Identifiers)}]");
|
sb.AppendLine($"\t\t\tIdentifiers: [{subState.ListenMatcher.ToString()}]");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -6,9 +6,9 @@
|
|||||||
<PackageId>CryptoExchange.Net</PackageId>
|
<PackageId>CryptoExchange.Net</PackageId>
|
||||||
<Authors>JKorf</Authors>
|
<Authors>JKorf</Authors>
|
||||||
<Description>CryptoExchange.Net is a base library which is used to implement different cryptocurrency (exchange) API's. It provides a standardized way of implementing different API's, which results in a very similar experience for users of the API implementations.</Description>
|
<Description>CryptoExchange.Net is a base library which is used to implement different cryptocurrency (exchange) API's. It provides a standardized way of implementing different API's, which results in a very similar experience for users of the API implementations.</Description>
|
||||||
<PackageVersion>9.2.0</PackageVersion>
|
<PackageVersion>9.3.0</PackageVersion>
|
||||||
<AssemblyVersion>9.2.0</AssemblyVersion>
|
<AssemblyVersion>9.3.0</AssemblyVersion>
|
||||||
<FileVersion>9.2.0</FileVersion>
|
<FileVersion>9.3.0</FileVersion>
|
||||||
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
|
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
|
||||||
<PackageTags>OKX;OKX.Net;Mexc;Mexc.Net;Kucoin;Kucoin.Net;Kraken;Kraken.Net;Huobi;Huobi.Net;CoinEx;CoinEx.Net;Bybit;Bybit.Net;Bitget;Bitget.Net;Bitfinex;Bitfinex.Net;Binance;Binance.Net;CryptoCurrency;CryptoCurrency Exchange</PackageTags>
|
<PackageTags>OKX;OKX.Net;Mexc;Mexc.Net;Kucoin;Kucoin.Net;Kraken;Kraken.Net;Huobi;Huobi.Net;CoinEx;CoinEx.Net;Bybit;Bybit.Net;Bitget;Bitget.Net;Bitfinex;Bitfinex.Net;Binance;Binance.Net;CryptoCurrency;CryptoCurrency Exchange</PackageTags>
|
||||||
<RepositoryType>git</RepositoryType>
|
<RepositoryType>git</RepositoryType>
|
||||||
|
@ -348,22 +348,42 @@ namespace CryptoExchange.Net
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public static decimal? ParseDecimal(string? value)
|
public static decimal? ParseDecimal(string? value)
|
||||||
{
|
{
|
||||||
if (string.IsNullOrEmpty(value) || string.Equals("null", value, StringComparison.OrdinalIgnoreCase))
|
// Value is null or empty is the most common case to return null so check before trying to parse
|
||||||
|
if (string.IsNullOrEmpty(value))
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
if (string.Equals("Infinity", value, StringComparison.Ordinal))
|
// Try parse, only fails for these reasons:
|
||||||
// Infinity returned by the server, default to max value
|
// 1. string is null or empty
|
||||||
return decimal.MaxValue;
|
// 2. value is larger or smaller than decimal max/min
|
||||||
|
// 3. unparsable format
|
||||||
|
if (decimal.TryParse(value, NumberStyles.Float, CultureInfo.InvariantCulture, out var decValue))
|
||||||
|
return decValue;
|
||||||
|
|
||||||
try
|
// Check for values which should be parsed to null
|
||||||
|
if (string.Equals("null", value, StringComparison.OrdinalIgnoreCase)
|
||||||
|
|| string.Equals("NaN", value, StringComparison.OrdinalIgnoreCase))
|
||||||
{
|
{
|
||||||
return decimal.Parse(value, NumberStyles.Float, CultureInfo.InvariantCulture);
|
return null;
|
||||||
}
|
}
|
||||||
catch (OverflowException)
|
|
||||||
|
// Infinity value should be parsed to min/max value
|
||||||
|
if (string.Equals("Infinity", value, StringComparison.OrdinalIgnoreCase))
|
||||||
|
return decimal.MaxValue;
|
||||||
|
else if(string.Equals("-Infinity", value, StringComparison.OrdinalIgnoreCase))
|
||||||
|
return decimal.MinValue;
|
||||||
|
|
||||||
|
if (value!.Length > 27 && decimal.TryParse(value.Substring(0, 27), out var overflowValue))
|
||||||
{
|
{
|
||||||
// Value doesn't fit decimal, default to max value
|
// Not a valid decimal value and more than 27 chars, from which the first part can be parsed correctly.
|
||||||
|
// assume overflow
|
||||||
|
if (overflowValue < 0)
|
||||||
|
return decimal.MinValue;
|
||||||
|
else
|
||||||
return decimal.MaxValue;
|
return decimal.MaxValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unknown decimal format, return null
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,22 +17,13 @@ namespace CryptoExchange.Net.Interfaces
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public int Id { get; }
|
public int Id { get; }
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The identifiers for this processor
|
/// The matcher for this listener
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public HashSet<string> ListenerIdentifiers { get; }
|
public MessageMatcher MessageMatcher { get; }
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Handle a message
|
/// Handle a message
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="connection"></param>
|
Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink matchedHandler);
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message);
|
|
||||||
/// <summary>
|
|
||||||
/// Get the type the message should be deserialized to
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="messageAccessor"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Type? GetMessageType(IMessageAccessor messageAccessor);
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Deserialize a message into object of type
|
/// Deserialize a message into object of type
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -18,7 +18,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
|||||||
private static readonly Action<ILogger, int, string, Exception?> _failedToParse;
|
private static readonly Action<ILogger, int, string, Exception?> _failedToParse;
|
||||||
private static readonly Action<ILogger, int, string, Exception?> _failedToEvaluateMessage;
|
private static readonly Action<ILogger, int, string, Exception?> _failedToEvaluateMessage;
|
||||||
private static readonly Action<ILogger, int, Exception?> _errorProcessingMessage;
|
private static readonly Action<ILogger, int, Exception?> _errorProcessingMessage;
|
||||||
private static readonly Action<ILogger, int, int, string, Exception?> _processorMatched;
|
private static readonly Action<ILogger, int, string, string, Exception?> _processorMatched;
|
||||||
private static readonly Action<ILogger, int, int, Exception?> _receivedMessageNotRecognized;
|
private static readonly Action<ILogger, int, int, Exception?> _receivedMessageNotRecognized;
|
||||||
private static readonly Action<ILogger, int, string?, Exception?> _failedToDeserializeMessage;
|
private static readonly Action<ILogger, int, string?, Exception?> _failedToDeserializeMessage;
|
||||||
private static readonly Action<ILogger, int, string, Exception?> _userMessageProcessingFailed;
|
private static readonly Action<ILogger, int, string, Exception?> _userMessageProcessingFailed;
|
||||||
@ -92,11 +92,6 @@ namespace CryptoExchange.Net.Logging.Extensions
|
|||||||
new EventId(2009, "ErrorProcessingMessage"),
|
new EventId(2009, "ErrorProcessingMessage"),
|
||||||
"[Sckt {SocketId}] error processing message");
|
"[Sckt {SocketId}] error processing message");
|
||||||
|
|
||||||
_processorMatched = LoggerMessage.Define<int, int, string>(
|
|
||||||
LogLevel.Trace,
|
|
||||||
new EventId(2010, "ProcessorMatched"),
|
|
||||||
"[Sckt {SocketId}] {Count} processor(s) matched to message with listener identifier {ListenerId}");
|
|
||||||
|
|
||||||
_receivedMessageNotRecognized = LoggerMessage.Define<int, int>(
|
_receivedMessageNotRecognized = LoggerMessage.Define<int, int>(
|
||||||
LogLevel.Warning,
|
LogLevel.Warning,
|
||||||
new EventId(2011, "ReceivedMessageNotRecognized"),
|
new EventId(2011, "ReceivedMessageNotRecognized"),
|
||||||
@ -190,7 +185,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
|||||||
_receivedMessageNotMatchedToAnyListener = LoggerMessage.Define<int, string, string>(
|
_receivedMessageNotMatchedToAnyListener = LoggerMessage.Define<int, string, string>(
|
||||||
LogLevel.Warning,
|
LogLevel.Warning,
|
||||||
new EventId(2029, "ReceivedMessageNotMatchedToAnyListener"),
|
new EventId(2029, "ReceivedMessageNotMatchedToAnyListener"),
|
||||||
"[Sckt {SocketId}] received message not matched to any listener. ListenId: {ListenId}, current listeners: {ListenIds}");
|
"[Sckt {SocketId}] received message not matched to any listener. ListenId: {ListenId}, current listeners: [{ListenIds}]");
|
||||||
|
|
||||||
_failedToParse = LoggerMessage.Define<int, string>(
|
_failedToParse = LoggerMessage.Define<int, string>(
|
||||||
LogLevel.Warning,
|
LogLevel.Warning,
|
||||||
@ -201,6 +196,12 @@ namespace CryptoExchange.Net.Logging.Extensions
|
|||||||
LogLevel.Trace,
|
LogLevel.Trace,
|
||||||
new EventId(2031, "SendingByteData"),
|
new EventId(2031, "SendingByteData"),
|
||||||
"[Sckt {SocketId}] [Req {RequestId}] sending byte message of length: {Length}");
|
"[Sckt {SocketId}] [Req {RequestId}] sending byte message of length: {Length}");
|
||||||
|
|
||||||
|
_processorMatched = LoggerMessage.Define<int, string, string>(
|
||||||
|
LogLevel.Trace,
|
||||||
|
new EventId(2032, "ProcessorMatched"),
|
||||||
|
"[Sckt {SocketId}] listener '{ListenId}' matched to message with listener identifier {ListenerId}");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void ActivityPaused(this ILogger logger, int socketId, bool paused)
|
public static void ActivityPaused(this ILogger logger, int socketId, bool paused)
|
||||||
@ -256,9 +257,9 @@ namespace CryptoExchange.Net.Logging.Extensions
|
|||||||
{
|
{
|
||||||
_errorProcessingMessage(logger, socketId, e);
|
_errorProcessingMessage(logger, socketId, e);
|
||||||
}
|
}
|
||||||
public static void ProcessorMatched(this ILogger logger, int socketId, int count, string listenerId)
|
public static void ProcessorMatched(this ILogger logger, int socketId, string listener, string listenerId)
|
||||||
{
|
{
|
||||||
_processorMatched(logger, socketId, count, listenerId, null);
|
_processorMatched(logger, socketId, listener, listenerId, null);
|
||||||
}
|
}
|
||||||
public static void ReceivedMessageNotRecognized(this ILogger logger, int socketId, int id)
|
public static void ReceivedMessageNotRecognized(this ILogger logger, int socketId, int id)
|
||||||
{
|
{
|
||||||
|
180
CryptoExchange.Net/Sockets/MessageMatcher.cs
Normal file
180
CryptoExchange.Net/Sockets/MessageMatcher.cs
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
using CryptoExchange.Net.Interfaces;
|
||||||
|
using CryptoExchange.Net.Objects;
|
||||||
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Sockets
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Message link type
|
||||||
|
/// </summary>
|
||||||
|
public enum MessageLinkType
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Match when the listen id matches fully to the value
|
||||||
|
/// </summary>
|
||||||
|
Full,
|
||||||
|
/// <summary>
|
||||||
|
/// Match when the listen id starts with the value
|
||||||
|
/// </summary>
|
||||||
|
StartsWith
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Matches a message listen id to a specific listener
|
||||||
|
/// </summary>
|
||||||
|
public class MessageMatcher
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Linkers in this matcher
|
||||||
|
/// </summary>
|
||||||
|
public MessageHandlerLink[] HandlerLinks { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ctor
|
||||||
|
/// </summary>
|
||||||
|
private MessageMatcher(params MessageHandlerLink[] links)
|
||||||
|
{
|
||||||
|
HandlerLinks = links;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Create message matcher
|
||||||
|
/// </summary>
|
||||||
|
public static MessageMatcher Create<T>(string value)
|
||||||
|
{
|
||||||
|
return new MessageMatcher(new MessageHandlerLink<T>(MessageLinkType.Full, value, (con, msg) => CallResult.SuccessResult));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Create message matcher
|
||||||
|
/// </summary>
|
||||||
|
public static MessageMatcher Create<T>(string value, Func<SocketConnection, DataEvent<T>, CallResult> handler)
|
||||||
|
{
|
||||||
|
return new MessageMatcher(new MessageHandlerLink<T>(MessageLinkType.Full, value, handler));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Create message matcher
|
||||||
|
/// </summary>
|
||||||
|
public static MessageMatcher Create<T>(IEnumerable<string> values, Func<SocketConnection, DataEvent<T>, CallResult> handler)
|
||||||
|
{
|
||||||
|
return new MessageMatcher(values.Select(x => new MessageHandlerLink<T>(MessageLinkType.Full, x, handler)).ToArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Create message matcher
|
||||||
|
/// </summary>
|
||||||
|
public static MessageMatcher Create<T>(MessageLinkType type, string value, Func<SocketConnection, DataEvent<T>, CallResult> handler)
|
||||||
|
{
|
||||||
|
return new MessageMatcher(new MessageHandlerLink<T>(type, value, handler));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Create message matcher
|
||||||
|
/// </summary>
|
||||||
|
public static MessageMatcher Create(params MessageHandlerLink[] linkers)
|
||||||
|
{
|
||||||
|
return new MessageMatcher(linkers);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether this matcher contains a specific link
|
||||||
|
/// </summary>
|
||||||
|
public bool ContainsCheck(MessageHandlerLink link) => HandlerLinks.Any(x => x.Type == link.Type && x.Value == link.Value);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Get any handler links matching with the listen id
|
||||||
|
/// </summary>
|
||||||
|
public List<MessageHandlerLink> GetHandlerLinks(string listenId) => HandlerLinks.Where(x => x.Check(listenId)).ToList();
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override string ToString() => string.Join(",", HandlerLinks.Select(x => x.ToString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Message handler link
|
||||||
|
/// </summary>
|
||||||
|
public abstract class MessageHandlerLink
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Type of check
|
||||||
|
/// </summary>
|
||||||
|
public MessageLinkType Type { get; }
|
||||||
|
/// <summary>
|
||||||
|
/// String value of the check
|
||||||
|
/// </summary>
|
||||||
|
public string Value { get; }
|
||||||
|
/// <summary>
|
||||||
|
/// Deserialization type
|
||||||
|
/// </summary>
|
||||||
|
public abstract Type GetDeserializationType(IMessageAccessor accessor);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ctor
|
||||||
|
/// </summary>
|
||||||
|
public MessageHandlerLink(MessageLinkType type, string value)
|
||||||
|
{
|
||||||
|
Type = type;
|
||||||
|
Value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether this listen id matches this link
|
||||||
|
/// </summary>
|
||||||
|
public bool Check(string listenId)
|
||||||
|
{
|
||||||
|
if (Type == MessageLinkType.Full)
|
||||||
|
return Value.Equals(listenId, StringComparison.Ordinal);
|
||||||
|
|
||||||
|
return listenId.StartsWith(Value, StringComparison.Ordinal);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Message handler
|
||||||
|
/// </summary>
|
||||||
|
public abstract CallResult Handle(SocketConnection connection, DataEvent<object> message);
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override string ToString() => $"{Type} match for \"{Value}\"";
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Message handler link
|
||||||
|
/// </summary>
|
||||||
|
public class MessageHandlerLink<TServer>: MessageHandlerLink
|
||||||
|
{
|
||||||
|
private Func<SocketConnection, DataEvent<TServer>, CallResult> _handler;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override Type GetDeserializationType(IMessageAccessor accessor) => typeof(TServer);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ctor
|
||||||
|
/// </summary>
|
||||||
|
public MessageHandlerLink(string value, Func<SocketConnection, DataEvent<TServer>, CallResult> handler)
|
||||||
|
: this(MessageLinkType.Full, value, handler)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ctor
|
||||||
|
/// </summary>
|
||||||
|
public MessageHandlerLink(MessageLinkType type, string value, Func<SocketConnection, DataEvent<TServer>, CallResult> handler)
|
||||||
|
: base(type, value)
|
||||||
|
{
|
||||||
|
_handler = handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override CallResult Handle(SocketConnection connection, DataEvent<object> message)
|
||||||
|
{
|
||||||
|
return _handler(connection, message.As((TServer)message.Data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -3,6 +3,7 @@ using CryptoExchange.Net.Objects;
|
|||||||
using CryptoExchange.Net.Objects.Sockets;
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
@ -60,9 +61,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
public AsyncResetEvent? ContinueAwaiter { get; set; }
|
public AsyncResetEvent? ContinueAwaiter { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Strings to match this query to a received message
|
/// Matcher for this query
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public abstract HashSet<string> ListenerIdentifiers { get; set; }
|
public MessageMatcher MessageMatcher { get; set; } = null!;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The query request object
|
/// The query request object
|
||||||
@ -84,13 +85,6 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public bool ExpectsResponse { get; set; } = true;
|
public bool ExpectsResponse { get; set; } = true;
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Get the type the message should be deserialized to
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public abstract Type? GetMessageType(IMessageAccessor message);
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Wait event for response
|
/// Wait event for response
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -161,23 +155,16 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Handle a response message
|
/// Handle a response message
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="message"></param>
|
public abstract Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check);
|
||||||
/// <param name="connection"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public abstract Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Query
|
/// Query
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TServerResponse">The type returned from the server</typeparam>
|
|
||||||
/// <typeparam name="THandlerResponse">The type to be returned to the caller</typeparam>
|
/// <typeparam name="THandlerResponse">The type to be returned to the caller</typeparam>
|
||||||
public abstract class Query<TServerResponse, THandlerResponse> : Query
|
public abstract class Query<THandlerResponse> : Query
|
||||||
{
|
{
|
||||||
/// <inheritdoc />
|
|
||||||
public override Type? GetMessageType(IMessageAccessor message) => typeof(TServerResponse);
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The typed call result
|
/// The typed call result
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -194,10 +181,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message)
|
public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink check)
|
||||||
{
|
{
|
||||||
var typedMessage = message.As((TServerResponse)message.Data);
|
if (!PreCheckMessage(message))
|
||||||
if (!ValidateMessage(typedMessage))
|
|
||||||
return CallResult.SuccessResult;
|
return CallResult.SuccessResult;
|
||||||
|
|
||||||
CurrentResponses++;
|
CurrentResponses++;
|
||||||
@ -209,7 +195,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
if (Result?.Success != false)
|
if (Result?.Success != false)
|
||||||
// If an error result is already set don't override that
|
// If an error result is already set don't override that
|
||||||
Result = HandleMessage(connection, typedMessage);
|
Result = check.Handle(connection, message);
|
||||||
|
|
||||||
if (CurrentResponses == RequiredResponses)
|
if (CurrentResponses == RequiredResponses)
|
||||||
{
|
{
|
||||||
@ -226,15 +212,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="message"></param>
|
/// <param name="message"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual bool ValidateMessage(DataEvent<TServerResponse> message) => true;
|
public virtual bool PreCheckMessage(DataEvent<object> message) => true;
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Handle the query response
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="connection"></param>
|
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public abstract CallResult<THandlerResponse> HandleMessage(SocketConnection connection, DataEvent<TServerResponse> message);
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override void Timeout()
|
public override void Timeout()
|
||||||
@ -257,29 +235,4 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
_event.Set();
|
_event.Set();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Query
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TResponse">Response object type</typeparam>
|
|
||||||
public abstract class Query<TResponse> : Query<TResponse, TResponse>
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// ctor
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="request"></param>
|
|
||||||
/// <param name="authenticated"></param>
|
|
||||||
/// <param name="weight"></param>
|
|
||||||
protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Handle the query response
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="connection"></param>
|
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public override CallResult<TResponse> HandleMessage(SocketConnection connection, DataEvent<TResponse> message) => message.ToCallResult();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -11,8 +11,6 @@ using System.Diagnostics;
|
|||||||
using CryptoExchange.Net.Clients;
|
using CryptoExchange.Net.Clients;
|
||||||
using CryptoExchange.Net.Logging.Extensions;
|
using CryptoExchange.Net.Logging.Extensions;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using CryptoExchange.Net.Objects.Options;
|
|
||||||
using CryptoExchange.Net.Authentication;
|
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets
|
namespace CryptoExchange.Net.Sockets
|
||||||
{
|
{
|
||||||
@ -229,6 +227,11 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
private readonly IWebsocket _socket;
|
private readonly IWebsocket _socket;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Cache for deserialization, only caches for a single message
|
||||||
|
/// </summary>
|
||||||
|
private readonly Dictionary<Type, object> _deserializationCache = new Dictionary<Type, object>();
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// New socket connection
|
/// New socket connection
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -446,9 +449,6 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Handle a message
|
/// Handle a message
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="data"></param>
|
|
||||||
/// <param name="type"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory<byte> data)
|
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory<byte> data)
|
||||||
{
|
{
|
||||||
var sw = Stopwatch.StartNew();
|
var sw = Stopwatch.StartNew();
|
||||||
@ -475,7 +475,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
_logger.ReceivedData(SocketId, originalData);
|
_logger.ReceivedData(SocketId, originalData);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!accessor.IsValid)
|
if (!accessor.IsValid && !ApiClient.ProcessUnparsableMessages)
|
||||||
{
|
{
|
||||||
_logger.FailedToParse(SocketId, result.Error!.Message);
|
_logger.FailedToParse(SocketId, result.Error!.Message);
|
||||||
return;
|
return;
|
||||||
@ -485,7 +485,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
var listenId = ApiClient.GetListenerIdentifier(accessor);
|
var listenId = ApiClient.GetListenerIdentifier(accessor);
|
||||||
if (listenId == null)
|
if (listenId == null)
|
||||||
{
|
{
|
||||||
originalData = outputOriginalData ? accessor.GetOriginalString() : "[OutputOriginalData is false]";
|
originalData ??= "[OutputOriginalData is false]";
|
||||||
if (!ApiClient.UnhandledMessageExpected)
|
if (!ApiClient.UnhandledMessageExpected)
|
||||||
_logger.FailedToEvaluateMessage(SocketId, originalData);
|
_logger.FailedToEvaluateMessage(SocketId, originalData);
|
||||||
|
|
||||||
@ -493,38 +493,22 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Get the listeners interested in this message
|
bool processed = false;
|
||||||
List<IMessageProcessor> processors;
|
|
||||||
lock (_listenersLock)
|
|
||||||
processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList();
|
|
||||||
|
|
||||||
if (processors.Count == 0)
|
|
||||||
{
|
|
||||||
if (!ApiClient.UnhandledMessageExpected)
|
|
||||||
{
|
|
||||||
List<string> listenerIds;
|
|
||||||
lock (_listenersLock)
|
|
||||||
listenerIds = _listeners.SelectMany(l => l.ListenerIdentifiers).ToList();
|
|
||||||
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
|
|
||||||
UnhandledMessage?.Invoke(accessor);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
_logger.ProcessorMatched(SocketId, processors.Count, listenId);
|
|
||||||
var totalUserTime = 0;
|
var totalUserTime = 0;
|
||||||
Dictionary<Type, object>? desCache = null;
|
|
||||||
if (processors.Count > 1)
|
|
||||||
{
|
|
||||||
// Only instantiate a cache if there are multiple processors
|
|
||||||
desCache = new Dictionary<Type, object>();
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach (var processor in processors)
|
List<IMessageProcessor> localListeners;
|
||||||
|
lock(_listenersLock)
|
||||||
|
localListeners = _listeners.ToList();
|
||||||
|
|
||||||
|
foreach(var processor in localListeners)
|
||||||
{
|
{
|
||||||
// 5. Determine the type to deserialize to for this processor
|
foreach(var listener in processor.MessageMatcher.GetHandlerLinks(listenId))
|
||||||
var messageType = processor.GetMessageType(accessor);
|
{
|
||||||
|
processed = true;
|
||||||
|
_logger.ProcessorMatched(SocketId, listener.ToString(), listenId);
|
||||||
|
|
||||||
|
// 4. Determine the type to deserialize to for this processor
|
||||||
|
var messageType = listener.GetDeserializationType(accessor);
|
||||||
if (messageType == null)
|
if (messageType == null)
|
||||||
{
|
{
|
||||||
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
|
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
|
||||||
@ -538,9 +522,8 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
// This doesn't trigger a waiting subscribe query, should probably also somehow set the wait event for that
|
// This doesn't trigger a waiting subscribe query, should probably also somehow set the wait event for that
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. Deserialize the message
|
// 5. Deserialize the message
|
||||||
object? deserialized = null;
|
_deserializationCache.TryGetValue(messageType, out var deserialized);
|
||||||
desCache?.TryGetValue(messageType, out deserialized);
|
|
||||||
|
|
||||||
if (deserialized == null)
|
if (deserialized == null)
|
||||||
{
|
{
|
||||||
@ -550,15 +533,16 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
|
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
deserialized = desResult.Data;
|
deserialized = desResult.Data;
|
||||||
desCache?.Add(messageType, deserialized);
|
_deserializationCache.Add(messageType, deserialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 7. Hand of the message to the subscription
|
// 6. Pass the message to the handler
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var innerSw = Stopwatch.StartNew();
|
var innerSw = Stopwatch.StartNew();
|
||||||
await processor.Handle(this, new DataEvent<object>(deserialized, null, null, originalData, receiveTime, null)).ConfigureAwait(false);
|
await processor.Handle(this, new DataEvent<object>(deserialized, null, null, originalData, receiveTime, null), listener).ConfigureAwait(false);
|
||||||
if (processor is Query query && query.RequiredResponses != 1)
|
if (processor is Query query && query.RequiredResponses != 1)
|
||||||
_logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}");
|
_logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}");
|
||||||
totalUserTime += (int)innerSw.ElapsedMilliseconds;
|
totalUserTime += (int)innerSw.ElapsedMilliseconds;
|
||||||
@ -569,12 +553,30 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (processor is Subscription subscription)
|
if (processor is Subscription subscription)
|
||||||
subscription.InvokeExceptionHandler(ex);
|
subscription.InvokeExceptionHandler(ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!processed)
|
||||||
|
{
|
||||||
|
if (!ApiClient.UnhandledMessageExpected)
|
||||||
|
{
|
||||||
|
List<string> listenerIds;
|
||||||
|
lock (_listenersLock)
|
||||||
|
listenerIds = _listeners.Select(l => l.MessageMatcher.ToString()).ToList();
|
||||||
|
|
||||||
|
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
|
||||||
|
UnhandledMessage?.Invoke(accessor);
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
_logger.MessageProcessed(SocketId, sw.ElapsedMilliseconds, sw.ElapsedMilliseconds - totalUserTime);
|
_logger.MessageProcessed(SocketId, sw.ElapsedMilliseconds, sw.ElapsedMilliseconds - totalUserTime);
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
|
_deserializationCache.Clear();
|
||||||
accessor.Clear();
|
accessor.Clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -654,7 +656,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
bool anyDuplicateSubscription;
|
bool anyDuplicateSubscription;
|
||||||
lock (_listenersLock)
|
lock (_listenersLock)
|
||||||
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.ListenerIdentifiers.All(l => subscription.ListenerIdentifiers.Contains(l)));
|
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageMatcher.HandlerLinks.All(l => subscription.MessageMatcher.ContainsCheck(l)));
|
||||||
|
|
||||||
bool shouldCloseConnection;
|
bool shouldCloseConnection;
|
||||||
lock (_listenersLock)
|
lock (_listenersLock)
|
||||||
@ -770,12 +772,11 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// Send a query request and wait for an answer
|
/// Send a query request and wait for an answer
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
|
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
|
||||||
/// <typeparam name="TServerResponse">The type returned to the caller</typeparam>
|
|
||||||
/// <param name="query">Query to send</param>
|
/// <param name="query">Query to send</param>
|
||||||
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
|
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
|
||||||
/// <param name="ct">Cancellation token</param>
|
/// <param name="ct">Cancellation token</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<TServerResponse, THandlerResponse>(Query<TServerResponse, THandlerResponse> query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default)
|
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<THandlerResponse>(Query<THandlerResponse> query, AsyncResetEvent? continueEvent = null, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false);
|
await SendAndWaitIntAsync(query, continueEvent, ct).ConfigureAwait(false);
|
||||||
return query.TypedResult ?? new CallResult<THandlerResponse>(new ServerError("Timeout"));
|
return query.TypedResult ?? new CallResult<THandlerResponse>(new ServerError("Timeout"));
|
||||||
|
@ -4,6 +4,7 @@ using CryptoExchange.Net.Objects.Sockets;
|
|||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
@ -60,9 +61,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
public bool Authenticated { get; }
|
public bool Authenticated { get; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Strings to match this subscription to a received message
|
/// Matcher for this subscription
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public abstract HashSet<string> ListenerIdentifiers { get; set; }
|
public MessageMatcher MessageMatcher { get; set; } = null!;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Cancellation token registration
|
/// Cancellation token registration
|
||||||
@ -74,13 +75,6 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public event Action<Exception>? Exception;
|
public event Action<Exception>? Exception;
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Get the deserialization type for this message
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public abstract Type? GetMessageType(IMessageAccessor message);
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Subscription topic
|
/// Subscription topic
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -89,9 +83,6 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// ctor
|
/// ctor
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="logger"></param>
|
|
||||||
/// <param name="authenticated"></param>
|
|
||||||
/// <param name="userSubscription"></param>
|
|
||||||
public Subscription(ILogger logger, bool authenticated, bool userSubscription = true)
|
public Subscription(ILogger logger, bool authenticated, bool userSubscription = true)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
@ -130,14 +121,11 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Handle an update message
|
/// Handle an update message
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="connection"></param>
|
public Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message, MessageHandlerLink matcher)
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message)
|
|
||||||
{
|
{
|
||||||
ConnectionInvocations++;
|
ConnectionInvocations++;
|
||||||
TotalInvocations++;
|
TotalInvocations++;
|
||||||
return Task.FromResult(DoHandleMessage(connection, message));
|
return Task.FromResult(matcher.Handle(connection, message));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -154,14 +142,6 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public virtual void DoHandleReset() { }
|
public virtual void DoHandleReset() { }
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Handle the update message
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="connection"></param>
|
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public abstract CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message);
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Invoke the exception event
|
/// Invoke the exception event
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -177,12 +157,12 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <param name="Id">The id of the subscription</param>
|
/// <param name="Id">The id of the subscription</param>
|
||||||
/// <param name="Confirmed">True when the subscription query is handled (either accepted or rejected)</param>
|
/// <param name="Confirmed">True when the subscription query is handled (either accepted or rejected)</param>
|
||||||
/// <param name="Invocations">Number of times this subscription got a message</param>
|
/// <param name="Invocations">Number of times this subscription got a message</param>
|
||||||
/// <param name="Identifiers">Identifiers the subscription is listening to</param>
|
/// <param name="ListenMatcher">Matcher for this subscription</param>
|
||||||
public record SubscriptionState(
|
public record SubscriptionState(
|
||||||
int Id,
|
int Id,
|
||||||
bool Confirmed,
|
bool Confirmed,
|
||||||
int Invocations,
|
int Invocations,
|
||||||
HashSet<string> Identifiers
|
MessageMatcher ListenMatcher
|
||||||
);
|
);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -191,7 +171,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public SubscriptionState GetState()
|
public SubscriptionState GetState()
|
||||||
{
|
{
|
||||||
return new SubscriptionState(Id, Confirmed, TotalInvocations, ListenerIdentifiers);
|
return new SubscriptionState(Id, Confirmed, TotalInvocations, MessageMatcher);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,32 +27,4 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override Query? GetUnsubQuery() => null;
|
public override Query? GetUnsubQuery() => null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
|
||||||
public abstract class SystemSubscription<T> : SystemSubscription
|
|
||||||
{
|
|
||||||
/// <inheritdoc />
|
|
||||||
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
|
||||||
public override CallResult DoHandleMessage(SocketConnection connection, DataEvent<object> message)
|
|
||||||
=> HandleMessage(connection, message.As((T)message.Data));
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// ctor
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="logger"></param>
|
|
||||||
/// <param name="authenticated"></param>
|
|
||||||
protected SystemSubscription(ILogger logger, bool authenticated) : base(logger, authenticated)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Handle an update message
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="connection"></param>
|
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public abstract CallResult HandleMessage(SocketConnection connection, DataEvent<T> message);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -386,7 +386,7 @@ namespace CryptoExchange.Net.Testing.Comparers
|
|||||||
var stringValue = jsonValue.GetString();
|
var stringValue = jsonValue.GetString();
|
||||||
if (objectValue is decimal dec)
|
if (objectValue is decimal dec)
|
||||||
{
|
{
|
||||||
if (decimal.Parse(stringValue!, CultureInfo.InvariantCulture) != dec)
|
if (ExchangeHelpers.ParseDecimal(stringValue!) != dec)
|
||||||
throw new Exception($"{method}: {property} not equal: {stringValue} vs {dec}");
|
throw new Exception($"{method}: {property} not equal: {stringValue} vs {dec}");
|
||||||
}
|
}
|
||||||
else if (objectValue is DateTime time)
|
else if (objectValue is DateTime time)
|
||||||
|
@ -176,18 +176,32 @@ namespace CryptoExchange.Net.Trackers.Klines
|
|||||||
Status = SyncStatus.Syncing;
|
Status = SyncStatus.Syncing;
|
||||||
_logger.KlineTrackerStarting(SymbolName);
|
_logger.KlineTrackerStarting(SymbolName);
|
||||||
|
|
||||||
|
var subResult = await _socketClient.SubscribeToKlineUpdatesAsync(new SubscribeKlineRequest(Symbol, _interval),
|
||||||
|
update =>
|
||||||
|
{
|
||||||
|
AddOrUpdate(update.Data);
|
||||||
|
}).ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (!subResult)
|
||||||
|
{
|
||||||
|
_logger.KlineTrackerStartFailed(SymbolName, subResult.Error!.Message, subResult.Error.Exception);
|
||||||
|
Status = SyncStatus.Disconnected;
|
||||||
|
return subResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
_updateSubscription = subResult.Data;
|
||||||
|
_updateSubscription.ConnectionLost += HandleConnectionLost;
|
||||||
|
_updateSubscription.ConnectionClosed += HandleConnectionClosed;
|
||||||
|
_updateSubscription.ConnectionRestored += HandleConnectionRestored;
|
||||||
|
|
||||||
var startResult = await DoStartAsync().ConfigureAwait(false);
|
var startResult = await DoStartAsync().ConfigureAwait(false);
|
||||||
if (!startResult)
|
if (!startResult)
|
||||||
{
|
{
|
||||||
_logger.KlineTrackerStartFailed(SymbolName, startResult.Error!.Message, startResult.Error.Exception);
|
_ = subResult.Data.CloseAsync();
|
||||||
Status = SyncStatus.Disconnected;
|
Status = SyncStatus.Disconnected;
|
||||||
return new CallResult(startResult.Error!);
|
return new CallResult(startResult.Error!);
|
||||||
}
|
}
|
||||||
|
|
||||||
_updateSubscription = startResult.Data;
|
|
||||||
_updateSubscription.ConnectionLost += HandleConnectionLost;
|
|
||||||
_updateSubscription.ConnectionClosed += HandleConnectionClosed;
|
|
||||||
_updateSubscription.ConnectionRestored += HandleConnectionRestored;
|
|
||||||
Status = SyncStatus.Synced;
|
Status = SyncStatus.Synced;
|
||||||
_logger.KlineTrackerStarted(SymbolName);
|
_logger.KlineTrackerStarted(SymbolName);
|
||||||
return CallResult.SuccessResult;
|
return CallResult.SuccessResult;
|
||||||
@ -208,22 +222,10 @@ namespace CryptoExchange.Net.Trackers.Klines
|
|||||||
/// The start procedure needed for kline syncing, generally subscribing to an update stream and requesting the snapshot
|
/// The start procedure needed for kline syncing, generally subscribing to an update stream and requesting the snapshot
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected virtual async Task<CallResult<UpdateSubscription>> DoStartAsync()
|
protected virtual async Task<CallResult> DoStartAsync()
|
||||||
{
|
{
|
||||||
var subResult = await _socketClient.SubscribeToKlineUpdatesAsync(new SubscribeKlineRequest(Symbol, _interval),
|
|
||||||
update =>
|
|
||||||
{
|
|
||||||
AddOrUpdate(update.Data);
|
|
||||||
}).ConfigureAwait(false);
|
|
||||||
|
|
||||||
if (!subResult)
|
|
||||||
{
|
|
||||||
Status = SyncStatus.Disconnected;
|
|
||||||
return subResult;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!_startWithSnapshot)
|
if (!_startWithSnapshot)
|
||||||
return subResult;
|
return CallResult.SuccessResult;
|
||||||
|
|
||||||
var startTime = Period == null ? (DateTime?)null : DateTime.UtcNow.Add(-Period.Value);
|
var startTime = Period == null ? (DateTime?)null : DateTime.UtcNow.Add(-Period.Value);
|
||||||
if (_restClient.GetKlinesOptions.MaxAge != null && DateTime.UtcNow.Add(-_restClient.GetKlinesOptions.MaxAge.Value) > startTime)
|
if (_restClient.GetKlinesOptions.MaxAge != null && DateTime.UtcNow.Add(-_restClient.GetKlinesOptions.MaxAge.Value) > startTime)
|
||||||
@ -236,11 +238,7 @@ namespace CryptoExchange.Net.Trackers.Klines
|
|||||||
await foreach (var result in ExchangeHelpers.ExecutePages(_restClient.GetKlinesAsync, request).ConfigureAwait(false))
|
await foreach (var result in ExchangeHelpers.ExecutePages(_restClient.GetKlinesAsync, request).ConfigureAwait(false))
|
||||||
{
|
{
|
||||||
if (!result)
|
if (!result)
|
||||||
{
|
return result;
|
||||||
_ = subResult.Data.CloseAsync();
|
|
||||||
Status = SyncStatus.Disconnected;
|
|
||||||
return subResult.AsError<UpdateSubscription>(result.Error!);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Limit != null && data.Count > Limit)
|
if (Limit != null && data.Count > Limit)
|
||||||
break;
|
break;
|
||||||
@ -249,7 +247,7 @@ namespace CryptoExchange.Net.Trackers.Klines
|
|||||||
}
|
}
|
||||||
|
|
||||||
SetInitialData(data);
|
SetInitialData(data);
|
||||||
return subResult;
|
return CallResult.SuccessResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
@ -199,7 +199,12 @@ namespace CryptoExchange.Net.Trackers.Trades
|
|||||||
_startWithSnapshot = startWithSnapshot;
|
_startWithSnapshot = startWithSnapshot;
|
||||||
Status = SyncStatus.Syncing;
|
Status = SyncStatus.Syncing;
|
||||||
_logger.TradeTrackerStarting(SymbolName);
|
_logger.TradeTrackerStarting(SymbolName);
|
||||||
var subResult = await DoStartAsync().ConfigureAwait(false);
|
var subResult = await _socketClient.SubscribeToTradeUpdatesAsync(new SubscribeTradeRequest(Symbol),
|
||||||
|
update =>
|
||||||
|
{
|
||||||
|
AddData(update.Data);
|
||||||
|
}).ConfigureAwait(false);
|
||||||
|
|
||||||
if (!subResult)
|
if (!subResult)
|
||||||
{
|
{
|
||||||
_logger.TradeTrackerStartFailed(SymbolName, subResult.Error!.Message, subResult.Error.Exception);
|
_logger.TradeTrackerStartFailed(SymbolName, subResult.Error!.Message, subResult.Error.Exception);
|
||||||
@ -211,6 +216,15 @@ namespace CryptoExchange.Net.Trackers.Trades
|
|||||||
_updateSubscription.ConnectionLost += HandleConnectionLost;
|
_updateSubscription.ConnectionLost += HandleConnectionLost;
|
||||||
_updateSubscription.ConnectionClosed += HandleConnectionClosed;
|
_updateSubscription.ConnectionClosed += HandleConnectionClosed;
|
||||||
_updateSubscription.ConnectionRestored += HandleConnectionRestored;
|
_updateSubscription.ConnectionRestored += HandleConnectionRestored;
|
||||||
|
|
||||||
|
var result = await DoStartAsync().ConfigureAwait(false);
|
||||||
|
if (!result)
|
||||||
|
{
|
||||||
|
_ = subResult.Data.CloseAsync();
|
||||||
|
Status = SyncStatus.Disconnected;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
SetSyncStatus();
|
SetSyncStatus();
|
||||||
_logger.TradeTrackerStarted(SymbolName);
|
_logger.TradeTrackerStarted(SymbolName);
|
||||||
return CallResult.SuccessResult;
|
return CallResult.SuccessResult;
|
||||||
@ -231,22 +245,10 @@ namespace CryptoExchange.Net.Trackers.Trades
|
|||||||
/// The start procedure needed for trade syncing, generally subscribing to an update stream and requesting the snapshot
|
/// The start procedure needed for trade syncing, generally subscribing to an update stream and requesting the snapshot
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected virtual async Task<CallResult<UpdateSubscription>> DoStartAsync()
|
protected virtual async Task<CallResult> DoStartAsync()
|
||||||
{
|
{
|
||||||
var subResult = await _socketClient.SubscribeToTradeUpdatesAsync(new SubscribeTradeRequest(Symbol),
|
|
||||||
update =>
|
|
||||||
{
|
|
||||||
AddData(update.Data);
|
|
||||||
}).ConfigureAwait(false);
|
|
||||||
|
|
||||||
if (!subResult)
|
|
||||||
{
|
|
||||||
Status = SyncStatus.Disconnected;
|
|
||||||
return subResult;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!_startWithSnapshot)
|
if (!_startWithSnapshot)
|
||||||
return subResult;
|
return CallResult.SuccessResult;
|
||||||
|
|
||||||
if (_historyRestClient != null)
|
if (_historyRestClient != null)
|
||||||
{
|
{
|
||||||
@ -256,11 +258,7 @@ namespace CryptoExchange.Net.Trackers.Trades
|
|||||||
await foreach(var result in ExchangeHelpers.ExecutePages(_historyRestClient.GetTradeHistoryAsync, request).ConfigureAwait(false))
|
await foreach(var result in ExchangeHelpers.ExecutePages(_historyRestClient.GetTradeHistoryAsync, request).ConfigureAwait(false))
|
||||||
{
|
{
|
||||||
if (!result)
|
if (!result)
|
||||||
{
|
return result;
|
||||||
_ = subResult.Data.CloseAsync();
|
|
||||||
Status = SyncStatus.Disconnected;
|
|
||||||
return subResult.AsError<UpdateSubscription>(result.Error!);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Limit != null && data.Count > Limit)
|
if (Limit != null && data.Count > Limit)
|
||||||
break;
|
break;
|
||||||
@ -279,15 +277,13 @@ namespace CryptoExchange.Net.Trackers.Trades
|
|||||||
var snapshot = await _recentRestClient.GetRecentTradesAsync(new GetRecentTradesRequest(Symbol, limit)).ConfigureAwait(false);
|
var snapshot = await _recentRestClient.GetRecentTradesAsync(new GetRecentTradesRequest(Symbol, limit)).ConfigureAwait(false);
|
||||||
if (!snapshot)
|
if (!snapshot)
|
||||||
{
|
{
|
||||||
_ = subResult.Data.CloseAsync();
|
return snapshot;
|
||||||
Status = SyncStatus.Disconnected;
|
|
||||||
return subResult.AsError<UpdateSubscription>(snapshot.Error!);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SetInitialData(snapshot.Data);
|
SetInitialData(snapshot.Data);
|
||||||
}
|
}
|
||||||
|
|
||||||
return subResult;
|
return CallResult.SuccessResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
@ -58,6 +58,14 @@ Make a one time donation in a crypto currency of your choice. If you prefer to d
|
|||||||
Alternatively, sponsor me on Github using [Github Sponsors](https://github.com/sponsors/JKorf).
|
Alternatively, sponsor me on Github using [Github Sponsors](https://github.com/sponsors/JKorf).
|
||||||
|
|
||||||
## Release notes
|
## Release notes
|
||||||
|
* Version 9.3.0 - 23 Jul 2025
|
||||||
|
* Updated websocket message to listener matching logic to be more flexible
|
||||||
|
* Updated decimal parser to support "NaN" and "-Infinity" strings, added check for negative overflow value, improved performance in most cases
|
||||||
|
|
||||||
|
* Version 9.2.1 - 16 Jul 2025
|
||||||
|
* Added setting for whether or not to process unparsable websocket messages
|
||||||
|
* Fixed issue causing duplicate subscriptions and data in the TradeTracker and KlineTracker when websocket connection was reconnected
|
||||||
|
|
||||||
* Version 9.2.0 - 14 Jul 2025
|
* Version 9.2.0 - 14 Jul 2025
|
||||||
* Added support for sending byte data on websocket
|
* Added support for sending byte data on websocket
|
||||||
* Added support for handling both string and byte data with different IMessageAccessor types
|
* Added support for handling both string and byte data with different IMessageAccessor types
|
||||||
|
Loading…
x
Reference in New Issue
Block a user