1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 01:33:26 +00:00
This commit is contained in:
Jkorf 2025-12-09 15:42:08 +01:00
parent 23efb3f432
commit 90abcb78aa
7 changed files with 45 additions and 92 deletions

View File

@ -1,22 +0,0 @@
//using CryptoExchange.Net.Converters.MessageParsing;
//using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
//using CryptoExchange.Net.Objects;
//using LightProto;
//using System;
//using System.Collections.Generic;
//using System.Net.WebSockets;
//using System.Text;
//namespace CryptoExchange.Net.Protobuf.Converters.Protobuf
//{
// public abstract class DynamicProtobufConverter<T> : ISocketMessageHandler
// {
// public object Deserialize(ReadOnlySpan<byte> data, Type type)
// {
// var result = Serializer.Deserialize<T>(data);
// return result;
// }
// public abstract string GetMessageIdentifier(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
// }
//}

View File

@ -1,11 +1,12 @@
using System; using CryptoExchange.Net.Exceptions;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Globalization; using System.Globalization;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
using System.Text.Json.Serialization;
using System.Text.Json; using System.Text.Json;
using System.Collections.Generic; using System.Text.Json.Serialization;
using System.Diagnostics.CodeAnalysis;
using System.Threading; using System.Threading;
namespace CryptoExchange.Net.Converters.SystemTextJson namespace CryptoExchange.Net.Converters.SystemTextJson
@ -109,7 +110,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
#endif #endif
{ {
if (reader.TokenType != JsonTokenType.StartArray) if (reader.TokenType != JsonTokenType.StartArray)
throw new Exception("Not an array"); throw new CeDeserializationException("Not an array");
int index = 0; int index = 0;
while (reader.Read()) while (reader.Read())
@ -158,7 +159,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
JsonTokenType.String => reader.GetString(), JsonTokenType.String => reader.GetString(),
JsonTokenType.Number => reader.GetDecimal(), JsonTokenType.Number => reader.GetDecimal(),
JsonTokenType.StartObject => JsonSerializer.Deserialize(ref reader, attribute.TargetType, options), JsonTokenType.StartObject => JsonSerializer.Deserialize(ref reader, attribute.TargetType, options),
_ => throw new NotImplementedException($"Array deserialization of type {reader.TokenType} not supported"), _ => throw new CeDeserializationException($"Array deserialization of type {reader.TokenType} not supported"),
}; };
} }

View File

@ -0,0 +1,24 @@
using System;
namespace CryptoExchange.Net.Exceptions
{
/// <summary>
/// Exception during deserialization
/// </summary>
public class CeDeserializationException : Exception
{
/// <summary>
/// ctor
/// </summary>
public CeDeserializationException(string message) : base(message)
{
}
/// <summary>
/// ctor
/// </summary>
public CeDeserializationException(string message, Exception innerException) : base(message, innerException)
{
}
}
}

View File

@ -97,14 +97,5 @@ namespace CryptoExchange.Net.Objects.Sockets
{ {
return _connection.CloseAsync(_subscription); return _connection.CloseAsync(_subscription);
} }
/// <summary>
/// Unsubscribe a subscription
/// </summary>
/// <returns></returns>
internal async Task UnsubscribeAsync()
{
await _connection.UnsubscribeAsync(_subscription).ConfigureAwait(false);
}
} }
} }

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Clients; using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Exceptions;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets.Default.Interfaces; using CryptoExchange.Net.Sockets.Default.Interfaces;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -37,14 +38,21 @@ namespace CryptoExchange.Net.Sockets.HighPerf
{ {
try try
{ {
while (!ct.IsCancellationRequested)
{
try
{
#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code #pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. #pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, true, _jsonOptions, ct).ConfigureAwait(false)) await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, true, _jsonOptions, ct).ConfigureAwait(false))
#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. #pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code #pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
{ {
foreach(var sub in _typedSubscriptions) foreach (var sub in _typedSubscriptions)
DelegateToSubscription(_typedSubscriptions[0], update!); DelegateToSubscription(_typedSubscriptions[0], update!);
}
}
catch (CeDeserializationException) { } // Might just be a different message, ignore
} }
} }
catch (OperationCanceledException) { } catch (OperationCanceledException) { }

View File

@ -240,24 +240,8 @@ namespace CryptoExchange.Net.Sockets.HighPerf
if (subscription.CancellationTokenRegistration.HasValue) if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose(); subscription.CancellationTokenRegistration.Value.Dispose();
var anyOtherSubscriptions = Subscriptions.Any(x => x != subscription); Status = SocketStatus.Closing;
await CloseAsync().ConfigureAwait(false);
if (anyOtherSubscriptions)
await UnsubscribeAsync(subscription).ConfigureAwait(false);
if (Status == SocketStatus.Closing)
{
_logger.AlreadyClosing(SocketId);
return;
}
if (!anyOtherSubscriptions)
{
Status = SocketStatus.Closing;
_logger.ClosingNoMoreSubscriptions(SocketId);
await CloseAsync().ConfigureAwait(false);
}
} }
/// <summary> /// <summary>
@ -311,7 +295,6 @@ namespace CryptoExchange.Net.Sockets.HighPerf
return new CallResult(new WebError("Failed to send message, socket no longer open")); return new CallResult(new WebError("Failed to send message, socket no longer open"));
} }
_logger.SendingByteData(SocketId, 0, data.Length);
try try
{ {
if (!await _socket.SendAsync(data).ConfigureAwait(false)) if (!await _socket.SendAsync(data).ConfigureAwait(false))
@ -344,7 +327,6 @@ namespace CryptoExchange.Net.Sockets.HighPerf
return new CallResult(new WebError("Failed to send message, socket no longer open")); return new CallResult(new WebError("Failed to send message, socket no longer open"));
} }
_logger.SendingData(SocketId, 0, data);
try try
{ {
if (!await _socket.SendAsync(data).ConfigureAwait(false)) if (!await _socket.SendAsync(data).ConfigureAwait(false))
@ -358,16 +340,6 @@ namespace CryptoExchange.Net.Sockets.HighPerf
} }
} }
internal async Task UnsubscribeAsync(HighPerfSubscription subscription)
{
var unsubscribeRequest = subscription.CreateUnsubscriptionQuery(this);
if (unsubscribeRequest == null)
return;
await SendAsync(unsubscribeRequest).ConfigureAwait(false);
_logger.SubscriptionUnsubscribed(SocketId, subscription.Id);
}
/// <summary> /// <summary>
/// Periodically sends data over a socket connection /// Periodically sends data over a socket connection
/// </summary> /// </summary>

View File

@ -33,11 +33,6 @@ namespace CryptoExchange.Net.Sockets.HighPerf
/// </summary> /// </summary>
public object? SubscriptionQuery { get; private set; } public object? SubscriptionQuery { get; private set; }
/// <summary>
/// The unsubscribe query for this subscription
/// </summary>
public object? UnsubscriptionQuery { get; private set; }
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
@ -62,22 +57,6 @@ namespace CryptoExchange.Net.Sockets.HighPerf
/// <returns></returns> /// <returns></returns>
protected abstract object? GetSubQuery(HighPerfSocketConnection connection); protected abstract object? GetSubQuery(HighPerfSocketConnection connection);
/// <summary>
/// Create a new unsubscription query
/// </summary>
public object? CreateUnsubscriptionQuery(HighPerfSocketConnection connection)
{
var query = GetUnsubQuery(connection);
UnsubscriptionQuery = query;
return query;
}
/// <summary>
/// Get the unsubscribe query to send when unsubscribing
/// </summary>
/// <returns></returns>
protected abstract object? GetUnsubQuery(HighPerfSocketConnection connection);
/// <summary> /// <summary>
/// Invoke the exception event /// Invoke the exception event
/// </summary> /// </summary>