1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 22:23:54 +00:00
This commit is contained in:
Jkorf 2025-12-04 16:04:02 +01:00
parent ab8a93d1e2
commit 02b38383ca
10 changed files with 65 additions and 95 deletions

View File

@ -11,7 +11,7 @@ using System.Threading.Tasks;
namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
{
internal class TestSubscription<T> : Subscription<object, object>
internal class TestSubscription<T> : Subscription
{
private readonly Action<DataEvent<T>> _handler;
@ -22,9 +22,9 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
MessageMatcher = MessageMatcher.Create<T>("update-topic", DoHandleMessage);
}
public CallResult DoHandleMessage(SocketConnection connection, DataEvent<T> message)
public CallResult DoHandleMessage(SocketConnection connection, DateTime receiveTime, string? originalData, T message)
{
_handler.Invoke(message);
_handler.Invoke(new DataEvent<T>(message, receiveTime, originalData));
return new CallResult(null);
}

View File

@ -10,7 +10,7 @@ using System.Threading.Tasks;
namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
{
internal class TestSubscriptionWithResponseCheck<T> : Subscription<SubResponse, UnsubResponse>
internal class TestSubscriptionWithResponseCheck<T> : Subscription
{
private readonly Action<DataEvent<T>> _handler;
private readonly string _channel;
@ -22,9 +22,9 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
_channel = channel;
}
public CallResult DoHandleMessage(SocketConnection connection, DataEvent<T> message)
public CallResult DoHandleMessage(SocketConnection connection, DateTime receiveTime, string? originalData, T message)
{
_handler.Invoke(message);
_handler.Invoke(new DataEvent<T>(message, receiveTime, originalData));
return new CallResult(null);
}

View File

@ -19,6 +19,7 @@ using System.Linq;
using CryptoExchange.Net.Converters.SystemTextJson;
using System.Text.Json.Serialization;
using CryptoExchange.Net.Objects.Errors;
using System.Net.Http.Headers;
namespace CryptoExchange.Net.UnitTests.TestImplementations
{
@ -86,7 +87,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
var request = new Mock<IRequest>();
request.Setup(c => c.Uri).Returns(new Uri("http://www.test.com"));
request.Setup(c => c.GetHeaders()).Returns(new KeyValuePair<string, string[]>[0]);
request.Setup(c => c.GetHeaders()).Returns(new HttpRequestMessage().Headers);
request.Setup(c => c.GetResponseAsync(It.IsAny<CancellationToken>())).Throws(we);
var factory = Mock.Get(Api1.RequestFactory);
@ -115,7 +116,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
request.Setup(c => c.Uri).Returns(new Uri("http://www.test.com"));
request.Setup(c => c.GetResponseAsync(It.IsAny<CancellationToken>())).Returns(Task.FromResult(response.Object));
request.Setup(c => c.AddHeader(It.IsAny<string>(), It.IsAny<string>())).Callback<string, string>((key, val) => headers.Add(new KeyValuePair<string, string[]>(key, new string[] { val })));
request.Setup(c => c.GetHeaders()).Returns(headers.ToArray());
request.Setup(c => c.GetHeaders()).Returns(new HttpRequestMessage().Headers);
var factory = Mock.Get(Api1.RequestFactory);
factory.Setup(c => c.Create(It.IsAny<Version>(), It.IsAny<HttpMethod>(), It.IsAny<Uri>(), It.IsAny<int>()))
@ -194,7 +195,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
return await SendAsync<T>("http://www.test.com", new RequestDefinition("/", HttpMethod.Get) { Weight = 0 }, null, ct);
}
protected override Error ParseErrorResponse(int httpStatusCode, KeyValuePair<string, string[]>[] responseHeaders, IMessageAccessor accessor, Exception exception)
protected override Error ParseErrorResponse(int httpStatusCode, HttpResponseHeaders responseHeaders, IMessageAccessor accessor, Exception exception)
{
var errorData = accessor.Deserialize<TestError>();

View File

@ -264,7 +264,9 @@ namespace CryptoExchange.Net.Clients
uriParameters,
bodyParameters,
additionalHeaders);
_logger.RestApiSendRequest(request.RequestId, definition, request.Content, string.IsNullOrEmpty(request.Uri.Query) ? "-" : request.Uri.Query, string.Join(", ", request.GetHeaders().Select(h => h.Key + $"=[{string.Join(",", h.Value)}]")));
if (_logger.IsEnabled(LogLevel.Debug))
_logger.RestApiSendRequest(request.RequestId, definition, request.Content, string.IsNullOrEmpty(request.Uri.Query) ? "-" : request.Uri.Query, string.Join(", ", request.GetHeaders().Select(h => h.Key + $"=[{string.Join(",", h.Value)}]")));
TotalRequestsMade++;
WebCallResult<T> result;
@ -281,9 +283,14 @@ namespace CryptoExchange.Net.Clients
{
var originalData = OutputOriginalData ? result.OriginalData : "[Data only available when OutputOriginal = true]";
if (!result)
{
_logger.RestApiErrorReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), result.Error?.ToString(), originalData, result.Error?.Exception);
}
else
_logger.RestApiResponseReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), originalData);
{
if (_logger.IsEnabled(LogLevel.Debug))
_logger.RestApiResponseReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), originalData);
}
}
else
{
@ -466,7 +473,6 @@ namespace CryptoExchange.Net.Clients
var sw = Stopwatch.StartNew();
Stream? responseStream = null;
IResponse? response = null;
var state = MessageHandler.CreateState();
try
{
@ -475,19 +481,22 @@ namespace CryptoExchange.Net.Clients
responseStream = await response.GetResponseStreamAsync().ConfigureAwait(false);
string? originalData = null;
var outputOriginalData = ApiOptions.OutputOriginalData ?? ClientOptions.OutputOriginalData;
if (outputOriginalData)
if (outputOriginalData || MessageHandler.RequiresSeekableStream)
{
// If we want to return the original string data from the stream, but still want to process it
// we'll need to copy it as the stream isn't seekable, and thus we can only read it once
var memoryStream = new MemoryStream();
await responseStream.CopyToAsync(memoryStream).ConfigureAwait(false);
using var reader = new StreamReader(memoryStream, Encoding.UTF8,false, 4096, true);
memoryStream.Position = 0;
originalData = await reader.ReadToEndAsync().ConfigureAwait(false);
if (outputOriginalData)
{
memoryStream.Position = 0;
originalData = await reader.ReadToEndAsync().ConfigureAwait(false);
if (_logger.IsEnabled(LogLevel.Trace))
if (_logger.IsEnabled(LogLevel.Trace))
#warning TODO extension
_logger.LogTrace("[Req {RequestId}] Received response: {Data}", request.RequestId, originalData);
_logger.LogTrace("[Req {RequestId}] Received response: {Data}", request.RequestId, originalData);
}
// Continue processing from the memory stream since the response stream is already read and we can't seek it
responseStream.Close();
@ -505,7 +514,6 @@ namespace CryptoExchange.Net.Clients
// Specifically handle rate limit errors
var rateError = await MessageHandler.ParseErrorRateLimitResponse(
(int)response.StatusCode,
state,
response.ResponseHeaders,
responseStream).ConfigureAwait(false);
if (rateError.RetryAfter != null && gate != null && ClientOptions.RateLimiterEnabled)
@ -521,7 +529,6 @@ namespace CryptoExchange.Net.Clients
// Handle a 'normal' error response. Can still be either a json error message or some random HTML or other string
error = await MessageHandler.ParseErrorResponse(
(int)response.StatusCode,
state,
response.ResponseHeaders,
responseStream).ConfigureAwait(false);
}
@ -536,7 +543,6 @@ namespace CryptoExchange.Net.Clients
// Data response received, inspect the message and check if it is an error or not
var parsedError = await MessageHandler.CheckForErrorResponse(
requestDefinition,
state,
response.ResponseHeaders,
responseStream).ConfigureAwait(false);
if (parsedError != null)
@ -554,8 +560,12 @@ namespace CryptoExchange.Net.Clients
return new WebCallResult<T>(response.StatusCode, response.HttpVersion, response.ResponseHeaders, sw.Elapsed, response.ContentLength, originalData, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, parsedError);
}
if (MessageHandler.RequiresSeekableStream)
// Reset stream read position as it might not be at the start if `CheckForErrorResponse` has read from it
responseStream.Position = 0;
// Try deserialization into the expected type
var (deserializeResult, deserializeError) = await MessageHandler.TryDeserializeAsync<T>(responseStream, state, cancellationToken).ConfigureAwait(false);
var (deserializeResult, deserializeError) = await MessageHandler.TryDeserializeAsync<T>(responseStream, cancellationToken).ConfigureAwait(false);
if (deserializeError != null)
return new WebCallResult<T>(response.StatusCode, response.HttpVersion, response.ResponseHeaders, sw.Elapsed, response.ContentLength, originalData, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, deserializeResult, deserializeError); ;

View File

@ -20,17 +20,15 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
MediaTypeWithQualityHeaderValue AcceptHeader { get; }
/// <summary>
/// Create an object to keep state for a request
/// Whether a seekable stream is required
/// </summary>
/// <returns></returns>
object? CreateState();
bool RequiresSeekableStream { get; }
/// <summary>
/// Parse the response when the HTTP response status indicated an error
/// </summary>
ValueTask<Error> ParseErrorResponse(
int httpStatusCode,
object? state,
HttpResponseHeaders responseHeaders,
Stream responseStream);
@ -39,7 +37,6 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
/// </summary>
ValueTask<ServerRateLimitError> ParseErrorRateLimitResponse(
int httpStatusCode,
object? state,
HttpResponseHeaders responseHeaders,
Stream responseStream);
@ -51,7 +48,6 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
/// </summary>
ValueTask<Error?> CheckForErrorResponse(
RequestDefinition request,
object? state,
HttpResponseHeaders responseHeaders,
Stream responseStream);
@ -60,7 +56,6 @@ namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
/// </summary>
ValueTask<(T? Result, Error? Error)> TryDeserializeAsync<T>(
Stream responseStream,
object? state,
CancellationToken ct);
/// <summary>

View File

@ -17,11 +17,6 @@ using System.Threading.Tasks;
namespace CryptoExchange.Net.Converters.SystemTextJson.MessageConverters
{
internal class JsonDocState
{
public JsonDocument? Document { get; set; }
}
/// <summary>
/// JSON REST message handler
/// </summary>
@ -34,6 +29,9 @@ namespace CryptoExchange.Net.Converters.SystemTextJson.MessageConverters
/// </summary>
protected static readonly ServerRateLimitError _emptyRateLimitError = new ServerRateLimitError();
/// <inheritdoc />
public virtual bool RequiresSeekableStream => false;
/// <summary>
/// The serializer options to use
/// </summary>
@ -42,13 +40,9 @@ namespace CryptoExchange.Net.Converters.SystemTextJson.MessageConverters
/// <inheritdoc />
public MediaTypeWithQualityHeaderValue AcceptHeader => _acceptJsonContent;
/// <inheritdoc />
public virtual object CreateState() => new JsonDocState();
/// <inheritdoc />
public virtual ValueTask<ServerRateLimitError> ParseErrorRateLimitResponse(
int httpStatusCode,
object? state,
HttpResponseHeaders responseHeaders,
Stream responseStream)
{
@ -70,29 +64,23 @@ namespace CryptoExchange.Net.Converters.SystemTextJson.MessageConverters
/// <inheritdoc />
public abstract ValueTask<Error> ParseErrorResponse(
int httpStatusCode,
object? state,
HttpResponseHeaders responseHeaders,
Stream responseStream);
/// <inheritdoc />
public virtual ValueTask<Error?> CheckForErrorResponse(
RequestDefinition request,
object? state,
HttpResponseHeaders responseHeaders,
Stream responseStream) => new ValueTask<Error?>((Error?)null);
/// <summary>
/// Read the response into a JsonDocument object
/// </summary>
protected virtual async ValueTask<(Error?, JsonDocument?)> GetJsonDocument(Stream stream, object? state)
protected virtual async ValueTask<(Error?, JsonDocument?)> GetJsonDocument(Stream stream)
{
if (state is JsonDocState documentState && documentState.Document != null)
return (null, documentState.Document);
try
{
var document = await JsonDocument.ParseAsync(stream).ConfigureAwait(false);
((JsonDocState)state!).Document = document;
return (null, document);
}
catch (Exception ex)
@ -102,21 +90,11 @@ namespace CryptoExchange.Net.Converters.SystemTextJson.MessageConverters
}
/// <inheritdoc />
public async ValueTask<(T? Result, Error? Error)> TryDeserializeAsync<T>(Stream responseStream, object? state, CancellationToken cancellationToken)
public async ValueTask<(T? Result, Error? Error)> TryDeserializeAsync<T>(Stream responseStream, CancellationToken cancellationToken)
{
try
{
// If the document was already loaded (because we needed it for checking a response code for instance)
// then we deserialize from the document, else from the stream
T result;
if (state is JsonDocState documentState && documentState.Document != null)
{
result = documentState.Document.Deserialize<T>(Options)!;
}
else
{
result = await JsonSerializer.DeserializeAsync<T>(responseStream, Options)!.ConfigureAwait(false)!;
}
var result = await JsonSerializer.DeserializeAsync<T>(responseStream, Options)!.ConfigureAwait(false)!;
return (result, null);
}
catch (JsonException ex)

View File

@ -139,10 +139,13 @@ namespace CryptoExchange.Net.RateLimiting
{
RateLimitUpdated?.Invoke(new RateLimitUpdateEvent(itemId, _name, guard.Description, result.Current, result.Limit, result.Period));
if (type == RateLimitItemType.Connection)
logger.RateLimitAppliedConnection(itemId, guard.Name, guard.Description, result.Current);
else
logger.RateLimitAppliedRequest(itemId, definition.Path, guard.Name, guard.Description, result.Current);
if (logger.IsEnabled(LogLevel.Trace))
{
if (type == RateLimitItemType.Connection)
logger.RateLimitAppliedConnection(itemId, guard.Name, guard.Description, result.Current);
else
logger.RateLimitAppliedRequest(itemId, definition.Path, guard.Name, guard.Description, result.Current);
}
}
}

View File

@ -65,6 +65,18 @@ namespace CryptoExchange.Net.Sockets
return new MessageRouter(new MessageRoute<T>(typeIdentifier, topicFilter, handler));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageRouter CreateWithTopicFilter<T>(IEnumerable<string> typeIdentifiers, string topicFilter, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
{
var routes = new List<MessageRoute>();
foreach (var type in typeIdentifiers)
routes.Add(new MessageRoute<T>(type, topicFilter, handler));
return new MessageRouter(routes.ToArray());
}
/// <summary>
/// Create message matcher
/// </summary>

View File

@ -235,38 +235,4 @@ namespace CryptoExchange.Net.Sockets
return new SubscriptionState(Id, Status, TotalInvocations, MessageMatcher);
}
}
/// <inheritdoc />
public abstract class Subscription<TSubResponse, TUnsubResponse> : Subscription
{
/// <summary>
/// ctor
/// </summary>
/// <param name="logger"></param>
/// <param name="authenticated"></param>
protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated)
{
}
/// <inheritdoc />
public override void HandleSubQueryResponse(object? message)
=> HandleSubQueryResponse((TSubResponse?)message);
/// <summary>
/// Handle a subscription query response
/// </summary>
/// <param name="message"></param>
public virtual void HandleSubQueryResponse(TSubResponse? message) { }
/// <inheritdoc />
public override void HandleUnsubQueryResponse(object message)
=> HandleUnsubQueryResponse((TUnsubResponse)message);
/// <summary>
/// Handle an unsubscription query response
/// </summary>
/// <param name="message"></param>
public virtual void HandleUnsubQueryResponse(TUnsubResponse message) { }
}
}

View File

@ -93,10 +93,11 @@ namespace CryptoExchange.Net.Testing
};
TUpdate? update = default;
Task<CallResult<UpdateSubscription>> task;
// Invoke subscription method
try
{
var task = methodInvoke(_client, x => { update = x.Data; });
task = methodInvoke(_client, x => { update = x.Data; });
}
catch(Exception)
{
@ -194,6 +195,10 @@ namespace CryptoExchange.Net.Testing
}
}
var res = await task.ConfigureAwait(false);
if (!res)
throw new Exception("Subscribe failed: " + res.Error!.ToString());
await _client.UnsubscribeAllAsync().ConfigureAwait(false);
Trace.Listeners.Remove(listener);
}