mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-06-10 09:26:22 +00:00
wip
This commit is contained in:
parent
6fa66d819d
commit
081c2d4268
@ -99,7 +99,7 @@ namespace CryptoExchange.Net
|
||||
if (!socketConnections.Any())
|
||||
return 0;
|
||||
|
||||
return socketConnections.Sum(s => s.Value.UserListenerCount);
|
||||
return socketConnections.Sum(s => s.Value.UserSubscriptionCount);
|
||||
}
|
||||
}
|
||||
|
||||
@ -194,7 +194,7 @@ namespace CryptoExchange.Net
|
||||
socketConnection = socketResult.Data;
|
||||
|
||||
// Add a subscription on the socket connection
|
||||
var success = AddSubscription(subscription, true, socketConnection);
|
||||
var success = socketConnection.AddSubscription(subscription);
|
||||
if (!success)
|
||||
{
|
||||
_logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
|
||||
@ -229,15 +229,18 @@ namespace CryptoExchange.Net
|
||||
return new CallResult<UpdateSubscription>(new ServerError("Socket is paused"));
|
||||
}
|
||||
|
||||
var request = subscription.GetSubRequest();
|
||||
if (request != null)
|
||||
var subQuery = subscription.GetSubQuery();
|
||||
if (subQuery != null)
|
||||
{
|
||||
// Send the request and wait for answer
|
||||
var subResult = await SubscribeAndWaitAsync(socketConnection, subscription).ConfigureAwait(false);
|
||||
var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
|
||||
if (!subResult)
|
||||
{
|
||||
_logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
|
||||
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
// If this was a timeout we still need to send an unsubscribe to prevent messages coming in later
|
||||
var unsubscribe = subResult.Error is CancellationRequestedError;
|
||||
await socketConnection.CloseAsync(subscription, unsubscribe).ConfigureAwait(false);
|
||||
|
||||
return new CallResult<UpdateSubscription>(subResult.Error!);
|
||||
}
|
||||
}
|
||||
@ -260,22 +263,6 @@ namespace CryptoExchange.Net
|
||||
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends the subscribe request and waits for a response to that request
|
||||
/// </summary>
|
||||
/// <param name="socketConnection">The connection to send the request on</param>
|
||||
/// <param name="request">The request to send, will be serialized to json</param>
|
||||
/// <param name="listener">The message listener for the subscription</param>
|
||||
/// <returns></returns>
|
||||
protected internal virtual async Task<CallResult> SubscribeAndWaitAsync(SocketConnection socketConnection, Subscription subscription)
|
||||
{
|
||||
var callResult = await socketConnection.SendAndWaitSubAsync(subscription).ConfigureAwait(false);
|
||||
if (callResult)
|
||||
subscription.Confirmed = true;
|
||||
|
||||
return callResult;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Send a query on a socket connection to the BaseAddress and wait for the response
|
||||
/// </summary>
|
||||
@ -333,20 +320,7 @@ namespace CryptoExchange.Net
|
||||
return new CallResult<T>(new ServerError("Socket is paused"));
|
||||
}
|
||||
|
||||
return await QueryAndWaitAsync<T>(socketConnection, query).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends the query request and waits for the result
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The expected result type</typeparam>
|
||||
/// <param name="socket">The connection to send and wait on</param>
|
||||
/// <param name="query">The query</param>
|
||||
/// <returns></returns>
|
||||
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, Query<T> query)
|
||||
{
|
||||
var dataResult = new CallResult<T>(new ServerError("No response on query received"));
|
||||
return await socket.SendAndWaitQueryAsync<T>(query).ConfigureAwait(false);
|
||||
return await socketConnection.SendAndWaitQueryAsync(query).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -405,22 +379,6 @@ namespace CryptoExchange.Net
|
||||
/// <returns></returns>
|
||||
protected internal virtual BaseQuery GetAuthenticationRequest() => throw new NotImplementedException();
|
||||
|
||||
/// <summary>
|
||||
/// Add a subscription to a connection
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of data the subscription expects</typeparam>
|
||||
/// <param name="subscription">The subscription</param>
|
||||
/// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param>
|
||||
/// <param name="connection">The socket connection the handler is on</param>
|
||||
/// <returns></returns>
|
||||
protected virtual bool AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection)
|
||||
{
|
||||
if (!connection.AddListener(subscription))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds a system subscription. Used for example to reply to ping requests
|
||||
/// </summary>
|
||||
@ -429,7 +387,7 @@ namespace CryptoExchange.Net
|
||||
{
|
||||
systemSubscriptions.Add(systemSubscription);
|
||||
foreach (var connection in socketConnections.Values)
|
||||
connection.AddListener(systemSubscription);
|
||||
connection.AddSubscription(systemSubscription);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -474,11 +432,11 @@ namespace CryptoExchange.Net
|
||||
var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected)
|
||||
&& s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
|
||||
&& (s.Value.ApiClient.GetType() == GetType())
|
||||
&& (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.UserListenerCount).FirstOrDefault();
|
||||
&& (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.UserSubscriptionCount).FirstOrDefault();
|
||||
var result = socketResult.Equals(default(KeyValuePair<int, SocketConnection>)) ? null : socketResult.Value;
|
||||
if (result != null)
|
||||
{
|
||||
if (result.UserListenerCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserListenerCount >= ClientOptions.SocketSubscriptionsCombineTarget)))
|
||||
if (result.UserSubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserSubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget)))
|
||||
{
|
||||
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
|
||||
return new CallResult<SocketConnection>(result);
|
||||
@ -502,7 +460,7 @@ namespace CryptoExchange.Net
|
||||
socketConnection.UnparsedMessage += HandleUnparsedMessage;
|
||||
|
||||
foreach (var systemSubscription in systemSubscriptions)
|
||||
socketConnection.AddListener(systemSubscription);
|
||||
socketConnection.AddSubscription(systemSubscription);
|
||||
|
||||
return new CallResult<SocketConnection>(socketConnection);
|
||||
}
|
||||
@ -573,11 +531,14 @@ namespace CryptoExchange.Net
|
||||
/// </summary>
|
||||
/// <param name="identifier">Identifier for the periodic send</param>
|
||||
/// <param name="interval">How often</param>
|
||||
/// <param name="objGetter">Method returning the object to send</param>
|
||||
protected virtual void SendPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, object> objGetter)
|
||||
/// <param name="queryDelegate">Method returning the query to send</param>
|
||||
protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, BaseQuery> queryDelegate)
|
||||
{
|
||||
if (objGetter == null)
|
||||
throw new ArgumentNullException(nameof(objGetter));
|
||||
if (queryDelegate == null)
|
||||
throw new ArgumentNullException(nameof(queryDelegate));
|
||||
|
||||
// TODO instead of having this on ApiClient level, this should be registered on the socket connection
|
||||
// This would prevent this looping without any connections
|
||||
|
||||
periodicEvent = new AsyncResetEvent();
|
||||
periodicTask = Task.Run(async () =>
|
||||
@ -596,15 +557,15 @@ namespace CryptoExchange.Net
|
||||
if (!socketConnection.Connected)
|
||||
continue;
|
||||
|
||||
var obj = objGetter(socketConnection);
|
||||
if (obj == null)
|
||||
var query = queryDelegate(socketConnection);
|
||||
if (query == null)
|
||||
continue;
|
||||
|
||||
_logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}");
|
||||
|
||||
try
|
||||
{
|
||||
socketConnection.Send(ExchangeHelpers.NextId(), obj, 1);
|
||||
await socketConnection.SendAndWaitQueryAsync(query).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@ -626,7 +587,7 @@ namespace CryptoExchange.Net
|
||||
SocketConnection? connection = null;
|
||||
foreach (var socket in socketConnections.Values.ToList())
|
||||
{
|
||||
subscription = socket.GetListener(subscriptionId);
|
||||
subscription = socket.GetSubscription(subscriptionId);
|
||||
if (subscription != null)
|
||||
{
|
||||
connection = socket;
|
||||
@ -662,11 +623,11 @@ namespace CryptoExchange.Net
|
||||
/// <returns></returns>
|
||||
public virtual async Task UnsubscribeAllAsync()
|
||||
{
|
||||
var sum = socketConnections.Sum(s => s.Value.UserListenerCount);
|
||||
var sum = socketConnections.Sum(s => s.Value.UserSubscriptionCount);
|
||||
if (sum == 0)
|
||||
return;
|
||||
|
||||
_logger.Log(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.UserListenerCount)} subscriptions");
|
||||
_logger.Log(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.UserSubscriptionCount)} subscriptions");
|
||||
var tasks = new List<Task>();
|
||||
{
|
||||
var socketList = socketConnections.Values;
|
||||
@ -703,7 +664,7 @@ namespace CryptoExchange.Net
|
||||
sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}");
|
||||
foreach (var connection in socketConnections)
|
||||
{
|
||||
sb.AppendLine($" Connection {connection.Key}: {connection.Value.UserListenerCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}");
|
||||
sb.AppendLine($" Connection {connection.Key}: {connection.Value.UserSubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}");
|
||||
foreach (var subscription in connection.Value.Subscriptions)
|
||||
sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}");
|
||||
}
|
||||
@ -718,7 +679,7 @@ namespace CryptoExchange.Net
|
||||
_disposing = true;
|
||||
periodicEvent?.Set();
|
||||
periodicEvent?.Dispose();
|
||||
if (socketConnections.Sum(s => s.Value.UserListenerCount) > 0)
|
||||
if (socketConnections.Sum(s => s.Value.UserSubscriptionCount) > 0)
|
||||
{
|
||||
_logger.Log(LogLevel.Debug, "Disposing socket client, closing all subscriptions");
|
||||
_ = UnsubscribeAllAsync();
|
||||
|
@ -14,6 +14,8 @@ namespace CryptoExchange.Net.Converters
|
||||
/// </summary>
|
||||
public abstract class SocketConverter
|
||||
{
|
||||
private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
|
||||
|
||||
/// <summary>
|
||||
/// Fields to use for the message subscription identifier
|
||||
/// </summary>
|
||||
@ -40,12 +42,6 @@ namespace CryptoExchange.Net.Converters
|
||||
// Deserialize to the correct type
|
||||
|
||||
using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
|
||||
if (outputOriginalData)
|
||||
{
|
||||
//result.OriginalData = sr.ReadToEnd();
|
||||
stream.Position = 0;
|
||||
}
|
||||
|
||||
using var jsonTextReader = new JsonTextReader(sr);
|
||||
JToken token;
|
||||
try
|
||||
@ -81,8 +77,20 @@ namespace CryptoExchange.Net.Converters
|
||||
}
|
||||
|
||||
var resultType = GetDeserializationType(typeIdDict, pendingRequests, listeners);
|
||||
if (resultType == null)
|
||||
{
|
||||
// ?
|
||||
return null;
|
||||
}
|
||||
|
||||
var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(resultType);
|
||||
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType));
|
||||
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType, _serializer));
|
||||
if (outputOriginalData)
|
||||
{
|
||||
stream.Position = 0;
|
||||
instance.OriginalData = sr.ReadToEnd();
|
||||
}
|
||||
|
||||
instance.Identifier = idString;
|
||||
instance.Parsed = resultType != null;
|
||||
return instance;
|
||||
|
@ -1,5 +1,4 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using System;
|
||||
using System;
|
||||
|
||||
namespace CryptoExchange.Net.Objects.Sockets
|
||||
{
|
||||
|
@ -19,6 +19,10 @@
|
||||
public bool Parsed { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parsed message object
|
||||
/// </summary>
|
||||
/// <typeparam name="T">Data type</typeparam>
|
||||
public class ParsedMessage<T> : BaseParsedMessage
|
||||
{
|
||||
/// <summary>
|
||||
@ -26,6 +30,10 @@
|
||||
/// </summary>
|
||||
public T? Data { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="data"></param>
|
||||
public ParsedMessage(T? data)
|
||||
{
|
||||
Data = data;
|
||||
|
@ -1,136 +1,171 @@
|
||||
using CryptoExchange.Net.Converters;
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Sockets;
|
||||
using CryptoExchange.Net.Sockets;
|
||||
using System;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CryptoExchange.Net.Objects.Sockets
|
||||
{
|
||||
/// <summary>
|
||||
/// Pending socket request
|
||||
/// </summary>
|
||||
public abstract class BasePendingRequest
|
||||
{
|
||||
/// <summary>
|
||||
/// Request id
|
||||
/// </summary>
|
||||
public int Id { get; set; }
|
||||
/// <summary>
|
||||
/// Callback for checking if a message is a response to this request
|
||||
/// </summary>
|
||||
public Func<BaseParsedMessage, bool> MessageMatchesHandler { get; }
|
||||
public bool Completed { get; private set; }
|
||||
public abstract Type ResponseType { get; }
|
||||
|
||||
public AsyncResetEvent Event { get; }
|
||||
/// <summary>
|
||||
/// If the request is completed
|
||||
/// </summary>
|
||||
public bool Completed { get; protected set; }
|
||||
/// <summary>
|
||||
/// The response object type
|
||||
/// </summary>
|
||||
public abstract Type? ResponseType { get; }
|
||||
/// <summary>
|
||||
/// Timer event
|
||||
/// </summary>
|
||||
public DateTime RequestTimestamp { get; set; }
|
||||
public TimeSpan Timeout { get; }
|
||||
/// <summary>
|
||||
/// The request object
|
||||
/// </summary>
|
||||
public object Request { get; set; }
|
||||
/// <summary>
|
||||
/// The result
|
||||
/// </summary>
|
||||
public abstract CallResult? Result { get; set; }
|
||||
|
||||
private CancellationTokenSource? _cts;
|
||||
public abstract CallResult Result { get; set; }
|
||||
protected AsyncResetEvent _event;
|
||||
protected TimeSpan _timeout;
|
||||
protected CancellationTokenSource? _cts;
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="id"></param>
|
||||
/// <param name="request"></param>
|
||||
/// <param name="messageMatchesHandler"></param>
|
||||
/// <param name="timeout"></param>
|
||||
protected BasePendingRequest(int id, object request, Func<BaseParsedMessage, bool> messageMatchesHandler, TimeSpan timeout)
|
||||
{
|
||||
Id = id;
|
||||
MessageMatchesHandler = messageMatchesHandler;
|
||||
Event = new AsyncResetEvent(false, false);
|
||||
Timeout = timeout;
|
||||
_event = new AsyncResetEvent(false, false);
|
||||
_timeout = timeout;
|
||||
Request = request;
|
||||
RequestTimestamp = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Signal that the request has been send and the timeout timer should start
|
||||
/// </summary>
|
||||
public void IsSend()
|
||||
{
|
||||
// Start timeout countdown
|
||||
_cts = new CancellationTokenSource(Timeout);
|
||||
_cts.Token.Register(() => Fail("No response"), false);
|
||||
_cts = new CancellationTokenSource(_timeout);
|
||||
_cts.Token.Register(Timeout, false);
|
||||
}
|
||||
|
||||
public virtual void Fail(string error)
|
||||
{
|
||||
Completed = true;
|
||||
Event.Set();
|
||||
}
|
||||
/// <summary>
|
||||
/// Wait untill timeout or the request is competed
|
||||
/// </summary>
|
||||
/// <param name="timeout"></param>
|
||||
/// <returns></returns>
|
||||
public async Task WaitAsync(TimeSpan timeout) => await _event.WaitAsync(timeout).ConfigureAwait(false);
|
||||
|
||||
public bool MessageMatches(BaseParsedMessage message)
|
||||
{
|
||||
return MessageMatchesHandler(message);
|
||||
}
|
||||
/// <summary>
|
||||
/// Mark request as timeout
|
||||
/// </summary>
|
||||
public abstract void Timeout();
|
||||
|
||||
public virtual Task ProcessAsync(BaseParsedMessage message)
|
||||
{
|
||||
Completed = true;
|
||||
Event.Set();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
/// <summary>
|
||||
/// Mark request as failed
|
||||
/// </summary>
|
||||
/// <param name="error"></param>
|
||||
public abstract void Fail(string error);
|
||||
|
||||
/// <summary>
|
||||
/// Process a response
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public abstract void ProcessAsync(BaseParsedMessage message);
|
||||
}
|
||||
|
||||
//public class PendingRequest : BasePendingRequest
|
||||
//{
|
||||
// public CallResult Result { get; set; }
|
||||
// public Func<BaseParsedMessage, CallResult> Handler { get; }
|
||||
// public override Type? ResponseType => null;
|
||||
|
||||
// private PendingRequest(int id, object request, Func<BaseParsedMessage, bool> messageMatchesHandler, Func<BaseParsedMessage, CallResult> messageHandler, TimeSpan timeout)
|
||||
// : base(id, request, messageMatchesHandler, timeout)
|
||||
// {
|
||||
// Handler = messageHandler;
|
||||
// }
|
||||
|
||||
// public static PendingRequest CreateForQuery(BaseQuery query)
|
||||
// {
|
||||
// return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, query.HandleResult, TimeSpan.FromSeconds(5));
|
||||
// }
|
||||
|
||||
// public static PendingRequest CreateForSubRequest(Subscription subscription)
|
||||
// {
|
||||
// return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetSubRequest(), subscription.MessageMatchesSubRequest, subscription.HandleSubResponse, TimeSpan.FromSeconds(5));
|
||||
// }
|
||||
|
||||
// public static PendingRequest CreateForUnsubRequest(Subscription subscription)
|
||||
// {
|
||||
// return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetUnsubRequest(), subscription.MessageMatchesUnsubRequest, subscription.HandleUnsubResponse, TimeSpan.FromSeconds(5));
|
||||
// }
|
||||
|
||||
// public override void Fail(string error)
|
||||
// {
|
||||
// Result = new CallResult(new ServerError(error));
|
||||
// base.Fail(error);
|
||||
// }
|
||||
|
||||
// public override Task ProcessAsync(BaseParsedMessage message)
|
||||
// {
|
||||
// Result = Handler(message);
|
||||
// return base.ProcessAsync(message);
|
||||
// }
|
||||
//}
|
||||
|
||||
/// <summary>
|
||||
/// Pending socket request
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The response data type</typeparam>
|
||||
public class PendingRequest<T> : BasePendingRequest
|
||||
{
|
||||
public override CallResult Result { get; set; }
|
||||
public CallResult<T> TypedResult => (CallResult<T>)Result;
|
||||
/// <inheritdoc />
|
||||
public override CallResult? Result { get; set; }
|
||||
/// <summary>
|
||||
/// The typed call result
|
||||
/// </summary>
|
||||
public CallResult<T>? TypedResult => (CallResult<T>?)Result;
|
||||
/// <summary>
|
||||
/// Data handler
|
||||
/// </summary>
|
||||
public Func<ParsedMessage<T>, CallResult<T>> Handler { get; }
|
||||
/// <summary>
|
||||
/// The response object type
|
||||
/// </summary>
|
||||
public override Type? ResponseType => typeof(T);
|
||||
|
||||
public PendingRequest(int id, object request, Func<ParsedMessage<T>, bool> messageMatchesHandler, Func<ParsedMessage<T>, CallResult<T>> messageHandler, TimeSpan timeout)
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="id"></param>
|
||||
/// <param name="request"></param>
|
||||
/// <param name="messageMatchesHandler"></param>
|
||||
/// <param name="messageHandler"></param>
|
||||
/// <param name="timeout"></param>
|
||||
private PendingRequest(int id, object request, Func<ParsedMessage<T>, bool> messageMatchesHandler, Func<ParsedMessage<T>, CallResult<T>> messageHandler, TimeSpan timeout)
|
||||
: base(id, request, (x) => messageMatchesHandler((ParsedMessage<T>)x), timeout)
|
||||
{
|
||||
Handler = messageHandler;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create a new pending request for provided query
|
||||
/// </summary>
|
||||
/// <param name="query"></param>
|
||||
/// <returns></returns>
|
||||
public static PendingRequest<T> CreateForQuery(Query<T> query)
|
||||
{
|
||||
return new PendingRequest<T>(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, x =>
|
||||
{
|
||||
var response = query.HandleResponse(x);
|
||||
return response.As((T)response.Data);
|
||||
return response.As(response.Data);
|
||||
}, TimeSpan.FromSeconds(5));
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Timeout()
|
||||
{
|
||||
Completed = true;
|
||||
Result = new CallResult<T>(new CancellationRequestedError());
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Fail(string error)
|
||||
{
|
||||
Result = new CallResult<T>(new ServerError(error));
|
||||
base.Fail(error);
|
||||
Completed = true;
|
||||
_event.Set();
|
||||
}
|
||||
|
||||
public override Task ProcessAsync(BaseParsedMessage message)
|
||||
/// <inheritdoc />
|
||||
public override void ProcessAsync(BaseParsedMessage message)
|
||||
{
|
||||
Completed = true;
|
||||
Result = Handler((ParsedMessage<T>)message);
|
||||
return base.ProcessAsync(message);
|
||||
_event.Set();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,95 +0,0 @@
|
||||
//using CryptoExchange.Net.Converters;
|
||||
//using CryptoExchange.Net.Interfaces;
|
||||
//using CryptoExchange.Net.Sockets;
|
||||
//using System;
|
||||
//using System.Threading;
|
||||
//using System.Threading.Tasks;
|
||||
|
||||
//namespace CryptoExchange.Net.Objects.Sockets
|
||||
//{
|
||||
// /// <summary>
|
||||
// /// Socket listener
|
||||
// /// </summary>
|
||||
// public class MessageListener
|
||||
// {
|
||||
// /// <summary>
|
||||
// /// Unique listener id
|
||||
// /// </summary>
|
||||
// public int Id { get; }
|
||||
|
||||
// /// <summary>
|
||||
// /// Exception event
|
||||
// /// </summary>
|
||||
// public event Action<Exception>? Exception;
|
||||
|
||||
// /// <summary>
|
||||
// /// The request object send when subscribing on the server. Either this or the `Identifier` property should be set
|
||||
// /// </summary>
|
||||
// public Subscription Subscription { get; set; }
|
||||
|
||||
// /// <summary>
|
||||
// /// Whether this is a user subscription or an internal listener
|
||||
// /// </summary>
|
||||
// public bool UserListener { get; set; }
|
||||
|
||||
// /// <summary>
|
||||
// /// If the subscription has been confirmed to be subscribed by the server
|
||||
// /// </summary>
|
||||
// public bool Confirmed { get; set; }
|
||||
|
||||
// /// <summary>
|
||||
// /// Whether authentication is needed for this subscription
|
||||
// /// </summary>
|
||||
// public bool Authenticated => Subscription.Authenticated;
|
||||
|
||||
// /// <summary>
|
||||
// /// Whether we're closing this subscription and a socket connection shouldn't be kept open for it
|
||||
// /// </summary>
|
||||
// public bool Closed { get; set; }
|
||||
|
||||
// /// <summary>
|
||||
// /// Cancellation token registration, should be disposed when subscription is closed. Used for closing the subscription with
|
||||
// /// a provided cancelation token
|
||||
// /// </summary>
|
||||
// public CancellationTokenRegistration? CancellationTokenRegistration { get; set; }
|
||||
|
||||
// /// <summary>
|
||||
// /// ctor
|
||||
// /// </summary>
|
||||
// /// <param name="id"></param>
|
||||
// /// <param name="request"></param>
|
||||
// /// <param name="userSubscription"></param>
|
||||
// public MessageListener(int id, Subscription request, bool userSubscription)
|
||||
// {
|
||||
// Id = id;
|
||||
// UserListener = userSubscription;
|
||||
// Subscription = request;
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// Invoke the exception event
|
||||
// /// </summary>
|
||||
// /// <param name="e"></param>
|
||||
// public void InvokeExceptionHandler(Exception e)
|
||||
// {
|
||||
// Exception?.Invoke(e);
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// The priority of this subscription
|
||||
// /// </summary>
|
||||
// public int Priority => Subscription is SystemSubscription ? 50 : 1;
|
||||
|
||||
// /// <summary>
|
||||
// /// Process the message
|
||||
// /// </summary>
|
||||
// /// <param name="message"></param>
|
||||
// /// <returns></returns>
|
||||
// public Task ProcessAsync(ParsedMessage message)
|
||||
// {
|
||||
// // TODO
|
||||
// var dataEvent = new DataEvent<ParsedMessage>(message, null, message.OriginalData, DateTime.UtcNow, null);
|
||||
// return Subscription.HandleEventAsync(dataEvent);
|
||||
// }
|
||||
// }
|
||||
//}
|
@ -1,70 +0,0 @@
|
||||
//using System;
|
||||
//using System.Collections.Generic;
|
||||
//using System.Data.Common;
|
||||
//using System.IO;
|
||||
//using System.Text;
|
||||
//using System.Threading.Tasks;
|
||||
//using CryptoExchange.Net.Sockets;
|
||||
|
||||
//namespace CryptoExchange.Net.Objects.Sockets
|
||||
//{
|
||||
// /// <summary>
|
||||
// /// A message received from a stream
|
||||
// /// </summary>
|
||||
// public class StreamMessage : IDisposable
|
||||
// {
|
||||
// /// <summary>
|
||||
// /// The connection it was received on
|
||||
// /// </summary>
|
||||
// public SocketConnection Connection { get; }
|
||||
// /// <summary>
|
||||
// /// The data stream
|
||||
// /// </summary>
|
||||
// public Stream Stream { get; }
|
||||
// /// <summary>
|
||||
// /// Receive timestamp
|
||||
// /// </summary>
|
||||
// public DateTime Timestamp { get; set; }
|
||||
|
||||
// private Dictionary<Type, object> _casted;
|
||||
|
||||
// /// <summary>
|
||||
// /// Get the data from the memory in specified type using the converter. If this type has been resolved before it will use that instead
|
||||
// /// </summary>
|
||||
// /// <typeparam name="T"></typeparam>
|
||||
// /// <param name="converter"></param>
|
||||
// /// <returns></returns>
|
||||
// public T Get<T>(Func<Stream, T> converter)
|
||||
// {
|
||||
// if (_casted.TryGetValue(typeof(T), out var casted))
|
||||
// return (T)casted;
|
||||
|
||||
// var result = converter(Stream);
|
||||
// _casted.Add(typeof(T), result!);
|
||||
// Stream.Position = 0;
|
||||
// return result;
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// Dispose
|
||||
// /// </summary>
|
||||
// public void Dispose()
|
||||
// {
|
||||
// Stream.Dispose();
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// ctor
|
||||
// /// </summary>
|
||||
// /// <param name="connection"></param>
|
||||
// /// <param name="stream"></param>
|
||||
// /// <param name="timestamp"></param>
|
||||
// public StreamMessage(SocketConnection connection, Stream stream, DateTime timestamp)
|
||||
// {
|
||||
// Connection = connection;
|
||||
// Stream = stream;
|
||||
// Timestamp = timestamp;
|
||||
// _casted = new Dictionary<Type, object>();
|
||||
// }
|
||||
// }
|
||||
//}
|
@ -1,7 +1,5 @@
|
||||
using CryptoExchange.Net.Converters;
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.Objects.Sockets;
|
||||
using System;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
@ -11,7 +9,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
public abstract class BaseQuery
|
||||
{
|
||||
/// <summary>
|
||||
/// The query request
|
||||
/// The query request object
|
||||
/// </summary>
|
||||
public object Request { get; set; }
|
||||
|
||||
@ -25,10 +23,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// </summary>
|
||||
public int Weight { get; }
|
||||
|
||||
public abstract bool MessageMatchesQuery(BaseParsedMessage message);
|
||||
public abstract CallResult HandleResult(BaseParsedMessage message);
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
@ -42,18 +36,28 @@ namespace CryptoExchange.Net.Sockets
|
||||
Weight = weight;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create a pending request for this query
|
||||
/// </summary>
|
||||
public abstract BasePendingRequest CreatePendingRequest();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Query
|
||||
/// </summary>
|
||||
/// <typeparam name="TResponse">Response object type</typeparam>
|
||||
public abstract class Query<TResponse> : BaseQuery
|
||||
{
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="request"></param>
|
||||
/// <param name="authenticated"></param>
|
||||
/// <param name="weight"></param>
|
||||
protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
|
||||
{
|
||||
}
|
||||
|
||||
public override CallResult HandleResult(BaseParsedMessage message) => HandleResponse((ParsedMessage<TResponse>) message);
|
||||
public override bool MessageMatchesQuery(BaseParsedMessage message) => MessageMatchesQuery((ParsedMessage<TResponse>)message);
|
||||
|
||||
/// <summary>
|
||||
/// Handle the query response
|
||||
/// </summary>
|
||||
@ -68,6 +72,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <returns></returns>
|
||||
public abstract bool MessageMatchesQuery(ParsedMessage<TResponse> message);
|
||||
|
||||
/// <inheritdoc />
|
||||
public override BasePendingRequest CreatePendingRequest() => PendingRequest<TResponse>.CreateForQuery(this);
|
||||
}
|
||||
}
|
||||
|
@ -10,9 +10,7 @@ using CryptoExchange.Net.Objects;
|
||||
using System.Net.WebSockets;
|
||||
using System.IO;
|
||||
using CryptoExchange.Net.Objects.Sockets;
|
||||
using CryptoExchange.Net.Converters;
|
||||
using System.Text;
|
||||
using System.Runtime;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
@ -57,23 +55,23 @@ namespace CryptoExchange.Net.Sockets
|
||||
public event Action<byte[]>? UnparsedMessage;
|
||||
|
||||
/// <summary>
|
||||
/// The amount of listeners on this connection
|
||||
/// The amount of subscriptions on this connection
|
||||
/// </summary>
|
||||
public int UserListenerCount
|
||||
public int UserSubscriptionCount
|
||||
{
|
||||
get { lock (_listenerLock)
|
||||
return _messageIdentifierListeners.Values.Count(h => h.UserSubscription); }
|
||||
get { lock (_subscriptionLock)
|
||||
return _messageIdentifierSubscriptions.Values.Count(h => h.UserSubscription); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get a copy of the current message listeners
|
||||
/// Get a copy of the current message subscriptions
|
||||
/// </summary>
|
||||
public Subscription[] Subscriptions
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_listenerLock)
|
||||
return _messageIdentifierListeners.Values.Where(h => h.UserSubscription).ToArray();
|
||||
lock (_subscriptionLock)
|
||||
return _messageIdentifierSubscriptions.Values.Where(h => h.UserSubscription).ToArray();
|
||||
}
|
||||
}
|
||||
|
||||
@ -159,10 +157,10 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
private bool _pausedActivity;
|
||||
private readonly List<BasePendingRequest> _pendingRequests;
|
||||
private readonly List<Subscription> _messageListeners;
|
||||
private readonly Dictionary<string, Subscription> _messageIdentifierListeners;
|
||||
private readonly List<Subscription> _subscriptions;
|
||||
private readonly Dictionary<string, Subscription> _messageIdentifierSubscriptions;
|
||||
|
||||
private readonly object _listenerLock = new();
|
||||
private readonly object _subscriptionLock = new();
|
||||
private readonly ILogger _logger;
|
||||
|
||||
private SocketStatus _status;
|
||||
@ -187,8 +185,8 @@ namespace CryptoExchange.Net.Sockets
|
||||
Properties = new Dictionary<string, object>();
|
||||
|
||||
_pendingRequests = new List<BasePendingRequest>();
|
||||
_messageListeners = new List<Subscription>();
|
||||
_messageIdentifierListeners = new Dictionary<string, Subscription>();
|
||||
_subscriptions = new List<Subscription>();
|
||||
_messageIdentifierSubscriptions = new Dictionary<string, Subscription>();
|
||||
|
||||
_socket = socket;
|
||||
_socket.OnStreamMessage += HandleStreamMessage;
|
||||
@ -217,10 +215,10 @@ namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
Status = SocketStatus.Closed;
|
||||
Authenticated = false;
|
||||
lock(_listenerLock)
|
||||
lock(_subscriptionLock)
|
||||
{
|
||||
foreach (var listener in _messageListeners)
|
||||
listener.Confirmed = false;
|
||||
foreach (var subscription in _subscriptions)
|
||||
subscription.Confirmed = false;
|
||||
}
|
||||
Task.Run(() => ConnectionClosed?.Invoke());
|
||||
}
|
||||
@ -233,10 +231,10 @@ namespace CryptoExchange.Net.Sockets
|
||||
Status = SocketStatus.Reconnecting;
|
||||
DisconnectTime = DateTime.UtcNow;
|
||||
Authenticated = false;
|
||||
lock (_listenerLock)
|
||||
lock (_subscriptionLock)
|
||||
{
|
||||
foreach (var listener in _messageListeners)
|
||||
listener.Confirmed = false;
|
||||
foreach (var subscription in _subscriptions)
|
||||
subscription.Confirmed = false;
|
||||
}
|
||||
|
||||
_ = Task.Run(() => ConnectionLost?.Invoke());
|
||||
@ -257,7 +255,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
protected virtual async void HandleReconnected()
|
||||
{
|
||||
Status = SocketStatus.Resubscribing;
|
||||
lock (_messageListeners)
|
||||
lock (_subscriptions)
|
||||
{
|
||||
foreach (var pendingRequest in _pendingRequests.ToList())
|
||||
{
|
||||
@ -324,11 +322,11 @@ namespace CryptoExchange.Net.Sockets
|
||||
var timestamp = DateTime.UtcNow;
|
||||
TimeSpan userCodeDuration = TimeSpan.Zero;
|
||||
|
||||
List<Subscription> listeners;
|
||||
lock (_listenerLock)
|
||||
listeners = _messageListeners.OrderByDescending(x => !x.UserSubscription).ToList();
|
||||
List<Subscription> subscriptions;
|
||||
lock (_subscriptionLock)
|
||||
subscriptions = _subscriptions.OrderByDescending(x => !x.UserSubscription).ToList();
|
||||
|
||||
var result = ApiClient.StreamConverter.ReadJson(stream, _pendingRequests, listeners, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
|
||||
var result = ApiClient.StreamConverter.ReadJson(stream, _pendingRequests, subscriptions, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
|
||||
if(result == null)
|
||||
{
|
||||
stream.Position = 0;
|
||||
@ -348,12 +346,12 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
// TODO lock
|
||||
if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener))
|
||||
if (_messageIdentifierSubscriptions.TryGetValue(result.Identifier.ToLowerInvariant(), out var idSubscription))
|
||||
{
|
||||
// Matched based on identifier
|
||||
var userSw = Stopwatch.StartNew();
|
||||
var dataEvent = new DataEvent<BaseParsedMessage>(result, null, result.OriginalData, DateTime.UtcNow, null);
|
||||
await idListener.HandleEventAsync(dataEvent).ConfigureAwait(false);
|
||||
await idSubscription.HandleEventAsync(dataEvent).ConfigureAwait(false);
|
||||
userSw.Stop();
|
||||
return;
|
||||
}
|
||||
@ -371,18 +369,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
if (pendingRequest.Completed)
|
||||
{
|
||||
// Answer to a timed out request, unsub if it is a subscription request
|
||||
// TODO
|
||||
//if (pendingRequest.MessageListener != null)
|
||||
//{
|
||||
// _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
|
||||
// _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false);
|
||||
//}
|
||||
// Answer to a timed out request
|
||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Received after request timeout. Consider increasing the RequestTimeout");
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request");
|
||||
await pendingRequest.ProcessAsync(result).ConfigureAwait(false);
|
||||
pendingRequest.ProcessAsync(result);
|
||||
}
|
||||
|
||||
return;
|
||||
@ -427,12 +420,12 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (ApiClient.socketConnections.ContainsKey(SocketId))
|
||||
ApiClient.socketConnections.TryRemove(SocketId, out _);
|
||||
|
||||
lock (_listenerLock)
|
||||
lock (_subscriptionLock)
|
||||
{
|
||||
foreach (var listener in _messageListeners)
|
||||
foreach (var subscription in _subscriptions)
|
||||
{
|
||||
if (listener.CancellationTokenRegistration.HasValue)
|
||||
listener.CancellationTokenRegistration.Value.Dispose();
|
||||
if (subscription.CancellationTokenRegistration.HasValue)
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
@ -441,15 +434,16 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Close a listener on this connection. If all listener on this connection are closed the connection gets closed as well
|
||||
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
|
||||
/// </summary>
|
||||
/// <param name="listener">Listener to close</param>
|
||||
/// <param name="subscription">Subscription to close</param>
|
||||
/// <param name="unsubEvenIfNotConfirmed">Whether to send an unsub request even if the subscription wasn't confirmed</param>
|
||||
/// <returns></returns>
|
||||
public async Task CloseAsync(Subscription subscription)
|
||||
public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false)
|
||||
{
|
||||
lock (_listenerLock)
|
||||
lock (_subscriptionLock)
|
||||
{
|
||||
if (!_messageListeners.Contains(subscription))
|
||||
if (!_subscriptions.Contains(subscription))
|
||||
return;
|
||||
|
||||
subscription.Closed = true;
|
||||
@ -458,15 +452,15 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
||||
return;
|
||||
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing listener {subscription.Id}");
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing subscription {subscription.Id}");
|
||||
if (subscription.CancellationTokenRegistration.HasValue)
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
|
||||
if (subscription.Confirmed && _socket.IsOpen)
|
||||
if ((unsubEvenIfNotConfirmed || subscription.Confirmed) && _socket.IsOpen)
|
||||
await UnsubscribeAsync(subscription).ConfigureAwait(false);
|
||||
|
||||
bool shouldCloseConnection;
|
||||
lock (_listenerLock)
|
||||
lock (_subscriptionLock)
|
||||
{
|
||||
if (Status == SocketStatus.Closing)
|
||||
{
|
||||
@ -474,22 +468,22 @@ namespace CryptoExchange.Net.Sockets
|
||||
return;
|
||||
}
|
||||
|
||||
shouldCloseConnection = _messageIdentifierListeners.All(r => !r.Value.UserSubscription || r.Value.Closed);
|
||||
shouldCloseConnection = _messageIdentifierSubscriptions.All(r => !r.Value.UserSubscription || r.Value.Closed);
|
||||
if (shouldCloseConnection)
|
||||
Status = SocketStatus.Closing;
|
||||
}
|
||||
|
||||
if (shouldCloseConnection)
|
||||
{
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more listeners");
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more subscriptions");
|
||||
await CloseAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
lock (_listenerLock)
|
||||
lock (_subscriptionLock)
|
||||
{
|
||||
_messageListeners.Remove(subscription);
|
||||
_subscriptions.Remove(subscription);
|
||||
foreach (var id in subscription.Identifiers)
|
||||
_messageIdentifierListeners.Remove(id);
|
||||
_messageIdentifierSubscriptions.Remove(id);
|
||||
}
|
||||
}
|
||||
|
||||
@ -503,84 +497,79 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a listener to this connection
|
||||
/// Add a subscription to this connection
|
||||
/// </summary>
|
||||
/// <param name="listener"></param>
|
||||
public bool AddListener(Subscription subscription)
|
||||
/// <param name="subscription"></param>
|
||||
public bool AddSubscription(Subscription subscription)
|
||||
{
|
||||
lock (_listenerLock)
|
||||
lock (_subscriptionLock)
|
||||
{
|
||||
if (Status != SocketStatus.None && Status != SocketStatus.Connected)
|
||||
return false;
|
||||
|
||||
_messageListeners.Add(subscription);
|
||||
_subscriptions.Add(subscription);
|
||||
if (subscription.Identifiers != null)
|
||||
{
|
||||
foreach (var id in subscription.Identifiers)
|
||||
_messageIdentifierListeners.Add(id.ToLowerInvariant(), subscription);
|
||||
_messageIdentifierSubscriptions.Add(id.ToLowerInvariant(), subscription);
|
||||
}
|
||||
|
||||
if (subscription.UserSubscription)
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {subscription.Id}, total listeners on connection: {_messageListeners.Count(s => s.UserSubscription)}");
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get a listener on this connection by id
|
||||
/// Get a subscription on this connection by id
|
||||
/// </summary>
|
||||
/// <param name="id"></param>
|
||||
public Subscription? GetListener(int id)
|
||||
public Subscription? GetSubscription(int id)
|
||||
{
|
||||
lock (_listenerLock)
|
||||
return _messageListeners.SingleOrDefault(s => s.Id == id);
|
||||
lock (_subscriptionLock)
|
||||
return _subscriptions.SingleOrDefault(s => s.Id == id);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get a listener on this connection by its subscribe request
|
||||
/// Get a subscription on this connection by its subscribe request
|
||||
/// </summary>
|
||||
/// <param name="predicate">Filter for a request</param>
|
||||
/// <returns></returns>
|
||||
public Subscription? GetListenerByRequest(Func<object?, bool> predicate)
|
||||
public Subscription? GetSubscriptionByRequest(Func<object?, bool> predicate)
|
||||
{
|
||||
lock(_listenerLock)
|
||||
return _messageListeners.SingleOrDefault(s => predicate(s));
|
||||
lock(_subscriptionLock)
|
||||
return _subscriptions.SingleOrDefault(s => predicate(s));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Send a query request and wait for an answer
|
||||
/// </summary>
|
||||
/// <param name="query">Query to send</param>
|
||||
/// <returns></returns>
|
||||
public virtual async Task<CallResult> SendAndWaitQueryAsync(BaseQuery query)
|
||||
{
|
||||
var pendingRequest = query.CreatePendingRequest();
|
||||
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
|
||||
return pendingRequest.Result;
|
||||
return pendingRequest.Result!;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Send a query request and wait for an answer
|
||||
/// </summary>
|
||||
/// <typeparam name="T">Query response type</typeparam>
|
||||
/// <param name="query">Query to send</param>
|
||||
/// <returns></returns>
|
||||
public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query)
|
||||
{
|
||||
var pendingRequest = PendingRequest<T>.CreateForQuery(query);
|
||||
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
|
||||
return pendingRequest.TypedResult;
|
||||
}
|
||||
|
||||
public virtual async Task<CallResult> SendAndWaitSubAsync(Subscription subscription)
|
||||
{
|
||||
var pendingRequest = PendingRequest.CreateForSubRequest(subscription);
|
||||
await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false);
|
||||
return pendingRequest.Result;
|
||||
}
|
||||
|
||||
public virtual async Task<CallResult> SendAndWaitUnsubAsync(Subscription subscription)
|
||||
{
|
||||
var pendingRequest = PendingRequest.CreateForUnsubRequest(subscription);
|
||||
await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false);
|
||||
return pendingRequest.Result;
|
||||
return pendingRequest.TypedResult!;
|
||||
}
|
||||
|
||||
private async Task SendAndWaitAsync(BasePendingRequest pending, int weight)
|
||||
{
|
||||
lock (_messageListeners)
|
||||
{
|
||||
lock (_subscriptions)
|
||||
_pendingRequests.Add(pending);
|
||||
}
|
||||
|
||||
var sendOk = Send(pending.Id, pending.Request, weight);
|
||||
if (!sendOk)
|
||||
@ -600,7 +589,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (pending.Completed)
|
||||
return;
|
||||
|
||||
await pending.Event.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
|
||||
await pending.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
|
||||
|
||||
if (pending.Completed)
|
||||
return;
|
||||
@ -649,8 +638,8 @@ namespace CryptoExchange.Net.Sockets
|
||||
return new CallResult<bool>(new WebError("Socket not connected"));
|
||||
|
||||
bool anySubscriptions = false;
|
||||
lock (_listenerLock)
|
||||
anySubscriptions = _messageListeners.Any(s => s.UserSubscription);
|
||||
lock (_subscriptionLock)
|
||||
anySubscriptions = _subscriptions.Any(s => s.UserSubscription);
|
||||
|
||||
if (!anySubscriptions)
|
||||
{
|
||||
@ -661,8 +650,8 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
bool anyAuthenticated = false;
|
||||
lock (_listenerLock)
|
||||
anyAuthenticated = _messageListeners.Any(s => s.Authenticated);
|
||||
lock (_subscriptionLock)
|
||||
anyAuthenticated = _subscriptions.Any(s => s.Authenticated);
|
||||
|
||||
if (anyAuthenticated)
|
||||
{
|
||||
@ -679,13 +668,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
// Get a list of all subscriptions on the socket
|
||||
List<Subscription> listenerList = new List<Subscription>();
|
||||
lock (_listenerLock)
|
||||
listenerList = _messageListeners.ToList();
|
||||
List<Subscription> subList = new List<Subscription>();
|
||||
lock (_subscriptionLock)
|
||||
subList = _subscriptions.ToList();
|
||||
|
||||
foreach(var listener in listenerList)
|
||||
foreach(var subscription in subList)
|
||||
{
|
||||
var result = await ApiClient.RevitalizeRequestAsync(listener).ConfigureAwait(false);
|
||||
var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
|
||||
if (!result)
|
||||
{
|
||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error);
|
||||
@ -694,22 +683,28 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
|
||||
for (var i = 0; i < listenerList.Count; i += ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)
|
||||
for (var i = 0; i < subList.Count; i += ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)
|
||||
{
|
||||
if (!_socket.IsOpen)
|
||||
return new CallResult<bool>(new WebError("Socket not connected"));
|
||||
|
||||
var taskList = new List<Task<CallResult>>();
|
||||
foreach (var listener in listenerList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
|
||||
taskList.Add(ApiClient.SubscribeAndWaitAsync(this, listener));
|
||||
foreach (var subscription in subList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
|
||||
{
|
||||
var subQuery = subscription.GetSubQuery();
|
||||
if (subQuery == null)
|
||||
continue;
|
||||
|
||||
taskList.Add(SendAndWaitQueryAsync(subQuery));
|
||||
}
|
||||
|
||||
await Task.WhenAll(taskList).ConfigureAwait(false);
|
||||
if (taskList.Any(t => !t.Result.Success))
|
||||
return taskList.First(t => !t.Result.Success).Result;
|
||||
}
|
||||
|
||||
foreach (var listener in listenerList)
|
||||
listener.Confirmed = true;
|
||||
foreach (var subscription in subList)
|
||||
subscription.Confirmed = true;
|
||||
|
||||
if (!_socket.IsOpen)
|
||||
return new CallResult<bool>(new WebError("Socket not connected"));
|
||||
@ -720,12 +715,12 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
internal async Task UnsubscribeAsync(Subscription subscription)
|
||||
{
|
||||
var unsubscribeRequest = subscription?.GetUnsubRequest();
|
||||
if (unsubscribeRequest != null)
|
||||
{
|
||||
await SendAndWaitUnsubAsync(subscription!).ConfigureAwait(false);
|
||||
_logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription!.Id} unsubscribed");
|
||||
}
|
||||
var unsubscribeRequest = subscription.GetUnsubQuery();
|
||||
if (unsubscribeRequest == null)
|
||||
return;
|
||||
|
||||
await SendAndWaitQueryAsync(unsubscribeRequest).ConfigureAwait(false);
|
||||
_logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription!.Id} unsubscribed");
|
||||
}
|
||||
|
||||
internal async Task<CallResult> ResubscribeAsync(Subscription subscription)
|
||||
@ -733,7 +728,11 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (!_socket.IsOpen)
|
||||
return new CallResult(new UnknownError("Socket is not connected"));
|
||||
|
||||
return await ApiClient.SubscribeAndWaitAsync(this, subscription).ConfigureAwait(false);
|
||||
var subQuery = subscription.GetSubQuery();
|
||||
if (subQuery == null)
|
||||
return new CallResult(null);
|
||||
|
||||
return await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -9,14 +9,28 @@ using System.Threading.Tasks;
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
/// <summary>
|
||||
/// Subscription base
|
||||
/// Socket subscription
|
||||
/// </summary>
|
||||
public abstract class Subscription
|
||||
{
|
||||
/// <summary>
|
||||
/// Subscription id
|
||||
/// </summary>
|
||||
public int Id { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Is it a user subscription
|
||||
/// </summary>
|
||||
public bool UserSubscription { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Has the subscription been confirmed
|
||||
/// </summary>
|
||||
public bool Confirmed { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Is the subscription closed
|
||||
/// </summary>
|
||||
public bool Closed { get; set; }
|
||||
|
||||
/// <summary>
|
||||
@ -34,6 +48,9 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// </summary>
|
||||
public abstract List<string> Identifiers { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Cancellation token registration
|
||||
/// </summary>
|
||||
public CancellationTokenRegistration? CancellationTokenRegistration { get; set; }
|
||||
|
||||
/// <summary>
|
||||
@ -56,13 +73,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// Get the subscribe object to send when subscribing
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public abstract object? GetSubRequest();
|
||||
public abstract BaseQuery? GetSubQuery();
|
||||
|
||||
/// <summary>
|
||||
/// Get the unsubscribe object to send when unsubscribing
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public abstract object? GetUnsubRequest();
|
||||
public abstract BaseQuery? GetUnsubQuery();
|
||||
|
||||
/// <summary>
|
||||
/// Handle the update message
|
||||
@ -70,11 +87,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public abstract Task HandleEventAsync(DataEvent<BaseParsedMessage> message);
|
||||
public abstract CallResult HandleSubResponse(BaseParsedMessage message);
|
||||
public abstract CallResult HandleUnsubResponse(BaseParsedMessage message);
|
||||
|
||||
public abstract bool MessageMatchesUnsubRequest(BaseParsedMessage message);
|
||||
public abstract bool MessageMatchesSubRequest(BaseParsedMessage message);
|
||||
|
||||
/// <summary>
|
||||
/// Invoke the exception event
|
||||
@ -86,55 +98,40 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract class Subscription<TQuery, TEvent> : Subscription<TQuery, TEvent, TQuery>
|
||||
{
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="logger"></param>
|
||||
/// <param name="authenticated"></param>
|
||||
protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract class Subscription<TSubResponse, TEvent, TUnsubResponse> : Subscription
|
||||
{
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="logger"></param>
|
||||
/// <param name="authenticated"></param>
|
||||
protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated)
|
||||
{
|
||||
}
|
||||
|
||||
public override CallResult HandleUnsubResponse(BaseParsedMessage message)
|
||||
=> HandleUnsubResponse((ParsedMessage<TUnsubResponse>)message);
|
||||
|
||||
public override CallResult HandleSubResponse(BaseParsedMessage message)
|
||||
=> HandleSubResponse((ParsedMessage<TSubResponse>)message);
|
||||
|
||||
/// <inheritdoc />
|
||||
public override Task HandleEventAsync(DataEvent<BaseParsedMessage> message)
|
||||
=> HandleEventAsync(message.As((ParsedMessage<TEvent>)message.Data));
|
||||
|
||||
public override bool MessageMatchesSubRequest(BaseParsedMessage message)
|
||||
=> MessageMatchesSubRequest((ParsedMessage<TSubResponse>)message);
|
||||
|
||||
public override bool MessageMatchesUnsubRequest(BaseParsedMessage message)
|
||||
=> MessageMatchesUnsubRequest((ParsedMessage<TUnsubResponse>)message);
|
||||
|
||||
/// <summary>
|
||||
/// Handle the update message
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public abstract Task HandleEventAsync(DataEvent<ParsedMessage<TEvent>> message);
|
||||
|
||||
/// <summary>
|
||||
/// Check if the message is the response to the subscribe request
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public abstract bool MessageMatchesSubRequest(ParsedMessage<TSubResponse> message);
|
||||
public abstract CallResult HandleSubResponse(ParsedMessage<TSubResponse> message);
|
||||
|
||||
/// <summary>
|
||||
/// Check if the message is the response to the unsubscribe request
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public abstract bool MessageMatchesUnsubRequest(ParsedMessage<TUnsubResponse> message);
|
||||
public abstract CallResult HandleUnsubResponse(ParsedMessage<TUnsubResponse> message);
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,4 @@
|
||||
using CryptoExchange.Net.Converters;
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.Objects.Sockets;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
@ -22,17 +17,9 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override object? GetSubRequest() => null;
|
||||
/// <inheritdoc />
|
||||
public override bool MessageMatchesSubRequest(BaseParsedMessage message) => throw new NotImplementedException();
|
||||
/// <inheritdoc />
|
||||
public override CallResult HandleSubResponse(BaseParsedMessage message) => throw new NotImplementedException();
|
||||
public override BaseQuery? GetSubQuery() => null;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override object? GetUnsubRequest() => null;
|
||||
/// <inheritdoc />
|
||||
public override bool MessageMatchesUnsubRequest(BaseParsedMessage message) => throw new NotImplementedException();
|
||||
/// <inheritdoc />
|
||||
public override CallResult HandleUnsubResponse(BaseParsedMessage message) => throw new NotImplementedException();
|
||||
public override BaseQuery? GetUnsubQuery() => null;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user