using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Sockets.Default; using CryptoExchange.Net.Sockets.Interfaces; using System; using System.Threading; using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets { /// /// Query /// public abstract class Query : IMessageProcessor { /// /// Unique identifier /// public int Id { get; } = ExchangeHelpers.NextId(); /// /// Has this query been completed /// public bool Completed { get; set; } /// /// Timeout for the request /// public TimeSpan? RequestTimeout { get; set; } /// /// What should happen if the query times out /// public TimeoutBehavior TimeoutBehavior { get; set; } = TimeoutBehavior.Fail; /// /// The number of required responses. Can be more than 1 when for example subscribing multiple symbols streams in a single request, /// and each symbol receives it's own confirmation response /// public int RequiredResponses { get; set; } = 1; /// /// The current number of responses received on this query /// public int CurrentResponses { get; set; } /// /// Timestamp of when the request was send /// public DateTime RequestTimestamp { get; set; } /// /// Result /// public CallResult? Result { get; set; } /// /// Response /// public object? Response { get; set; } private MessageRouter _router; /// /// Router for this subscription /// public MessageRouter MessageRouter { get => _router; set { _router = value; _router.BuildRouteMap(); } } /// /// The query request object /// public object Request { get; set; } /// /// If this is a private request /// public bool Authenticated { get; } /// /// Weight of the query /// public int Weight { get; } /// /// Whether the query should wait for a response or not /// public bool ExpectsResponse { get; set; } = true; /// /// Wait event for response /// protected AsyncResetEvent _event; /// /// Cancellation token /// protected CancellationTokenSource? _cts; /// /// On complete callback /// public Action? OnComplete { get; set; } /// /// ctor /// #pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. public Query(object request, bool authenticated, int weight = 1) #pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. { _event = new AsyncResetEvent(false, false); Authenticated = authenticated; Request = request; Weight = weight; } /// /// Signal that the request has been send and the timeout timer should start /// public void IsSend(TimeSpan timeout) { RequestTimestamp = DateTime.UtcNow; if (ExpectsResponse) { // Start timeout countdown _cts = new CancellationTokenSource(timeout); _cts.Token.Register(Timeout, false); } else { Result = CallResult.SuccessResult; Completed = true; _event.Set(); } } /// /// Wait until timeout or the request is completed /// /// /// Cancellation token /// public async Task WaitAsync(TimeSpan timeout, CancellationToken ct) => await _event.WaitAsync(timeout, ct).ConfigureAwait(false); /// /// Mark request as timeout /// public abstract void Timeout(); /// /// Mark request as failed /// /// public abstract void Fail(Error error); /// /// Handle a response message /// public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageRoute route); } /// /// Query /// /// The type to be returned to the caller public abstract class Query : Query { /// /// The typed call result /// public CallResult? TypedResult => (CallResult?)Result; /// /// ctor /// /// /// /// protected Query( object request, bool authenticated, int weight = 1) : base(request, authenticated, weight) { } /// public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageRoute route) { CurrentResponses++; if (CurrentResponses == RequiredResponses) Response = message; if (Result?.Success != false) { // If an error result is already set don't override that Result = route.Handle(connection, receiveTime, originalData, message); if (Result == null) // Null from Handle means it wasn't actually for this query CurrentResponses -= 1; } if (CurrentResponses == RequiredResponses) { Completed = true; _event.Set(); OnComplete?.Invoke(); } return Result ?? CallResult.SuccessResult; } /// public override void Timeout() { if (Completed) return; if (TimeoutBehavior == TimeoutBehavior.Fail) Result = new CallResult(new TimeoutError()); else Result = new CallResult(default, null, default); Completed = true; _event.Set(); OnComplete?.Invoke(); } /// public override void Fail(Error error) { if (Completed) return; Result = new CallResult(error); Completed = true; _event.Set(); OnComplete?.Invoke(); } } }