mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-06-07 16:06:15 +00:00
Fix for intermittently failing rate limiting test Added ConnectionId to RequestDefinition to correctly handle connection and path rate limiting configuration Added ValidateMessage method to websocket Query object to filter messages even though it is matched to the query based on the ListenIdentifier Added KlineTracker and TradeTracker implementation
267 lines
8.5 KiB
C#
267 lines
8.5 KiB
C#
using CryptoExchange.Net.Interfaces;
|
|
using CryptoExchange.Net.Objects;
|
|
using CryptoExchange.Net.Objects.Sockets;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace CryptoExchange.Net.Sockets
|
|
{
|
|
/// <summary>
|
|
/// Query
|
|
/// </summary>
|
|
public abstract class Query : IMessageProcessor
|
|
{
|
|
/// <summary>
|
|
/// Unique identifier
|
|
/// </summary>
|
|
public int Id { get; } = ExchangeHelpers.NextId();
|
|
|
|
/// <summary>
|
|
/// Has this query been completed
|
|
/// </summary>
|
|
public bool Completed { get; set; }
|
|
|
|
/// <summary>
|
|
/// The number of required responses. Can be more than 1 when for example subscribing multiple symbols streams in a single request,
|
|
/// and each symbol receives it's own confirmation response
|
|
/// </summary>
|
|
public int RequiredResponses { get; set; } = 1;
|
|
|
|
/// <summary>
|
|
/// The current number of responses received on this query
|
|
/// </summary>
|
|
public int CurrentResponses { get; set; }
|
|
|
|
/// <summary>
|
|
/// Timestamp of when the request was send
|
|
/// </summary>
|
|
public DateTime RequestTimestamp { get; set; }
|
|
|
|
/// <summary>
|
|
/// Result
|
|
/// </summary>
|
|
public CallResult? Result { get; set; }
|
|
|
|
/// <summary>
|
|
/// Response
|
|
/// </summary>
|
|
public object? Response { get; set; }
|
|
|
|
/// <summary>
|
|
/// Wait event for the calling message processing thread
|
|
/// </summary>
|
|
public AsyncResetEvent? ContinueAwaiter { get; set; }
|
|
|
|
/// <summary>
|
|
/// Strings to match this query to a received message
|
|
/// </summary>
|
|
public abstract HashSet<string> ListenerIdentifiers { get; set; }
|
|
|
|
/// <summary>
|
|
/// The query request object
|
|
/// </summary>
|
|
public object Request { get; set; }
|
|
|
|
/// <summary>
|
|
/// If this is a private request
|
|
/// </summary>
|
|
public bool Authenticated { get; }
|
|
|
|
/// <summary>
|
|
/// Weight of the query
|
|
/// </summary>
|
|
public int Weight { get; }
|
|
|
|
/// <summary>
|
|
/// Get the type the message should be deserialized to
|
|
/// </summary>
|
|
/// <param name="message"></param>
|
|
/// <returns></returns>
|
|
public abstract Type? GetMessageType(IMessageAccessor message);
|
|
|
|
/// <summary>
|
|
/// Wait event for response
|
|
/// </summary>
|
|
protected AsyncResetEvent _event;
|
|
|
|
/// <summary>
|
|
/// Cancellation token
|
|
/// </summary>
|
|
protected CancellationTokenSource? _cts;
|
|
|
|
/// <summary>
|
|
/// ctor
|
|
/// </summary>
|
|
/// <param name="request"></param>
|
|
/// <param name="authenticated"></param>
|
|
/// <param name="weight"></param>
|
|
public Query(object request, bool authenticated, int weight = 1)
|
|
{
|
|
_event = new AsyncResetEvent(false, false);
|
|
|
|
Authenticated = authenticated;
|
|
Request = request;
|
|
Weight = weight;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Signal that the request has been send and the timeout timer should start
|
|
/// </summary>
|
|
public void IsSend(TimeSpan timeout)
|
|
{
|
|
// Start timeout countdown
|
|
RequestTimestamp = DateTime.UtcNow;
|
|
_cts = new CancellationTokenSource(timeout);
|
|
_cts.Token.Register(Timeout, false);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Wait until timeout or the request is completed
|
|
/// </summary>
|
|
/// <param name="timeout"></param>
|
|
/// <param name="ct">Cancellation token</param>
|
|
/// <returns></returns>
|
|
public async Task WaitAsync(TimeSpan timeout, CancellationToken ct) => await _event.WaitAsync(timeout, ct).ConfigureAwait(false);
|
|
|
|
/// <inheritdoc />
|
|
public virtual CallResult<object> Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type);
|
|
|
|
/// <summary>
|
|
/// Mark request as timeout
|
|
/// </summary>
|
|
public abstract void Timeout();
|
|
|
|
/// <summary>
|
|
/// Mark request as failed
|
|
/// </summary>
|
|
/// <param name="error"></param>
|
|
public abstract void Fail(Error error);
|
|
|
|
/// <summary>
|
|
/// Handle a response message
|
|
/// </summary>
|
|
/// <param name="message"></param>
|
|
/// <param name="connection"></param>
|
|
/// <returns></returns>
|
|
public abstract Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message);
|
|
|
|
}
|
|
|
|
/// <summary>
|
|
/// Query
|
|
/// </summary>
|
|
/// <typeparam name="TServerResponse">The type returned from the server</typeparam>
|
|
/// <typeparam name="THandlerResponse">The type to be returned to the caller</typeparam>
|
|
public abstract class Query<TServerResponse, THandlerResponse> : Query
|
|
{
|
|
/// <inheritdoc />
|
|
public override Type? GetMessageType(IMessageAccessor message) => typeof(TServerResponse);
|
|
|
|
/// <summary>
|
|
/// The typed call result
|
|
/// </summary>
|
|
public CallResult<THandlerResponse>? TypedResult => (CallResult<THandlerResponse>?)Result;
|
|
|
|
/// <summary>
|
|
/// ctor
|
|
/// </summary>
|
|
/// <param name="request"></param>
|
|
/// <param name="authenticated"></param>
|
|
/// <param name="weight"></param>
|
|
protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
|
|
{
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public override async Task<CallResult> Handle(SocketConnection connection, DataEvent<object> message)
|
|
{
|
|
var typedMessage = message.As((TServerResponse)message.Data);
|
|
if (!ValidateMessage(typedMessage))
|
|
return new CallResult(null);
|
|
|
|
CurrentResponses++;
|
|
if (CurrentResponses == RequiredResponses)
|
|
{
|
|
Completed = true;
|
|
Response = message.Data;
|
|
}
|
|
|
|
if (Result?.Success != false)
|
|
// If an error result is already set don't override that
|
|
Result = HandleMessage(connection, typedMessage);
|
|
|
|
if (CurrentResponses == RequiredResponses)
|
|
{
|
|
_event.Set();
|
|
if (ContinueAwaiter != null)
|
|
await ContinueAwaiter.WaitAsync().ConfigureAwait(false);
|
|
}
|
|
|
|
return Result;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Validate if a message is actually processable by this query
|
|
/// </summary>
|
|
/// <param name="message"></param>
|
|
/// <returns></returns>
|
|
public virtual bool ValidateMessage(DataEvent<TServerResponse> message) => true;
|
|
|
|
/// <summary>
|
|
/// Handle the query response
|
|
/// </summary>
|
|
/// <param name="connection"></param>
|
|
/// <param name="message"></param>
|
|
/// <returns></returns>
|
|
public abstract CallResult<THandlerResponse> HandleMessage(SocketConnection connection, DataEvent<TServerResponse> message);
|
|
|
|
/// <inheritdoc />
|
|
public override void Timeout()
|
|
{
|
|
if (Completed)
|
|
return;
|
|
|
|
Completed = true;
|
|
Result = new CallResult<THandlerResponse>(new CancellationRequestedError(null, "Query timeout", null));
|
|
ContinueAwaiter?.Set();
|
|
_event.Set();
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public override void Fail(Error error)
|
|
{
|
|
Result = new CallResult<THandlerResponse>(error);
|
|
Completed = true;
|
|
ContinueAwaiter?.Set();
|
|
_event.Set();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Query
|
|
/// </summary>
|
|
/// <typeparam name="TResponse">Response object type</typeparam>
|
|
public abstract class Query<TResponse> : Query<TResponse, TResponse>
|
|
{
|
|
/// <summary>
|
|
/// ctor
|
|
/// </summary>
|
|
/// <param name="request"></param>
|
|
/// <param name="authenticated"></param>
|
|
/// <param name="weight"></param>
|
|
protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
|
|
{
|
|
}
|
|
|
|
/// <summary>
|
|
/// Handle the query response
|
|
/// </summary>
|
|
/// <param name="connection"></param>
|
|
/// <param name="message"></param>
|
|
/// <returns></returns>
|
|
public override CallResult<TResponse> HandleMessage(SocketConnection connection, DataEvent<TResponse> message) => message.ToCallResult();
|
|
}
|
|
}
|