mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-12-14 01:33:26 +00:00
wip
This commit is contained in:
parent
18ed614ff0
commit
d1ebb65c5d
@ -73,6 +73,8 @@ namespace CryptoExchange.Net.UnitTests
|
||||
|
||||
public class TestAuthProvider : AuthenticationProvider
|
||||
{
|
||||
public override ApiCredentialsType[] SupportedCredentialTypes => [ApiCredentialsType.Hmac];
|
||||
|
||||
public TestAuthProvider(ApiCredentials credentials) : base(credentials)
|
||||
{
|
||||
}
|
||||
|
||||
@ -199,13 +199,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
||||
return await SendAsync<T>("http://www.test.com", new RequestDefinition("/", HttpMethod.Get) { Weight = 0 }, null, ct);
|
||||
}
|
||||
|
||||
protected override Error ParseErrorResponse(int httpStatusCode, HttpResponseHeaders responseHeaders, IMessageAccessor accessor, Exception exception)
|
||||
{
|
||||
var errorData = accessor.Deserialize<TestError>();
|
||||
|
||||
return new ServerError(errorData.Data.ErrorCode, GetErrorInfo(errorData.Data.ErrorCode, errorData.Data.ErrorMessage));
|
||||
}
|
||||
|
||||
public override TimeSpan? GetTimeOffset()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
|
||||
@ -16,11 +16,14 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
||||
{
|
||||
internal class TestRestMessageHandler : JsonRestMessageHandler
|
||||
{
|
||||
private ErrorMapping _errorMapping;
|
||||
public override JsonSerializerOptions Options => new JsonSerializerOptions();
|
||||
|
||||
public override ValueTask<Error> ParseErrorResponse(int httpStatusCode, HttpResponseHeaders responseHeaders, Stream responseStream)
|
||||
{
|
||||
return new ValueTask<Error>(new ServerError(ErrorInfo.Unknown));
|
||||
var errorData = JsonSerializer.Deserialize<TestError>(responseStream);
|
||||
|
||||
return new ValueTask<Error>(new ServerError(errorData.ErrorCode, _errorMapping.GetErrorInfo(errorData.ErrorCode.ToString(), errorData.ErrorMessage)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -268,16 +268,7 @@ namespace CryptoExchange.Net.Clients
|
||||
_logger.RestApiSendRequest(request.RequestId, definition, request.Content, string.IsNullOrEmpty(request.Uri.Query) ? "-" : request.Uri.Query, string.Join(", ", request.GetHeaders().Select(h => h.Key + $"=[{string.Join(",", h.Value)}]")));
|
||||
TotalRequestsMade++;
|
||||
|
||||
WebCallResult<T> result;
|
||||
if (ClientOptions.UseUpdatedDeserialization)
|
||||
{
|
||||
result = await GetResponseAsync2<T>(definition, request, definition.RateLimitGate, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
result = await GetResponseAsync<T>(definition, request, definition.RateLimitGate, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
var result = await GetResponseAsync2<T>(definition, request, definition.RateLimitGate, cancellationToken).ConfigureAwait(false);
|
||||
if (result.Error is not CancellationRequestedError)
|
||||
{
|
||||
var originalData = OutputOriginalData ? result.OriginalData : "[Data only available when OutputOriginal = true]";
|
||||
@ -763,192 +754,5 @@ namespace CryptoExchange.Net.Clients
|
||||
&& definition.Method == HttpMethod.Get
|
||||
&& !definition.PreventCaching;
|
||||
|
||||
#region TO BE REMOVED
|
||||
/// <summary>
|
||||
/// Executes the request and returns the result deserialized into the type parameter class
|
||||
/// </summary>
|
||||
/// <param name="requestDefinition">The request definition</param>
|
||||
/// <param name="request">The request object to execute</param>
|
||||
/// <param name="gate">The ratelimit gate used</param>
|
||||
/// <param name="cancellationToken">Cancellation token</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<WebCallResult<T>> GetResponseAsync<T>(
|
||||
RequestDefinition requestDefinition,
|
||||
IRequest request,
|
||||
IRateLimitGate? gate,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var sw = Stopwatch.StartNew();
|
||||
Stream? responseStream = null;
|
||||
IResponse? response = null;
|
||||
IStreamMessageAccessor? accessor = null;
|
||||
try
|
||||
{
|
||||
response = await request.GetResponseAsync(cancellationToken).ConfigureAwait(false);
|
||||
sw.Stop();
|
||||
responseStream = await response.GetResponseStreamAsync().ConfigureAwait(false);
|
||||
var outputOriginalData = ApiOptions.OutputOriginalData ?? ClientOptions.OutputOriginalData;
|
||||
|
||||
accessor = CreateAccessor();
|
||||
if (!response.IsSuccessStatusCode && !requestDefinition.TryParseOnNonSuccess)
|
||||
{
|
||||
// Error response
|
||||
var readResult = await accessor.Read(responseStream, true).ConfigureAwait(false);
|
||||
|
||||
Error error;
|
||||
if (response.StatusCode == (HttpStatusCode)418 || response.StatusCode == (HttpStatusCode)429)
|
||||
{
|
||||
var rateError = ParseRateLimitResponse((int)response.StatusCode, response.ResponseHeaders, accessor);
|
||||
if (rateError.RetryAfter != null && gate != null && ClientOptions.RateLimiterEnabled)
|
||||
{
|
||||
_logger.RestApiRateLimitPauseUntil(request.RequestId, rateError.RetryAfter.Value);
|
||||
await gate.SetRetryAfterGuardAsync(rateError.RetryAfter.Value).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
error = rateError;
|
||||
}
|
||||
else
|
||||
{
|
||||
error = ParseErrorResponse((int)response.StatusCode, response.ResponseHeaders, accessor, readResult.Error?.Exception);
|
||||
}
|
||||
|
||||
if (error.Code == null || error.Code == 0)
|
||||
error.Code = (int)response.StatusCode;
|
||||
|
||||
return new WebCallResult<T>(response.StatusCode, response.HttpVersion, response.ResponseHeaders, sw.Elapsed, response.ContentLength, OutputOriginalData ? accessor.GetOriginalString() : null, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, error!);
|
||||
}
|
||||
|
||||
var valid = await accessor.Read(responseStream, outputOriginalData).ConfigureAwait(false);
|
||||
if (typeof(T) == typeof(object))
|
||||
// Success status code and expected empty response, assume it's correct
|
||||
return new WebCallResult<T>(response.StatusCode, response.HttpVersion, response.ResponseHeaders, sw.Elapsed, 0, accessor.OriginalDataAvailable ? accessor.GetOriginalString() : "[Data only available when OutputOriginal = true in client options]", request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, null);
|
||||
|
||||
if (!valid)
|
||||
{
|
||||
// Invalid data
|
||||
return new WebCallResult<T>(response.StatusCode, response.HttpVersion, response.ResponseHeaders, sw.Elapsed, response.ContentLength, OutputOriginalData ? accessor.GetOriginalString() : null, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, valid.Error);
|
||||
}
|
||||
|
||||
// Data response received
|
||||
var parsedError = TryParseError(requestDefinition, response.ResponseHeaders, accessor);
|
||||
if (parsedError != null)
|
||||
{
|
||||
if (parsedError is ServerRateLimitError rateError)
|
||||
{
|
||||
if (rateError.RetryAfter != null && gate != null && ClientOptions.RateLimiterEnabled)
|
||||
{
|
||||
_logger.RestApiRateLimitPauseUntil(request.RequestId, rateError.RetryAfter.Value);
|
||||
await gate.SetRetryAfterGuardAsync(rateError.RetryAfter.Value).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
// Success status code, but TryParseError determined it was an error response
|
||||
return new WebCallResult<T>(response.StatusCode, response.HttpVersion, response.ResponseHeaders, sw.Elapsed, response.ContentLength, OutputOriginalData ? accessor.GetOriginalString() : null, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, parsedError);
|
||||
}
|
||||
|
||||
var deserializeResult = accessor.Deserialize<T>();
|
||||
return new WebCallResult<T>(response.StatusCode, response.HttpVersion, response.ResponseHeaders, sw.Elapsed, response.ContentLength, OutputOriginalData ? accessor.GetOriginalString() : null, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, deserializeResult.Data, deserializeResult.Error);
|
||||
}
|
||||
catch (HttpRequestException requestException)
|
||||
{
|
||||
// Request exception, can't reach server for instance
|
||||
var error = new WebError(requestException.Message, requestException);
|
||||
return new WebCallResult<T>(null, null, null, sw.Elapsed, null, null, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, error);
|
||||
}
|
||||
catch (OperationCanceledException canceledException)
|
||||
{
|
||||
if (cancellationToken != default && canceledException.CancellationToken == cancellationToken)
|
||||
{
|
||||
// Cancellation token canceled by caller
|
||||
return new WebCallResult<T>(null, null, null, sw.Elapsed, null, null, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, new CancellationRequestedError(canceledException));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Request timed out
|
||||
var error = new WebError($"Request timed out", exception: canceledException);
|
||||
error.ErrorType = ErrorType.Timeout;
|
||||
return new WebCallResult<T>(null, null, null, sw.Elapsed, null, null, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, error);
|
||||
}
|
||||
}
|
||||
catch (ArgumentException argumentException)
|
||||
{
|
||||
if (argumentException.Message.StartsWith("Only HTTP/"))
|
||||
{
|
||||
// Unsupported HTTP version error .net framework
|
||||
var error = ArgumentError.Invalid(nameof(RestExchangeOptions.HttpVersion), $"Invalid HTTP version {request.HttpVersion}: " + argumentException.Message);
|
||||
return new WebCallResult<T>(null, null, null, sw.Elapsed, null, null, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, error);
|
||||
}
|
||||
|
||||
throw;
|
||||
}
|
||||
catch (NotSupportedException notSupportedException)
|
||||
{
|
||||
if (notSupportedException.Message.StartsWith("Request version value must be one of"))
|
||||
{
|
||||
// Unsupported HTTP version error dotnet code
|
||||
var error = ArgumentError.Invalid(nameof(RestExchangeOptions.HttpVersion), $"Invalid HTTP version {request.HttpVersion}: " + notSupportedException.Message);
|
||||
return new WebCallResult<T>(null, null, null, sw.Elapsed, null, null, request.RequestId, request.Uri.ToString(), request.Content, request.Method, request.GetHeaders(), ResultDataSource.Server, default, error);
|
||||
}
|
||||
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
accessor?.Clear();
|
||||
responseStream?.Close();
|
||||
response?.Close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Parse an error response from the server. Only used when server returns a status other than Success(200) or ratelimit error (429 or 418)
|
||||
/// </summary>
|
||||
/// <param name="httpStatusCode">The response status code</param>
|
||||
/// <param name="responseHeaders">The response headers</param>
|
||||
/// <param name="accessor">Data accessor</param>
|
||||
/// <param name="exception">Exception</param>
|
||||
/// <returns></returns>
|
||||
protected virtual Error ParseErrorResponse(int httpStatusCode, HttpResponseHeaders responseHeaders, IMessageAccessor accessor, Exception? exception)
|
||||
{
|
||||
return new ServerError(ErrorInfo.Unknown, exception);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parse a rate limit error response from the server. Only used when server returns http status 429 or 418
|
||||
/// </summary>
|
||||
/// <param name="httpStatusCode">The response status code</param>
|
||||
/// <param name="responseHeaders">The response headers</param>
|
||||
/// <param name="accessor">Data accessor</param>
|
||||
/// <returns></returns>
|
||||
protected virtual ServerRateLimitError ParseRateLimitResponse(int httpStatusCode, HttpResponseHeaders responseHeaders, IMessageAccessor accessor)
|
||||
{
|
||||
// Handle retry after header
|
||||
var retryAfterHeader = responseHeaders.SingleOrDefault(r => r.Key.Equals("Retry-After", StringComparison.InvariantCultureIgnoreCase));
|
||||
if (retryAfterHeader.Value?.Any() != true)
|
||||
return new ServerRateLimitError();
|
||||
|
||||
var value = retryAfterHeader.Value.First();
|
||||
if (int.TryParse(value, out var seconds))
|
||||
return new ServerRateLimitError() { RetryAfter = DateTime.UtcNow.AddSeconds(seconds) };
|
||||
|
||||
if (DateTime.TryParse(value, out var datetime))
|
||||
return new ServerRateLimitError() { RetryAfter = datetime };
|
||||
|
||||
return new ServerRateLimitError();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Can be used to parse an error even though response status indicates success. Some apis always return 200 OK, even though there is an error.
|
||||
/// This method will be called for each response to be able to check if the response is an error or not.
|
||||
/// If the response is an error this method should return the parsed error, else it should return null
|
||||
/// </summary>
|
||||
/// <param name="requestDefinition">Request definition</param>
|
||||
/// <param name="accessor">Data accessor</param>
|
||||
/// <param name="responseHeaders">The response headers</param>
|
||||
/// <returns>Null if not an error, Error otherwise</returns>
|
||||
protected virtual Error? TryParseError(RequestDefinition requestDefinition, HttpResponseHeaders responseHeaders, IMessageAccessor accessor) => null;
|
||||
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
|
||||
@ -412,7 +412,7 @@ namespace CryptoExchange.Net.Clients
|
||||
var sendResult = await socketConnection.SendAsync(subRequest).ConfigureAwait(false);
|
||||
if (!sendResult)
|
||||
{
|
||||
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
await socketConnection.CloseAsync().ConfigureAwait(false);
|
||||
return new CallResult<HighPerfUpdateSubscription>(sendResult.Error!);
|
||||
}
|
||||
}
|
||||
@ -422,7 +422,7 @@ namespace CryptoExchange.Net.Clients
|
||||
subscription.CancellationTokenRegistration = ct.Register(async () =>
|
||||
{
|
||||
_logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id);
|
||||
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
await socketConnection.CloseAsync().ConfigureAwait(false);
|
||||
}, false);
|
||||
}
|
||||
|
||||
|
||||
@ -393,7 +393,10 @@ namespace CryptoExchange.Net
|
||||
{
|
||||
var processor = new ProcessQueue<T>(asyncHandler, maxQueuedItems, fullBehavior);
|
||||
await processor.StartAsync().ConfigureAwait(false);
|
||||
ct.Register(() => _ = processor.StopAsync());
|
||||
ct.Register(async () =>
|
||||
{
|
||||
await processor.StopAsync().ConfigureAwait(false);
|
||||
});
|
||||
|
||||
await subscribeCall(upd => processor.Write(upd)).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@ -42,11 +42,6 @@ namespace CryptoExchange.Net.Objects.Options
|
||||
/// </summary>
|
||||
public TimeSpan? HttpKeepAliveInterval { get; set; } = TimeSpan.FromSeconds(15);
|
||||
|
||||
/// <summary>
|
||||
/// Whether or not to use the updated deserialization logic
|
||||
/// </summary>
|
||||
public bool UseUpdatedDeserialization { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Set the values of this options on the target options
|
||||
/// </summary>
|
||||
@ -64,7 +59,6 @@ namespace CryptoExchange.Net.Objects.Options
|
||||
item.CachingMaxAge = CachingMaxAge;
|
||||
item.HttpVersion = HttpVersion;
|
||||
item.HttpKeepAliveInterval = HttpKeepAliveInterval;
|
||||
item.UseUpdatedDeserialization = UseUpdatedDeserialization;
|
||||
return item;
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,7 +95,7 @@ namespace CryptoExchange.Net.Objects.Sockets
|
||||
/// <returns></returns>
|
||||
public Task CloseAsync()
|
||||
{
|
||||
return _connection.CloseAsync(_subscription);
|
||||
return _connection.CloseAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,21 +38,14 @@ namespace CryptoExchange.Net.Sockets.HighPerf
|
||||
{
|
||||
try
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
|
||||
#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
|
||||
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, true, _jsonOptions, ct).ConfigureAwait(false))
|
||||
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, true, _jsonOptions, ct).ConfigureAwait(false))
|
||||
#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
|
||||
#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
|
||||
{
|
||||
foreach (var sub in _typedSubscriptions)
|
||||
DelegateToSubscription(_typedSubscriptions[0], update!);
|
||||
}
|
||||
}
|
||||
catch (CeDeserializationException) { } // Might just be a different message, ignore
|
||||
{
|
||||
foreach (var sub in _typedSubscriptions)
|
||||
DelegateToSubscription(_typedSubscriptions[0], update!);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) { }
|
||||
|
||||
@ -213,8 +213,8 @@ namespace CryptoExchange.Net.Sockets.HighPerf
|
||||
if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
||||
return;
|
||||
|
||||
if (ApiClient._socketConnections.ContainsKey(SocketId))
|
||||
ApiClient._socketConnections.TryRemove(SocketId, out _);
|
||||
if (ApiClient._highPerfSocketConnections.ContainsKey(SocketId))
|
||||
ApiClient._highPerfSocketConnections.TryRemove(SocketId, out _);
|
||||
|
||||
foreach (var subscription in Subscriptions)
|
||||
{
|
||||
@ -226,24 +226,6 @@ namespace CryptoExchange.Net.Sockets.HighPerf
|
||||
_socket.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
|
||||
/// </summary>
|
||||
/// <param name="subscription">Subscription to close</param>
|
||||
/// <returns></returns>
|
||||
public async Task CloseAsync(HighPerfSubscription subscription)
|
||||
{
|
||||
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
||||
return;
|
||||
|
||||
_logger.ClosingSubscription(SocketId, subscription.Id);
|
||||
if (subscription.CancellationTokenRegistration.HasValue)
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
|
||||
Status = SocketStatus.Closing;
|
||||
await CloseAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Dispose the connection
|
||||
/// </summary>
|
||||
|
||||
@ -17,7 +17,7 @@ namespace CryptoExchange.Net.Testing
|
||||
/// <summary>
|
||||
/// Get a client instance
|
||||
/// </summary>
|
||||
public abstract TClient GetClient(ILoggerFactory loggerFactory, bool newDeserialization);
|
||||
public abstract TClient GetClient(ILoggerFactory loggerFactory);
|
||||
|
||||
/// <summary>
|
||||
/// Whether the test should be run. By default integration tests aren't executed, can be set to true to force execution.
|
||||
@ -33,11 +33,11 @@ namespace CryptoExchange.Net.Testing
|
||||
/// Create a client
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
protected TClient CreateClient(bool newDeserialization)
|
||||
protected TClient CreateClient()
|
||||
{
|
||||
var fact = new LoggerFactory();
|
||||
fact.AddProvider(new TraceLoggerProvider());
|
||||
return GetClient(fact, newDeserialization);
|
||||
return GetClient(fact);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -57,15 +57,14 @@ namespace CryptoExchange.Net.Testing
|
||||
/// Execute a REST endpoint call and check for any errors or warnings.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">Type of response</typeparam>
|
||||
/// <param name="useNewDeserialization">Whether to use the new deserialization method</param>
|
||||
/// <param name="expression">The call expression</param>
|
||||
/// <param name="authRequest">Whether this is an authenticated request</param>
|
||||
public async Task RunAndCheckResult<T>(bool useNewDeserialization, Expression<Func<TClient, Task<WebCallResult<T>>>> expression, bool authRequest)
|
||||
public async Task RunAndCheckResult<T>(Expression<Func<TClient, Task<WebCallResult<T>>>> expression, bool authRequest)
|
||||
{
|
||||
if (!ShouldRun())
|
||||
return;
|
||||
|
||||
var client = CreateClient(useNewDeserialization);
|
||||
var client = CreateClient();
|
||||
|
||||
var expressionBody = (MethodCallExpression)expression.Body;
|
||||
if (authRequest && !Authenticated)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user