1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 00:16:27 +00:00
This commit is contained in:
JKorf 2023-11-14 20:29:42 +01:00
parent ff6a9d5f13
commit 1ba66be29f
11 changed files with 214 additions and 288 deletions

View File

@ -101,8 +101,10 @@ namespace CryptoExchange.Net
public string GetSubscriptionsState() public string GetSubscriptionsState()
{ {
var result = new StringBuilder(); var result = new StringBuilder();
foreach(var client in ApiClients.OfType<SocketApiClient>()) foreach (var client in ApiClients.OfType<SocketApiClient>())
{
result.AppendLine(client.GetSubscriptionsState()); result.AppendLine(client.GetSubscriptionsState());
}
return result.ToString(); return result.ToString();
} }
} }

View File

@ -229,7 +229,7 @@ namespace CryptoExchange.Net
return new CallResult<UpdateSubscription>(new ServerError("Socket is paused")); return new CallResult<UpdateSubscription>(new ServerError("Socket is paused"));
} }
var subQuery = subscription.GetSubQuery(); var subQuery = subscription.GetSubQuery(socketConnection);
if (subQuery != null) if (subQuery != null)
{ {
// Send the request and wait for answer // Send the request and wait for answer
@ -664,12 +664,26 @@ namespace CryptoExchange.Net
public string GetSubscriptionsState() public string GetSubscriptionsState()
{ {
var sb = new StringBuilder(); var sb = new StringBuilder();
sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}"); sb.AppendLine($"{GetType().Name}");
sb.AppendLine($" Connections: {socketConnections.Count}");
sb.AppendLine($" Subscriptions: {CurrentSubscriptions}");
sb.AppendLine($" Download speed: {IncomingKbps} kbps");
foreach (var connection in socketConnections) foreach (var connection in socketConnections)
{ {
sb.AppendLine($" Connection {connection.Key}: {connection.Value.UserSubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); sb.AppendLine($" Id: {connection.Key}");
sb.AppendLine($" Address: {connection.Value.ConnectionUri}");
sb.AppendLine($" Subscriptions: {connection.Value.UserSubscriptionCount}");
sb.AppendLine($" Status: {connection.Value.Status}");
sb.AppendLine($" Authenticated: {connection.Value.Authenticated}");
sb.AppendLine($" Download speed: {connection.Value.IncomingKbps} kbps");
sb.AppendLine($" Subscriptions:");
foreach (var subscription in connection.Value.Subscriptions) foreach (var subscription in connection.Value.Subscriptions)
sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}"); {
sb.AppendLine($" Id: {subscription.Id}");
sb.AppendLine($" Confirmed: {subscription.Confirmed}");
sb.AppendLine($" Invocations: {subscription.TotalInvocations}");
sb.AppendLine($" Identifiers: [{string.Join(", ", subscription.Identifiers)}]");
}
} }
return sb.ToString(); return sb.ToString();
} }

View File

@ -63,7 +63,7 @@ namespace CryptoExchange.Net.Converters
} }
PostInspectResult? inspectResult = null; PostInspectResult? inspectResult = null;
Dictionary<string, string> typeIdDict = new Dictionary<string, string>(); Dictionary<string, string?> typeIdDict = new Dictionary<string, string?>();
object? usedParser = null; object? usedParser = null;
if (token.Type == JTokenType.Object) if (token.Type == JTokenType.Object)
{ {
@ -75,8 +75,11 @@ namespace CryptoExchange.Net.Converters
var value = typeIdDict.TryGetValue(field, out var cachedValue) ? cachedValue : GetValueForKey(token, field); var value = typeIdDict.TryGetValue(field, out var cachedValue) ? cachedValue : GetValueForKey(token, field);
if (value == null) if (value == null)
{ {
allFieldsPresent = false; if (callback.AllFieldPresentNeeded)
break; {
allFieldsPresent = false;
break;
}
} }
typeIdDict[field] = value; typeIdDict[field] = value;
@ -86,7 +89,8 @@ namespace CryptoExchange.Net.Converters
{ {
inspectResult = callback.Callback(typeIdDict, processors); inspectResult = callback.Callback(typeIdDict, processors);
usedParser = callback; usedParser = callback;
break; if (inspectResult.Type != null)
break;
} }
} }
} }
@ -124,7 +128,12 @@ namespace CryptoExchange.Net.Converters
if (usedParser == null) if (usedParser == null)
throw new Exception("No parser found for message"); throw new Exception("No parser found for message");
var instance = InterpreterPipeline.ObjectInitializer(token, inspectResult.Type); BaseParsedMessage instance;
if (inspectResult.Type != null)
instance = InterpreterPipeline.ObjectInitializer(token, inspectResult.Type);
else
instance = new ParsedMessage<object>(null);
if (outputOriginalData) if (outputOriginalData)
{ {
stream.Position = 0; stream.Position = 0;

View File

@ -17,6 +17,12 @@
/// If parsed /// If parsed
/// </summary> /// </summary>
public bool Parsed { get; set; } public bool Parsed { get; set; }
/// <summary>
/// Get the data object
/// </summary>
/// <returns></returns>
public abstract object Data { get; }
} }
/// <summary> /// <summary>
@ -28,7 +34,9 @@
/// <summary> /// <summary>
/// Parsed data object /// Parsed data object
/// </summary> /// </summary>
public T? Data { get; set; } public override object? Data { get; }
public T? TypedData => (T)Data;
/// <summary> /// <summary>
/// ctor /// ctor

View File

@ -1,165 +0,0 @@
using CryptoExchange.Net.Sockets;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects.Sockets
{
/// <summary>
/// Pending socket request
/// </summary>
public abstract class BasePendingRequest
{
/// <summary>
/// Request id
/// </summary>
public int Id { get; set; }
/// <summary>
/// If the request is completed
/// </summary>
public bool Completed { get; protected set; }
/// <summary>
/// The response object type
/// </summary>
public abstract Type? ResponseType { get; }
/// <summary>
/// Timer event
/// </summary>
public DateTime RequestTimestamp { get; set; }
/// <summary>
/// The request object
/// </summary>
public object Request { get; set; }
/// <summary>
/// The result
/// </summary>
public abstract CallResult? Result { get; set; }
protected AsyncResetEvent _event;
protected TimeSpan _timeout;
protected CancellationTokenSource? _cts;
/// <summary>
/// ctor
/// </summary>
/// <param name="id"></param>
/// <param name="request"></param>
/// <param name="timeout"></param>
protected BasePendingRequest(int id, object request, TimeSpan timeout)
{
Id = id;
_event = new AsyncResetEvent(false, false);
_timeout = timeout;
Request = request;
RequestTimestamp = DateTime.UtcNow;
}
/// <summary>
/// Signal that the request has been send and the timeout timer should start
/// </summary>
public void IsSend()
{
// Start timeout countdown
_cts = new CancellationTokenSource(_timeout);
_cts.Token.Register(Timeout, false);
}
/// <summary>
/// Wait untill timeout or the request is competed
/// </summary>
/// <param name="timeout"></param>
/// <returns></returns>
public async Task WaitAsync(TimeSpan timeout) => await _event.WaitAsync(timeout).ConfigureAwait(false);
/// <summary>
/// Mark request as timeout
/// </summary>
public abstract void Timeout();
/// <summary>
/// Mark request as failed
/// </summary>
/// <param name="error"></param>
public abstract void Fail(string error);
/// <summary>
/// Process a response
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Task ProcessAsync(DataEvent<BaseParsedMessage> message);
}
/// <summary>
/// Pending socket request
/// </summary>
/// <typeparam name="T">The response data type</typeparam>
public class PendingRequest<T> : BasePendingRequest
{
/// <inheritdoc />
public override CallResult? Result { get; set; }
/// <summary>
/// The typed call result
/// </summary>
public CallResult<T>? TypedResult => (CallResult<T>?)Result;
/// <summary>
/// Data handler
/// </summary>
public Func<DataEvent<ParsedMessage<T>>, Task<CallResult<T>>> Handler { get; }
/// <summary>
/// The response object type
/// </summary>
public override Type? ResponseType => typeof(T);
/// <summary>
/// ctor
/// </summary>
/// <param name="id"></param>
/// <param name="request"></param>
/// <param name="messageHandler"></param>
/// <param name="timeout"></param>
private PendingRequest(int id, object request, Func<DataEvent<ParsedMessage<T>>, Task<CallResult<T>>> messageHandler, TimeSpan timeout)
: base(id, request, timeout)
{
Handler = messageHandler;
}
/// <summary>
/// Create a new pending request for provided query
/// </summary>
/// <param name="query"></param>
/// <param name="id"></param>
/// <returns></returns>
public static PendingRequest<T> CreateForQuery(Query<T> query, int id)
{
return new PendingRequest<T>(id, query.Request, async x =>
{
var response = await query.HandleMessageAsync(x).ConfigureAwait(false);
return response.As(response.Data);
}, TimeSpan.FromSeconds(5));
}
/// <inheritdoc />
public override void Timeout()
{
Completed = true;
Result = new CallResult<T>(new CancellationRequestedError());
}
/// <inheritdoc />
public override void Fail(string error)
{
Result = new CallResult<T>(new ServerError(error));
Completed = true;
_event.Set();
}
/// <inheritdoc />
public override async Task ProcessAsync(DataEvent<BaseParsedMessage> message)
{
Completed = true;
Result = await Handler(message.As((ParsedMessage<T>)message.Data)).ConfigureAwait(false);
_event.Set();
}
}
}

View File

@ -23,8 +23,9 @@ namespace CryptoExchange.Net.Objects.Sockets
public class PostInspectCallback public class PostInspectCallback
{ {
public bool AllFieldPresentNeeded { get; set; } = true;
public List<string> TypeFields { get; set; } = new List<string>(); public List<string> TypeFields { get; set; } = new List<string>();
public Func<Dictionary<string, string>, Dictionary<string, Type>, PostInspectResult> Callback { get; set; } public Func<Dictionary<string, string?>, Dictionary<string, Type>, PostInspectResult> Callback { get; set; }
} }
public class PostInspectArrayCallback public class PostInspectArrayCallback

View File

@ -3,6 +3,7 @@ using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
@ -16,6 +17,14 @@ namespace CryptoExchange.Net.Sockets
/// Unique identifier /// Unique identifier
/// </summary> /// </summary>
public int Id { get; } = ExchangeHelpers.NextId(); public int Id { get; } = ExchangeHelpers.NextId();
public bool Completed { get; set; }
public DateTime RequestTimestamp { get; set; }
public CallResult? Result { get; set; }
protected AsyncResetEvent _event;
protected CancellationTokenSource? _cts;
/// <summary> /// <summary>
/// Strings to identify this subscription with /// Strings to identify this subscription with
/// </summary> /// </summary>
@ -36,11 +45,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public int Weight { get; } public int Weight { get; }
/// <summary>
/// The pending request for this query
/// </summary>
public BasePendingRequest? PendingRequest { get; private set; }
public abstract Type ExpectedMessageType { get; } public abstract Type ExpectedMessageType { get; }
/// <summary> /// <summary>
@ -51,26 +55,41 @@ namespace CryptoExchange.Net.Sockets
/// <param name="weight"></param> /// <param name="weight"></param>
public BaseQuery(object request, bool authenticated, int weight = 1) public BaseQuery(object request, bool authenticated, int weight = 1)
{ {
_event = new AsyncResetEvent(false, false);
Authenticated = authenticated; Authenticated = authenticated;
Request = request; Request = request;
Weight = weight; Weight = weight;
} }
/// <summary> /// <summary>
/// Create a pending request for this query /// Signal that the request has been send and the timeout timer should start
/// </summary> /// </summary>
public BasePendingRequest CreatePendingRequest() public void IsSend(TimeSpan timeout)
{ {
PendingRequest = GetPendingRequest(Id); // Start timeout countdown
return PendingRequest; RequestTimestamp = DateTime.UtcNow;
_cts = new CancellationTokenSource(timeout);
_cts.Token.Register(Timeout, false);
} }
/// <summary> /// <summary>
/// Create a pending request for this query /// Wait untill timeout or the request is competed
/// </summary> /// </summary>
/// <param name="id"></param> /// <param name="timeout"></param>
/// <returns></returns> /// <returns></returns>
public abstract BasePendingRequest GetPendingRequest(int id); public async Task WaitAsync(TimeSpan timeout) => await _event.WaitAsync(timeout).ConfigureAwait(false);
/// <summary>
/// Mark request as timeout
/// </summary>
public abstract void Timeout();
/// <summary>
/// Mark request as failed
/// </summary>
/// <param name="error"></param>
public abstract void Fail(string error);
/// <summary> /// <summary>
/// Handle a response message /// Handle a response message
@ -89,6 +108,11 @@ namespace CryptoExchange.Net.Sockets
{ {
public override Type ExpectedMessageType => typeof(TResponse); public override Type ExpectedMessageType => typeof(TResponse);
/// <summary>
/// The typed call result
/// </summary>
public CallResult<TResponse>? TypedResult => (CallResult<TResponse>?)Result;
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
@ -102,8 +126,10 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc /> /// <inheritdoc />
public override async Task<CallResult> HandleMessageAsync(DataEvent<BaseParsedMessage> message) public override async Task<CallResult> HandleMessageAsync(DataEvent<BaseParsedMessage> message)
{ {
await PendingRequest!.ProcessAsync(message).ConfigureAwait(false); Completed = true;
return await HandleMessageAsync(message.As((ParsedMessage<TResponse>)message.Data)).ConfigureAwait(false); Result = await HandleMessageAsync(message.As((ParsedMessage<TResponse>)message.Data)).ConfigureAwait(false);
_event.Set();
return Result;
} }
/// <summary> /// <summary>
@ -111,9 +137,25 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public virtual Task<CallResult<TResponse>> HandleMessageAsync(DataEvent<ParsedMessage<TResponse>> message) => Task.FromResult(new CallResult<TResponse>(message.Data.Data!)); public virtual Task<CallResult<TResponse>> HandleMessageAsync(DataEvent<ParsedMessage<TResponse>> message) => Task.FromResult(new CallResult<TResponse>(message.Data.TypedData!));
/// <inheritdoc /> /// <inheritdoc />
public override BasePendingRequest GetPendingRequest(int id) => PendingRequest<TResponse>.CreateForQuery(this, id); public override void Timeout()
{
if (Completed)
return;
Completed = true;
Result = new CallResult<TResponse>(new CancellationRequestedError());
_event.Set();
}
/// <inheritdoc />
public override void Fail(string error)
{
Result = new CallResult<TResponse>(new ServerError(error));
Completed = true;
_event.Set();
}
} }
} }

View File

@ -1,7 +1,6 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Newtonsoft.Json; using Newtonsoft.Json;
@ -11,7 +10,6 @@ using System.Net.WebSockets;
using System.IO; using System.IO;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using System.Text; using System.Text;
using System.Collections.Concurrent;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
{ {
@ -58,7 +56,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// The amount of subscriptions on this connection /// The amount of subscriptions on this connection
/// </summary> /// </summary>
public int UserSubscriptionCount => _subscriptions.Count(h => h.UserSubscription); public int UserSubscriptionCount => _listenerManager.GetSubscriptions().Count(h => h.UserSubscription);
/// <summary> /// <summary>
/// Get a copy of the current message subscriptions /// Get a copy of the current message subscriptions
@ -67,14 +65,14 @@ namespace CryptoExchange.Net.Sockets
{ {
get get
{ {
return _subscriptions.ToArray(h => h.UserSubscription); return _listenerManager.GetSubscriptions().Where(h => h.UserSubscription).ToArray();
} }
} }
/// <summary> /// <summary>
/// If the connection has been authenticated /// If the connection has been authenticated
/// </summary> /// </summary>
public bool Authenticated { get; internal set; } public bool Authenticated { get; set; }
/// <summary> /// <summary>
/// If connection is made /// If connection is made
@ -152,9 +150,7 @@ namespace CryptoExchange.Net.Sockets
} }
private bool _pausedActivity; private bool _pausedActivity;
//private readonly ConcurrentList<BasePendingRequest> _pendingRequests; private readonly SocketListenerManager _listenerManager;
//private readonly ConcurrentList<Subscription> _subscriptions;
private readonly SocketListenerManager _messageIdMap;
private readonly ILogger _logger; private readonly ILogger _logger;
private SocketStatus _status; private SocketStatus _status;
@ -177,10 +173,6 @@ namespace CryptoExchange.Net.Sockets
Tag = tag; Tag = tag;
Properties = new Dictionary<string, object>(); Properties = new Dictionary<string, object>();
//_pendingRequests = new ConcurrentList<BasePendingRequest>();
//_subscriptions = new ConcurrentList<Subscription>();
_messageIdMap = new SocketListenerManager(_logger);
_socket = socket; _socket = socket;
_socket.OnStreamMessage += HandleStreamMessage; _socket.OnStreamMessage += HandleStreamMessage;
_socket.OnRequestSent += HandleRequestSent; _socket.OnRequestSent += HandleRequestSent;
@ -190,6 +182,8 @@ namespace CryptoExchange.Net.Sockets
_socket.OnReconnected += HandleReconnected; _socket.OnReconnected += HandleReconnected;
_socket.OnError += HandleError; _socket.OnError += HandleError;
_socket.GetReconnectionUrl = GetReconnectionUrlAsync; _socket.GetReconnectionUrl = GetReconnectionUrlAsync;
_listenerManager = new SocketListenerManager(_logger, SocketId);
} }
/// <summary> /// <summary>
@ -209,9 +203,15 @@ namespace CryptoExchange.Net.Sockets
Status = SocketStatus.Closed; Status = SocketStatus.Closed;
Authenticated = false; Authenticated = false;
foreach (var subscription in _messageIdMap.GetSubscriptions()) foreach (var subscription in _listenerManager.GetSubscriptions())
subscription.Confirmed = false; subscription.Confirmed = false;
foreach (var query in _listenerManager.GetQueries())
{
query.Fail("Connection interupted");
_listenerManager.Remove(query);
}
Task.Run(() => ConnectionClosed?.Invoke()); Task.Run(() => ConnectionClosed?.Invoke());
} }
@ -224,9 +224,15 @@ namespace CryptoExchange.Net.Sockets
DisconnectTime = DateTime.UtcNow; DisconnectTime = DateTime.UtcNow;
Authenticated = false; Authenticated = false;
foreach (var subscription in _messageIdMap.GetSubscriptions()) foreach (var subscription in _listenerManager.GetSubscriptions())
subscription.Confirmed = false; subscription.Confirmed = false;
foreach (var query in _listenerManager.GetQueries())
{
query.Fail("Connection interupted");
_listenerManager.Remove(query);
}
_ = Task.Run(() => ConnectionLost?.Invoke()); _ = Task.Run(() => ConnectionLost?.Invoke());
} }
@ -246,10 +252,10 @@ namespace CryptoExchange.Net.Sockets
{ {
Status = SocketStatus.Resubscribing; Status = SocketStatus.Resubscribing;
foreach (var pendingRequest in _pendingRequests.ToList()) foreach (var query in _listenerManager.GetQueries())
{ {
pendingRequest.Fail("Connection interupted"); query.Fail("Connection interupted");
// Remove? _listenerManager.Remove(query);
} }
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false); var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
@ -287,14 +293,14 @@ namespace CryptoExchange.Net.Sockets
/// <param name="requestId">Id of the request sent</param> /// <param name="requestId">Id of the request sent</param>
protected virtual void HandleRequestSent(int requestId) protected virtual void HandleRequestSent(int requestId)
{ {
var pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId); var query = _listenerManager.GetById<BaseQuery>(requestId);
if (pendingRequest == null) if (query == null)
{ {
_logger.Log(LogLevel.Debug, $"Socket {SocketId} - msg {requestId} - message sent, but not pending"); _logger.Log(LogLevel.Debug, $"Socket {SocketId} - msg {requestId} - message sent, but not pending");
return; return;
} }
pendingRequest.IsSend(); query.IsSend(ApiClient.ClientOptions.RequestTimeout);
} }
/// <summary> /// <summary>
@ -304,14 +310,10 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
protected virtual async Task HandleStreamMessage(Stream stream) protected virtual async Task HandleStreamMessage(Stream stream)
{ {
var timestamp = DateTime.UtcNow; var result = ApiClient.StreamConverter.ReadJson(stream, _listenerManager.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
TimeSpan userCodeDuration = TimeSpan.Zero;
// TODO This shouldn't be done for every request, just when something changes. Might want to make it a seperate type or something with functions 'Add', 'Remove' and 'GetMapping' or something
// This could then cache the internal dictionary mapping of `GetMapping` until something changes, and also make sure there aren't duplicate ids with different message types
var result = ApiClient.StreamConverter.ReadJson(stream, _messageIdMap.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
if(result == null) if(result == null)
{ {
// Not able to parse at all
stream.Position = 0; stream.Position = 0;
var buffer = new byte[stream.Length]; var buffer = new byte[stream.Length];
stream.Read(buffer, 0, buffer.Length); stream.Read(buffer, 0, buffer.Length);
@ -324,18 +326,18 @@ namespace CryptoExchange.Net.Sockets
if (!result.Parsed) if (!result.Parsed)
{ {
// Not able to determine the message type for the message
_logger.LogWarning("Message not matched to type"); _logger.LogWarning("Message not matched to type");
return; return;
} }
if (!await _messageIdMap.InvokeListenersAsync(result.Identifier, result).ConfigureAwait(false)) if (!await _listenerManager.InvokeListenersAsync(result.Identifier, result).ConfigureAwait(false))
{ {
// Not able to find a listener for this message
stream.Position = 0; stream.Position = 0;
var unhandledBuffer = new byte[stream.Length]; var unhandledBuffer = new byte[stream.Length];
stream.Read(unhandledBuffer, 0, unhandledBuffer.Length); stream.Read(unhandledBuffer, 0, unhandledBuffer.Length);
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Message unidentified. Id: {result.Identifier.ToLowerInvariant()}, listening ids: [{string.Join(", ", _listenerManager.GetListenIds())}], Message: {Encoding.UTF8.GetString(unhandledBuffer)} ");
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Message unidentified. Id: {result.Identifier.ToLowerInvariant()}, Message: {Encoding.UTF8.GetString(unhandledBuffer)} ");
UnhandledMessage?.Invoke(result); UnhandledMessage?.Invoke(result);
return; return;
} }
@ -373,7 +375,7 @@ namespace CryptoExchange.Net.Sockets
if (ApiClient.socketConnections.ContainsKey(SocketId)) if (ApiClient.socketConnections.ContainsKey(SocketId))
ApiClient.socketConnections.TryRemove(SocketId, out _); ApiClient.socketConnections.TryRemove(SocketId, out _);
foreach (var subscription in _messageIdMap.GetSubscriptions()) foreach (var subscription in _listenerManager.GetSubscriptions())
{ {
if (subscription.CancellationTokenRegistration.HasValue) if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose(); subscription.CancellationTokenRegistration.Value.Dispose();
@ -391,7 +393,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false) public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false)
{ {
if (!_messageIdMap.Contains(subscription)) if (!_listenerManager.Contains(subscription))
return; return;
subscription.Closed = true; subscription.Closed = true;
@ -412,7 +414,7 @@ namespace CryptoExchange.Net.Sockets
return; return;
} }
var shouldCloseConnection = _messageIdMap.GetSubscriptions().All(r => !r.UserSubscription || r.Closed); var shouldCloseConnection = _listenerManager.GetSubscriptions().All(r => !r.UserSubscription || r.Closed);
if (shouldCloseConnection) if (shouldCloseConnection)
Status = SocketStatus.Closing; Status = SocketStatus.Closing;
@ -422,7 +424,7 @@ namespace CryptoExchange.Net.Sockets
await CloseAsync().ConfigureAwait(false); await CloseAsync().ConfigureAwait(false);
} }
_messageIdMap.Remove(subscription); _listenerManager.Remove(subscription);
} }
/// <summary> /// <summary>
@ -443,12 +445,10 @@ namespace CryptoExchange.Net.Sockets
if (Status != SocketStatus.None && Status != SocketStatus.Connected) if (Status != SocketStatus.None && Status != SocketStatus.Connected)
return false; return false;
_messageIdMap.Add(subscription); _listenerManager.Add(subscription);
if (subscription.Identifiers != null)
_messageIdMap.Add(subscription);
//if (subscription.UserSubscription) if (subscription.UserSubscription)
// _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}"); _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {UserSubscriptionCount}");
return true; return true;
} }
@ -456,14 +456,14 @@ namespace CryptoExchange.Net.Sockets
/// Get a subscription on this connection by id /// Get a subscription on this connection by id
/// </summary> /// </summary>
/// <param name="id"></param> /// <param name="id"></param>
public Subscription? GetSubscription(int id) => _messageIdMap.GetSubscriptions().SingleOrDefault(s => s.Id == id); public Subscription? GetSubscription(int id) => _listenerManager.GetSubscriptions().SingleOrDefault(s => s.Id == id);
/// <summary> /// <summary>
/// Get a subscription on this connection by its subscribe request /// Get a subscription on this connection by its subscribe request
/// </summary> /// </summary>
/// <param name="predicate">Filter for a request</param> /// <param name="predicate">Filter for a request</param>
/// <returns></returns> /// <returns></returns>
public Subscription? GetSubscriptionByRequest(Func<object?, bool> predicate) => _messageIdMap.GetSubscriptions().SingleOrDefault(s => predicate(s)); public Subscription? GetSubscriptionByRequest(Func<object?, bool> predicate) => _listenerManager.GetSubscriptions().SingleOrDefault(s => predicate(s));
/// <summary> /// <summary>
/// Send a query request and wait for an answer /// Send a query request and wait for an answer
@ -472,12 +472,8 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
public virtual async Task<CallResult> SendAndWaitQueryAsync(BaseQuery query) public virtual async Task<CallResult> SendAndWaitQueryAsync(BaseQuery query)
{ {
var pendingRequest = query.CreatePendingRequest(); await SendAndWaitIntAsync(query).ConfigureAwait(false);
if (query.Identifiers != null) return query.Result ?? new CallResult(new ServerError("Timeout"));
_messageIdMap.Add(query);
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
return pendingRequest.Result ?? new CallResult(new ServerError("Timeout"));
} }
/// <summary> /// <summary>
@ -488,23 +484,17 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query) public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query)
{ {
var pendingRequest = (PendingRequest<T>)query.CreatePendingRequest(); await SendAndWaitIntAsync(query).ConfigureAwait(false);
if (query.Identifiers != null) return query.TypedResult ?? new CallResult<T>(new ServerError("Timeout"));
_messageIdMap.Add(query);
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
return pendingRequest.TypedResult ?? new CallResult<T>(new ServerError("Timeout"));
} }
private async Task SendAndWaitAsync(BasePendingRequest pending, int weight) private async Task SendAndWaitIntAsync(BaseQuery query)
{ {
lock (_subscriptions) _listenerManager.Add(query);
_pendingRequests.Add(pending); var sendOk = Send(query.Id, query.Request, query.Weight);
var sendOk = Send(pending.Id, pending.Request, weight);
if (!sendOk) if (!sendOk)
{ {
pending.Fail("Failed to send"); query.Fail("Failed to send");
return; return;
} }
@ -512,16 +502,16 @@ namespace CryptoExchange.Net.Sockets
{ {
if (!_socket.IsOpen) if (!_socket.IsOpen)
{ {
pending.Fail("Socket not open"); query.Fail("Socket not open");
return; return;
} }
if (pending.Completed) if (query.Completed)
return; return;
await pending.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); await query.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
if (pending.Completed) if (query.Completed)
return; return;
} }
} }
@ -567,7 +557,7 @@ namespace CryptoExchange.Net.Sockets
if (!_socket.IsOpen) if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected")); return new CallResult<bool>(new WebError("Socket not connected"));
var anySubscriptions = _messageIdMap.GetSubscriptions().Any(s => s.UserSubscription); var anySubscriptions = _listenerManager.GetSubscriptions().Any(s => s.UserSubscription);
if (!anySubscriptions) if (!anySubscriptions)
{ {
// No need to resubscribe anything // No need to resubscribe anything
@ -576,7 +566,7 @@ namespace CryptoExchange.Net.Sockets
return new CallResult<bool>(true); return new CallResult<bool>(true);
} }
var anyAuthenticated = _messageIdMap.GetSubscriptions().Any(s => s.Authenticated); var anyAuthenticated = _listenerManager.GetSubscriptions().Any(s => s.Authenticated);
if (anyAuthenticated) if (anyAuthenticated)
{ {
// If we reconnected a authenticated connection we need to re-authenticate // If we reconnected a authenticated connection we need to re-authenticate
@ -592,7 +582,7 @@ namespace CryptoExchange.Net.Sockets
} }
// Get a list of all subscriptions on the socket // Get a list of all subscriptions on the socket
var subList = _messageIdMap.GetSubscriptions(); var subList = _listenerManager.GetSubscriptions();
foreach(var subscription in subList) foreach(var subscription in subList)
{ {
@ -614,7 +604,7 @@ namespace CryptoExchange.Net.Sockets
var taskList = new List<Task<CallResult>>(); var taskList = new List<Task<CallResult>>();
foreach (var subscription in subList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) foreach (var subscription in subList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
{ {
var subQuery = subscription.GetSubQuery(); var subQuery = subscription.GetSubQuery(this);
if (subQuery == null) if (subQuery == null)
continue; continue;
@ -651,7 +641,7 @@ namespace CryptoExchange.Net.Sockets
if (!_socket.IsOpen) if (!_socket.IsOpen)
return new CallResult(new UnknownError("Socket is not connected")); return new CallResult(new UnknownError("Socket is not connected"));
var subQuery = subscription.GetSubQuery(); var subQuery = subscription.GetSubQuery(this);
if (subQuery == null) if (subQuery == null)
return new CallResult(null); return new CallResult(null);

View File

@ -14,14 +14,19 @@ namespace CryptoExchange.Net.Sockets
internal class SocketListenerManager internal class SocketListenerManager
{ {
private ILogger _logger; private ILogger _logger;
private int _socketId;
private object _lock = new object(); private object _lock = new object();
private Dictionary<int, IMessageProcessor> _idMap;
private Dictionary<string, Type> _typeMap; private Dictionary<string, Type> _typeMap;
private Dictionary<string, List<IMessageProcessor>> _listeners; private Dictionary<string, List<IMessageProcessor>> _listeners;
public SocketListenerManager(ILogger logger) public SocketListenerManager(ILogger logger, int socketId)
{ {
_idMap = new Dictionary<int, IMessageProcessor>();
_listeners = new Dictionary<string, List<IMessageProcessor>>();
_typeMap = new Dictionary<string, Type>(); _typeMap = new Dictionary<string, Type>();
_logger = logger; _logger = logger;
_socketId = socketId;
} }
public Dictionary<string, Type> GetMapping() public Dictionary<string, Type> GetMapping()
@ -30,19 +35,29 @@ namespace CryptoExchange.Net.Sockets
return _typeMap; return _typeMap;
} }
public List<string> GetListenIds()
{
lock(_lock)
return _listeners.Keys.ToList();
}
public void Add(IMessageProcessor processor) public void Add(IMessageProcessor processor)
{ {
lock (_lock) lock (_lock)
{ {
foreach (var identifier in processor.Identifiers) _idMap.Add(processor.Id, processor);
if (processor.Identifiers?.Any() == true)
{ {
if (!_listeners.TryGetValue(identifier, out var list)) foreach (var identifier in processor.Identifiers)
{ {
list = new List<IMessageProcessor>(); if (!_listeners.TryGetValue(identifier, out var list))
_listeners.Add(identifier, list); {
} list = new List<IMessageProcessor>();
_listeners.Add(identifier, list);
}
list.Add(processor); list.Add(processor);
}
} }
UpdateMap(); UpdateMap();
@ -62,18 +77,14 @@ namespace CryptoExchange.Net.Sockets
foreach (var listener in listeners) foreach (var listener in listeners)
{ {
//_logger.Log(LogLevel.Trace, $"Socket {SocketId} Message mapped to processor {messageProcessor.Id} with identifier {result.Identifier}"); _logger.Log(LogLevel.Trace, $"Socket {_socketId} Message mapped to processor {listener.Id} with identifier {data.Identifier}");
if (listener is BaseQuery query) if (listener is BaseQuery query)
{ {
Remove(listener); Remove(listener);
if (query?.Completed == true)
if (query.PendingRequest != null)
_pendingRequests.Remove(query.PendingRequest);
if (query.PendingRequest?.Completed == true)
{ {
// Answer to a timed out request // Answer to a timed out request
//_logger.Log(LogLevel.Warning, $"Socket {SocketId} Received after request timeout. Consider increasing the RequestTimeout"); _logger.Log(LogLevel.Warning, $"Socket {_socketId} Received after request timeout. Consider increasing the RequestTimeout");
} }
} }
@ -82,15 +93,29 @@ namespace CryptoExchange.Net.Sockets
var dataEvent = new DataEvent<BaseParsedMessage>(data, null, data.OriginalData, DateTime.UtcNow, null); var dataEvent = new DataEvent<BaseParsedMessage>(data, null, data.OriginalData, DateTime.UtcNow, null);
await listener.HandleMessageAsync(dataEvent).ConfigureAwait(false); await listener.HandleMessageAsync(dataEvent).ConfigureAwait(false);
userSw.Stop(); userSw.Stop();
if (userSw.ElapsedMilliseconds > 500)
{
_logger.Log(LogLevel.Debug, $"Socket {_socketId} {(listener is Subscription ? "subscription " : "query " + listener!.Id)} message processing slow ({(int)userSw.ElapsedMilliseconds}ms), consider offloading data handling to another thread. " +
"Data from this socket may arrive late or not at all if message processing is continuously slow.");
}
} }
return true; return true;
} }
public T? GetById<T>(int id) where T : BaseQuery
{
lock (_lock)
{
_idMap.TryGetValue(id, out var val);
return (T)val;
}
}
public List<Subscription> GetSubscriptions() public List<Subscription> GetSubscriptions()
{ {
lock (_lock) lock (_lock)
return _listeners.Values.SelectMany(v => v.OfType<Subscription>()).ToList(); return _listeners.Values.SelectMany(v => v.OfType<Subscription>()).Distinct().ToList();
} }
public List<BaseQuery> GetQueries() public List<BaseQuery> GetQueries()
@ -105,22 +130,22 @@ namespace CryptoExchange.Net.Sockets
return _listeners.Any(l => l.Value.Contains(processor)); return _listeners.Any(l => l.Value.Contains(processor));
} }
public bool Remove(IMessageProcessor processor) public void Remove(IMessageProcessor processor)
{ {
lock (_lock) lock (_lock)
{ {
var removed = false; _idMap.Remove(processor.Id);
foreach (var identifier in processor.Identifiers) if (processor.Identifiers?.Any() == true)
{ {
if (_listeners[identifier].Remove(processor)) foreach (var identifier in processor.Identifiers)
removed = true; {
_listeners[identifier].Remove(processor);
if (!_listeners[identifier].Any()) if (!_listeners[identifier].Any())
_listeners.Remove(identifier); _listeners.Remove(identifier);
}
} }
UpdateMap(); UpdateMap();
return removed;
} }
} }

View File

@ -89,7 +89,7 @@ namespace CryptoExchange.Net.Sockets
/// Get the subscribe object to send when subscribing /// Get the subscribe object to send when subscribing
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public abstract BaseQuery? GetSubQuery(); public abstract BaseQuery? GetSubQuery(SocketConnection connection);
/// <summary> /// <summary>
/// Get the unsubscribe object to send when unsubscribing /// Get the unsubscribe object to send when unsubscribing

View File

@ -21,7 +21,7 @@ namespace CryptoExchange.Net.Sockets
} }
/// <inheritdoc /> /// <inheritdoc />
public override BaseQuery? GetSubQuery() => null; public override BaseQuery? GetSubQuery(SocketConnection connection) => null;
/// <inheritdoc /> /// <inheritdoc />
public override BaseQuery? GetUnsubQuery() => null; public override BaseQuery? GetUnsubQuery() => null;