1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-09 00:46:19 +00:00
This commit is contained in:
JKorf 2023-11-02 22:19:42 +01:00
parent 35f7dbf9fb
commit 312d54cf04
9 changed files with 411 additions and 260 deletions

View File

@ -170,7 +170,6 @@ namespace CryptoExchange.Net
return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe"));
SocketConnection socketConnection;
MessageListener? messageListener;
var released = false;
// Wait for a semaphore here, so we only connect 1 socket at a time.
// This is necessary for being able to see if connections can be combined
@ -195,8 +194,8 @@ namespace CryptoExchange.Net
socketConnection = socketResult.Data;
// Add a subscription on the socket connection
messageListener = AddSubscription(subscription, true, socketConnection);
if (messageListener == null)
var success = AddSubscription(subscription, true, socketConnection);
if (!success)
{
_logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
continue;
@ -234,31 +233,31 @@ namespace CryptoExchange.Net
if (request != null)
{
// Send the request and wait for answer
var subResult = await SubscribeAndWaitAsync(socketConnection, request, messageListener).ConfigureAwait(false);
var subResult = await SubscribeAndWaitAsync(socketConnection, subscription).ConfigureAwait(false);
if (!subResult)
{
_logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
await socketConnection.CloseAsync(messageListener).ConfigureAwait(false);
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
return new CallResult<UpdateSubscription>(subResult.Error!);
}
}
else
{
// No request to be sent, so just mark the subscription as comfirmed
messageListener.Confirmed = true;
subscription.Confirmed = true;
}
if (ct != default)
{
messageListener.CancellationTokenRegistration = ct.Register(async () =>
subscription.CancellationTokenRegistration = ct.Register(async () =>
{
_logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription {messageListener.Id}");
await socketConnection.CloseAsync(messageListener).ConfigureAwait(false);
_logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription {subscription.Id}");
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
}, false);
}
_logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {messageListener.Id} completed successfully");
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, messageListener));
_logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully");
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
}
/// <summary>
@ -268,27 +267,13 @@ namespace CryptoExchange.Net
/// <param name="request">The request to send, will be serialized to json</param>
/// <param name="listener">The message listener for the subscription</param>
/// <returns></returns>
protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, MessageListener listener)
protected internal virtual async Task<CallResult> SubscribeAndWaitAsync(SocketConnection socketConnection, Subscription subscription)
{
CallResult? callResult = null;
await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, listener, 1, x =>
{
var (matches, result) = listener.Subscription!.MessageMatchesSubRequest(x);
if (matches)
callResult = result;
return matches;
}).ConfigureAwait(false);
var callResult = await socketConnection.SendAndWaitSubAsync(subscription).ConfigureAwait(false);
if (callResult)
subscription.Confirmed = true;
if (callResult?.Success == true)
{
listener.Confirmed = true;
return new CallResult<bool>(true);
}
if (callResult == null)
return new CallResult<bool>(new ServerError("No response on subscription request received"));
return new CallResult<bool>(callResult.Error!);
return callResult;
}
/// <summary>
@ -297,7 +282,7 @@ namespace CryptoExchange.Net
/// <typeparam name="T">Expected result type</typeparam>
/// <param name="query">The query</param>
/// <returns></returns>
protected virtual Task<CallResult<T>> QueryAsync<T>(Query query)
protected virtual Task<CallResult<T>> QueryAsync<T>(Query<T> query)
{
return QueryAsync<T>(BaseAddress, query);
}
@ -309,7 +294,7 @@ namespace CryptoExchange.Net
/// <param name="url">The url for the request</param>
/// <param name="query">The query</param>
/// <returns></returns>
protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, Query query)
protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, Query<T> query)
{
if (_disposing)
return new CallResult<T>(new InvalidOperationError("Client disposed, can't query"));
@ -358,22 +343,10 @@ namespace CryptoExchange.Net
/// <param name="socket">The connection to send and wait on</param>
/// <param name="query">The query</param>
/// <returns></returns>
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, Query query)
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, Query<T> query)
{
var dataResult = new CallResult<T>(new ServerError("No response on query received"));
await socket.SendAndWaitAsync(query.Request, ClientOptions.RequestTimeout, null, query.Weight, x =>
{
var matches = query.MessageMatchesQuery(x);
if (matches)
{
query.HandleResponse(x);
return true;
}
return false;
}).ConfigureAwait(false);
return dataResult;
return await socket.SendAndWaitQueryAsync<T>(query).ConfigureAwait(false);
}
/// <summary>
@ -409,27 +382,16 @@ namespace CryptoExchange.Net
{
_logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate");
var authRequest = GetAuthenticationRequest();
var authResult = new CallResult(new ServerError("No response from server"));
await socket.SendAndWaitAsync(authRequest.Request, ClientOptions.RequestTimeout, null, 1, x =>
{
var matches = authRequest.MessageMatchesQuery(x);
if (matches)
{
authResult = authRequest.HandleResponse(x);
return true;
}
var result = await socket.SendAndWaitQueryAsync(authRequest).ConfigureAwait(false);
return false;
}).ConfigureAwait(false);
if (!authResult)
if (!result)
{
_logger.Log(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed");
if (socket.Connected)
await socket.CloseAsync().ConfigureAwait(false);
authResult.Error!.Message = "Authentication failed: " + authResult.Error.Message;
return new CallResult<bool>(authResult.Error);
result.Error!.Message = "Authentication failed: " + result.Error.Message;
return new CallResult<bool>(result.Error)!;
}
_logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} authenticated");
@ -451,13 +413,12 @@ namespace CryptoExchange.Net
/// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param>
/// <param name="connection">The socket connection the handler is on</param>
/// <returns></returns>
protected virtual MessageListener? AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection)
protected virtual bool AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection)
{
var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription);
if (!connection.AddListener(messageListener))
return null;
if (!connection.AddListener(subscription))
return false;
return messageListener;
return false;
}
/// <summary>
@ -467,9 +428,8 @@ namespace CryptoExchange.Net
protected void AddSystemSubscription(SystemSubscription systemSubscription)
{
systemSubscriptions.Add(systemSubscription);
var subscription = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false);
foreach (var connection in socketConnections.Values)
connection.AddListener(subscription);
connection.AddListener(systemSubscription);
}
/// <summary>
@ -542,10 +502,7 @@ namespace CryptoExchange.Net
socketConnection.UnparsedMessage += HandleUnparsedMessage;
foreach (var systemSubscription in systemSubscriptions)
{
var handler = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false);
socketConnection.AddListener(handler);
}
socketConnection.AddListener(systemSubscription);
return new CallResult<SocketConnection>(socketConnection);
}
@ -665,7 +622,7 @@ namespace CryptoExchange.Net
/// <returns></returns>
public virtual async Task<bool> UnsubscribeAsync(int subscriptionId)
{
MessageListener? subscription = null;
Subscription? subscription = null;
SocketConnection? connection = null;
foreach (var socket in socketConnections.Values.ToList())
{
@ -747,7 +704,7 @@ namespace CryptoExchange.Net
foreach (var connection in socketConnections)
{
sb.AppendLine($" Connection {connection.Key}: {connection.Value.UserListenerCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}");
foreach (var subscription in connection.Value.MessageListeners)
foreach (var subscription in connection.Value.Subscriptions)
sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}");
}
return sb.ToString();

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
@ -28,10 +29,10 @@ namespace CryptoExchange.Net.Converters
/// <param name="idValues"></param>
/// <param name="listeners"></param>
/// <returns></returns>
public abstract Type? GetDeserializationType(Dictionary<string, string?> idValues, List<MessageListener> listeners);
public abstract Type? GetDeserializationType(Dictionary<string, string?> idValues, List<BasePendingRequest> pendingRequests, List<Subscription> listeners);
/// <inheritdoc />
public ParsedMessage? ReadJson(Stream stream, List<MessageListener> listeners, bool outputOriginalData)
public ParsedMessage? ReadJson(Stream stream, List<BasePendingRequest> pendingRequests, List<Subscription> listeners, bool outputOriginalData)
{
// Start reading the data
// Once we reach the properties that identify the message we save those in a dict
@ -81,7 +82,7 @@ namespace CryptoExchange.Net.Converters
}
result.Identifier = idString;
var resultType = GetDeserializationType(typeIdDict, listeners);
var resultType = GetDeserializationType(typeIdDict, pendingRequests, listeners);
result.Data = resultType == null ? null : token.ToObject(resultType);
return result;
}

View File

@ -1,43 +1,47 @@
using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Sockets;
using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects.Sockets
{
internal class PendingRequest
public abstract class BasePendingRequest
{
public int Id { get; set; }
public Func<ParsedMessage, bool> MessageMatchesHandler { get; }
public bool Completed { get; private set; }
public abstract Type ResponseType { get; }
public AsyncResetEvent Event { get; }
public DateTime RequestTimestamp { get; set; }
public TimeSpan Timeout { get; }
public MessageListener? MessageListener { get; }
public object Request { get; set; }
private CancellationTokenSource? _cts;
public int Priority => 100;
public PendingRequest(int id, Func<ParsedMessage, bool> messageMatchesHandler, TimeSpan timeout, MessageListener? subscription)
protected BasePendingRequest(int id, object request, Func<ParsedMessage, bool> messageMatchesHandler, TimeSpan timeout)
{
Id = id;
MessageMatchesHandler = messageMatchesHandler;
Event = new AsyncResetEvent(false, false);
Timeout = timeout;
Request = request;
RequestTimestamp = DateTime.UtcNow;
MessageListener = subscription;
}
public void IsSend()
{
// Start timeout countdown
_cts = new CancellationTokenSource(Timeout);
_cts.Token.Register(Fail, false);
_cts.Token.Register(() => Fail("No response"), false);
}
public void Fail()
public virtual void Fail(string error)
{
Completed = true;
Event.Set();
@ -48,11 +52,85 @@ namespace CryptoExchange.Net.Objects.Sockets
return MessageMatchesHandler(message);
}
public Task ProcessAsync(ParsedMessage message)
public virtual Task ProcessAsync(ParsedMessage message)
{
Completed = true;
Event.Set();
return Task.CompletedTask;
}
}
public class PendingRequest : BasePendingRequest
{
public CallResult Result { get; set; }
public Func<ParsedMessage, CallResult> Handler { get; }
public override Type? ResponseType => null;
private PendingRequest(int id, object request, Func<ParsedMessage, bool> messageMatchesHandler, Func<ParsedMessage, CallResult> messageHandler, TimeSpan timeout)
: base(id, request, messageMatchesHandler, timeout)
{
Handler = messageHandler;
}
public static PendingRequest CreateForQuery(Query query)
{
return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, query.HandleResult, TimeSpan.FromSeconds(5));
}
public static PendingRequest CreateForSubRequest(Subscription subscription)
{
return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetSubRequest, subscription.MessageMatchesSubRequest, subscription.HandleSubResponse, TimeSpan.FromSeconds(5));
}
public static PendingRequest CreateForUnsubRequest(Subscription subscription)
{
return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetUnsubRequest, subscription.MessageMatchesUnsubRequest, subscription.HandleUnsubResponse, TimeSpan.FromSeconds(5));
}
public override void Fail(string error)
{
Result = new CallResult(new ServerError(error));
base.Fail(error);
}
public override Task ProcessAsync(ParsedMessage message)
{
Result = Handler(message);
return base.ProcessAsync(message);
}
}
public class PendingRequest<T> : BasePendingRequest
{
public CallResult<T> Result { get; set; }
public Func<ParsedMessage, CallResult<T>> Handler { get; }
public override Type? ResponseType => typeof(T);
public PendingRequest(int id, object request, Func<ParsedMessage, bool> messageMatchesHandler, Func<ParsedMessage, CallResult<T>> messageHandler, TimeSpan timeout)
: base(id, request, messageMatchesHandler, timeout)
{
Handler = messageHandler;
}
public static PendingRequest<T> CreateForQuery<T>(Query<T> query)
{
return new PendingRequest<T>(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, x =>
{
var response = query.HandleResponse(x);
return response.As((T)response.Data);
}, TimeSpan.FromSeconds(5));
}
public override void Fail(string error)
{
Result = new CallResult<T>(new ServerError(error));
base.Fail(error);
}
public override Task ProcessAsync(ParsedMessage message)
{
Result = Handler(message);
return base.ProcessAsync(message);
}
}
}

View File

@ -1,95 +1,95 @@
using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Sockets;
using System;
using System.Threading;
using System.Threading.Tasks;
//using CryptoExchange.Net.Converters;
//using CryptoExchange.Net.Interfaces;
//using CryptoExchange.Net.Sockets;
//using System;
//using System.Threading;
//using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects.Sockets
{
/// <summary>
/// Socket listener
/// </summary>
public class MessageListener
{
/// <summary>
/// Unique listener id
/// </summary>
public int Id { get; }
//namespace CryptoExchange.Net.Objects.Sockets
//{
// /// <summary>
// /// Socket listener
// /// </summary>
// public class MessageListener
// {
// /// <summary>
// /// Unique listener id
// /// </summary>
// public int Id { get; }
/// <summary>
/// Exception event
/// </summary>
public event Action<Exception>? Exception;
// /// <summary>
// /// Exception event
// /// </summary>
// public event Action<Exception>? Exception;
/// <summary>
/// The request object send when subscribing on the server. Either this or the `Identifier` property should be set
/// </summary>
public Subscription Subscription { get; set; }
// /// <summary>
// /// The request object send when subscribing on the server. Either this or the `Identifier` property should be set
// /// </summary>
// public Subscription Subscription { get; set; }
/// <summary>
/// Whether this is a user subscription or an internal listener
/// </summary>
public bool UserListener { get; set; }
// /// <summary>
// /// Whether this is a user subscription or an internal listener
// /// </summary>
// public bool UserListener { get; set; }
/// <summary>
/// If the subscription has been confirmed to be subscribed by the server
/// </summary>
public bool Confirmed { get; set; }
// /// <summary>
// /// If the subscription has been confirmed to be subscribed by the server
// /// </summary>
// public bool Confirmed { get; set; }
/// <summary>
/// Whether authentication is needed for this subscription
/// </summary>
public bool Authenticated => Subscription.Authenticated;
// /// <summary>
// /// Whether authentication is needed for this subscription
// /// </summary>
// public bool Authenticated => Subscription.Authenticated;
/// <summary>
/// Whether we're closing this subscription and a socket connection shouldn't be kept open for it
/// </summary>
public bool Closed { get; set; }
// /// <summary>
// /// Whether we're closing this subscription and a socket connection shouldn't be kept open for it
// /// </summary>
// public bool Closed { get; set; }
/// <summary>
/// Cancellation token registration, should be disposed when subscription is closed. Used for closing the subscription with
/// a provided cancelation token
/// </summary>
public CancellationTokenRegistration? CancellationTokenRegistration { get; set; }
// /// <summary>
// /// Cancellation token registration, should be disposed when subscription is closed. Used for closing the subscription with
// /// a provided cancelation token
// /// </summary>
// public CancellationTokenRegistration? CancellationTokenRegistration { get; set; }
/// <summary>
/// ctor
/// </summary>
/// <param name="id"></param>
/// <param name="request"></param>
/// <param name="userSubscription"></param>
public MessageListener(int id, Subscription request, bool userSubscription)
{
Id = id;
UserListener = userSubscription;
Subscription = request;
}
// /// <summary>
// /// ctor
// /// </summary>
// /// <param name="id"></param>
// /// <param name="request"></param>
// /// <param name="userSubscription"></param>
// public MessageListener(int id, Subscription request, bool userSubscription)
// {
// Id = id;
// UserListener = userSubscription;
// Subscription = request;
// }
/// <summary>
/// Invoke the exception event
/// </summary>
/// <param name="e"></param>
public void InvokeExceptionHandler(Exception e)
{
Exception?.Invoke(e);
}
// /// <summary>
// /// Invoke the exception event
// /// </summary>
// /// <param name="e"></param>
// public void InvokeExceptionHandler(Exception e)
// {
// Exception?.Invoke(e);
// }
/// <summary>
/// The priority of this subscription
/// </summary>
public int Priority => Subscription is SystemSubscription ? 50 : 1;
// /// <summary>
// /// The priority of this subscription
// /// </summary>
// public int Priority => Subscription is SystemSubscription ? 50 : 1;
/// <summary>
/// Process the message
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public Task ProcessAsync(ParsedMessage message)
{
// TODO
var dataEvent = new DataEvent<ParsedMessage>(message, null, message.OriginalData, DateTime.UtcNow, null);
return Subscription.HandleEventAsync(dataEvent);
}
}
}
// /// <summary>
// /// Process the message
// /// </summary>
// /// <param name="message"></param>
// /// <returns></returns>
// public Task ProcessAsync(ParsedMessage message)
// {
// // TODO
// var dataEvent = new DataEvent<ParsedMessage>(message, null, message.OriginalData, DateTime.UtcNow, null);
// return Subscription.HandleEventAsync(dataEvent);
// }
// }
//}

View File

@ -11,7 +11,7 @@ namespace CryptoExchange.Net.Objects.Sockets
public class UpdateSubscription
{
private readonly SocketConnection _connection;
private readonly MessageListener _listener;
private readonly Subscription _listener;
/// <summary>
/// Event when the connection is lost. The socket will automatically reconnect when possible.
@ -84,7 +84,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary>
/// <param name="connection">The socket connection the subscription is on</param>
/// <param name="subscription">The subscription</param>
public UpdateSubscription(SocketConnection connection, MessageListener subscription)
public UpdateSubscription(SocketConnection connection, Subscription subscription)
{
_connection = connection;
_listener = subscription;
@ -121,7 +121,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// Resubscribe this subscription
/// </summary>
/// <returns></returns>
internal async Task<CallResult<bool>> ResubscribeAsync()
internal async Task<CallResult> ResubscribeAsync()
{
return await _connection.ResubscribeAsync(_listener).ConfigureAwait(false);
}

View File

@ -1,13 +1,14 @@
using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using System;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Query
/// </summary>
public abstract class Query
public abstract class BaseQuery
{
/// <summary>
/// The query request
@ -30,12 +31,6 @@ namespace CryptoExchange.Net.Sockets
/// <param name="message"></param>
/// <returns></returns>
public abstract bool MessageMatchesQuery(ParsedMessage message);
/// <summary>
/// Handle the query response
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult HandleResponse(ParsedMessage message);
/// <summary>
/// ctor
@ -43,11 +38,39 @@ namespace CryptoExchange.Net.Sockets
/// <param name="request"></param>
/// <param name="authenticated"></param>
/// <param name="weight"></param>
public Query(object request, bool authenticated, int weight = 1)
public BaseQuery(object request, bool authenticated, int weight = 1)
{
Authenticated = authenticated;
Request = request;
Weight = weight;
}
}
public abstract class Query : BaseQuery
{
protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
{
}
/// <summary>
/// Handle the query response
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult HandleResult(ParsedMessage message);
}
public abstract class Query<TResponse> : BaseQuery
{
protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
{
}
/// <summary>
/// Handle the query response
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult<TResponse> HandleResponse(ParsedMessage message);
}
}

View File

@ -62,18 +62,18 @@ namespace CryptoExchange.Net.Sockets
public int UserListenerCount
{
get { lock (_listenerLock)
return _messageIdentifierListeners.Values.Count(h => h.UserListener); }
return _messageIdentifierListeners.Values.Count(h => h.UserSubscription); }
}
/// <summary>
/// Get a copy of the current message listeners
/// </summary>
public MessageListener[] MessageListeners
public Subscription[] Subscriptions
{
get
{
lock (_listenerLock)
return _messageIdentifierListeners.Values.Where(h => h.UserListener).ToArray();
return _messageIdentifierListeners.Values.Where(h => h.UserSubscription).ToArray();
}
}
@ -158,9 +158,9 @@ namespace CryptoExchange.Net.Sockets
}
private bool _pausedActivity;
private readonly List<PendingRequest> _pendingRequests;
private readonly List<MessageListener> _messageListeners;
private readonly Dictionary<string, MessageListener> _messageIdentifierListeners;
private readonly List<BasePendingRequest> _pendingRequests;
private readonly List<Subscription> _messageListeners;
private readonly Dictionary<string, Subscription> _messageIdentifierListeners;
private readonly object _listenerLock = new();
private readonly ILogger _logger;
@ -186,9 +186,9 @@ namespace CryptoExchange.Net.Sockets
Tag = tag;
Properties = new Dictionary<string, object>();
_pendingRequests = new List<PendingRequest>();
_messageListeners = new List<MessageListener>();
_messageIdentifierListeners = new Dictionary<string, MessageListener>();
_pendingRequests = new List<BasePendingRequest>();
_messageListeners = new List<Subscription>();
_messageIdentifierListeners = new Dictionary<string, Subscription>();
_socket = socket;
_socket.OnStreamMessage += HandleStreamMessage;
@ -261,7 +261,7 @@ namespace CryptoExchange.Net.Sockets
{
foreach (var pendingRequest in _pendingRequests.ToList())
{
pendingRequest.Fail();
pendingRequest.Fail("Connection interupted");
// Remove?
}
}
@ -301,7 +301,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="requestId">Id of the request sent</param>
protected virtual void HandleRequestSent(int requestId)
{
PendingRequest pendingRequest;
BasePendingRequest pendingRequest;
lock (_pendingRequests)
pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId);
@ -324,11 +324,11 @@ namespace CryptoExchange.Net.Sockets
var timestamp = DateTime.UtcNow;
TimeSpan userCodeDuration = TimeSpan.Zero;
List<MessageListener> listeners;
List<Subscription> listeners;
lock (_listenerLock)
listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList();
listeners = _messageListeners.OrderByDescending(x => !x.UserSubscription).ToList();
var result = ApiClient.StreamConverter.ReadJson(stream, listeners, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); // TODO
var result = ApiClient.StreamConverter.ReadJson(stream, _pendingRequests, listeners, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
if(result == null)
{
stream.Position = 0;
@ -352,12 +352,13 @@ namespace CryptoExchange.Net.Sockets
{
// Matched based on identifier
var userSw = Stopwatch.StartNew();
await idListener.ProcessAsync(result).ConfigureAwait(false);
var dataEvent = new DataEvent<ParsedMessage>(result, null, result.OriginalData, DateTime.UtcNow, null);
await idListener.HandleEventAsync(dataEvent).ConfigureAwait(false);
userSw.Stop();
return;
}
List<PendingRequest> pendingRequests;
List<BasePendingRequest> pendingRequests;
lock (_pendingRequests)
pendingRequests = _pendingRequests.ToList();
@ -371,11 +372,12 @@ namespace CryptoExchange.Net.Sockets
if (pendingRequest.Completed)
{
// Answer to a timed out request, unsub if it is a subscription request
if (pendingRequest.MessageListener != null)
{
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
_ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false);
}
// TODO
//if (pendingRequest.MessageListener != null)
//{
// _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
// _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false);
//}
}
else
{
@ -443,25 +445,25 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="listener">Listener to close</param>
/// <returns></returns>
public async Task CloseAsync(MessageListener listener)
public async Task CloseAsync(Subscription subscription)
{
lock (_listenerLock)
{
if (!_messageListeners.Contains(listener))
if (!_messageListeners.Contains(subscription))
return;
listener.Closed = true;
subscription.Closed = true;
}
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing listener {listener.Id}");
if (listener.CancellationTokenRegistration.HasValue)
listener.CancellationTokenRegistration.Value.Dispose();
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing listener {subscription.Id}");
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
if (listener.Confirmed && _socket.IsOpen)
await UnsubscribeAsync(listener).ConfigureAwait(false);
if (subscription.Confirmed && _socket.IsOpen)
await UnsubscribeAsync(subscription).ConfigureAwait(false);
bool shouldCloseConnection;
lock (_listenerLock)
@ -472,7 +474,7 @@ namespace CryptoExchange.Net.Sockets
return;
}
shouldCloseConnection = _messageIdentifierListeners.All(r => !r.Value.UserListener || r.Value.Closed);
shouldCloseConnection = _messageIdentifierListeners.All(r => !r.Value.UserSubscription || r.Value.Closed);
if (shouldCloseConnection)
Status = SocketStatus.Closing;
}
@ -485,8 +487,8 @@ namespace CryptoExchange.Net.Sockets
lock (_listenerLock)
{
_messageListeners.Remove(listener);
foreach (var id in listener.Subscription.Identifiers)
_messageListeners.Remove(subscription);
foreach (var id in subscription.Identifiers)
_messageIdentifierListeners.Remove(id);
}
}
@ -504,22 +506,22 @@ namespace CryptoExchange.Net.Sockets
/// Add a listener to this connection
/// </summary>
/// <param name="listener"></param>
public bool AddListener(MessageListener listener)
public bool AddListener(Subscription subscription)
{
lock (_listenerLock)
{
if (Status != SocketStatus.None && Status != SocketStatus.Connected)
return false;
_messageListeners.Add(listener);
if (listener.Subscription.Identifiers != null)
_messageListeners.Add(subscription);
if (subscription.Identifiers != null)
{
foreach (var id in listener.Subscription.Identifiers)
_messageIdentifierListeners.Add(id.ToLowerInvariant(), listener);
foreach (var id in subscription.Identifiers)
_messageIdentifierListeners.Add(id.ToLowerInvariant(), subscription);
}
if (listener.UserListener)
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {listener.Id}, total listeners on connection: {_messageListeners.Count(s => s.UserListener)}");
if (subscription.UserSubscription)
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {subscription.Id}, total listeners on connection: {_messageListeners.Count(s => s.UserSubscription)}");
return true;
}
}
@ -528,7 +530,7 @@ namespace CryptoExchange.Net.Sockets
/// Get a listener on this connection by id
/// </summary>
/// <param name="id"></param>
public MessageListener? GetListener(int id)
public Subscription? GetListener(int id)
{
lock (_listenerLock)
return _messageListeners.SingleOrDefault(s => s.Id == id);
@ -539,42 +541,59 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="predicate">Filter for a request</param>
/// <returns></returns>
public MessageListener? GetListenerByRequest(Func<object?, bool> predicate)
public Subscription? GetListenerByRequest(Func<object?, bool> predicate)
{
lock(_listenerLock)
return _messageListeners.SingleOrDefault(s => predicate(s.Subscription));
return _messageListeners.SingleOrDefault(s => predicate(s));
}
/// <summary>
/// Send data and wait for an answer
/// </summary>
/// <typeparam name="T">The data type expected in response</typeparam>
/// <param name="obj">The object to send</param>
/// <param name="timeout">The timeout for response</param>
/// <param name="listener">Listener if this is a subscribe request</param>
/// <param name="handler">The response handler</param>
/// <param name="weight">The weight of the message</param>
/// <returns></returns>
public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func<ParsedMessage, bool> handler)
public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query)
{
var pendingRequest = PendingRequest<T>.CreateForQuery(query);
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
return pendingRequest.Result;
}
public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query)
{
var pendingRequest = PendingRequest.CreateForQuery(query);
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
return pendingRequest.Result;
}
public virtual async Task<CallResult> SendAndWaitSubAsync(Subscription subscription)
{
var pendingRequest = PendingRequest.CreateForSubRequest(subscription);
await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false);
return pendingRequest.Result;
}
public virtual async Task<CallResult> SendAndWaitUnsubAsync(Subscription subscription)
{
var pendingRequest = PendingRequest.CreateForUnsubRequest(subscription);
await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false);
return pendingRequest.Result;
}
private async Task SendAndWaitAsync(BasePendingRequest pending, int weight)
{
var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener);
lock (_messageListeners)
{
_pendingRequests.Add(pending);
}
var sendOk = Send(pending.Id, obj, weight);
var sendOk = Send(pending.Id, pending.Request, weight);
if (!sendOk)
{
pending.Fail();
pending.Fail("Failed to send");
return;
}
while (true)
{
if(!_socket.IsOpen)
if (!_socket.IsOpen)
{
pending.Fail();
pending.Fail("Socket not open");
return;
}
@ -588,6 +607,52 @@ namespace CryptoExchange.Net.Sockets
}
}
///// <summary>
///// Send data and wait for an answer
///// </summary>
///// <typeparam name="T">The data type expected in response</typeparam>
///// <param name="obj">The object to send</param>
///// <param name="timeout">The timeout for response</param>
///// <param name="listener">Listener if this is a subscribe request</param>
///// <param name="handler">The response handler</param>
///// <param name="weight">The weight of the message</param>
///// <returns></returns>
//public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func<ParsedMessage, bool> handler)
//{
// // TODO either Query<T> or Subscription<T> should be passed here instead of T obj
// // That would allow to track the Query/Subscription on the PendingRequest instead of the listener, which allow us to match the pending request in the Converter
// var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener);
// lock (_messageListeners)
// {
// _pendingRequests.Add(pending);
// }
// var sendOk = Send(pending.Id, obj, weight);
// if (!sendOk)
// {
// pending.Fail();
// return;
// }
// while (true)
// {
// if(!_socket.IsOpen)
// {
// pending.Fail();
// return;
// }
// if (pending.Completed)
// return;
// await pending.Event.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
// if (pending.Completed)
// return;
// }
//}
/// <summary>
/// Send data over the websocket connection
/// </summary>
@ -624,14 +689,14 @@ namespace CryptoExchange.Net.Sockets
}
}
private async Task<CallResult<bool>> ProcessReconnectAsync()
private async Task<CallResult> ProcessReconnectAsync()
{
if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected"));
bool anySubscriptions = false;
lock (_listenerLock)
anySubscriptions = _messageListeners.Any(s => s.UserListener);
anySubscriptions = _messageListeners.Any(s => s.UserSubscription);
if (!anySubscriptions)
{
@ -660,21 +725,22 @@ namespace CryptoExchange.Net.Sockets
}
// Get a list of all subscriptions on the socket
List<MessageListener> listenerList = new List<MessageListener>();
List<Subscription> listenerList = new List<Subscription>();
lock (_listenerLock)
{
// ?
foreach (var listener in _messageListeners)
{
if (listener.Subscription != null)
if (listener != null)
listenerList.Add(listener);
else
listener.Confirmed = true;
}
}
foreach(var listener in listenerList.Where(s => s.Subscription != null))
foreach(var listener in listenerList)
{
var result = await ApiClient.RevitalizeRequestAsync(listener.Subscription!).ConfigureAwait(false);
var result = await ApiClient.RevitalizeRequestAsync(listener).ConfigureAwait(false);
if (!result)
{
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error);
@ -688,9 +754,9 @@ namespace CryptoExchange.Net.Sockets
if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected"));
var taskList = new List<Task<CallResult<bool>>>();
var taskList = new List<Task<CallResult>>();
foreach (var listener in listenerList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
taskList.Add(ApiClient.SubscribeAndWaitAsync(this, listener.Subscription!, listener));
taskList.Add(ApiClient.SubscribeAndWaitAsync(this, listener));
await Task.WhenAll(taskList).ConfigureAwait(false);
if (taskList.Any(t => !t.Result.Success))
@ -707,28 +773,24 @@ namespace CryptoExchange.Net.Sockets
return new CallResult<bool>(true);
}
internal async Task UnsubscribeAsync(MessageListener listener)
internal async Task UnsubscribeAsync(Subscription subscription)
{
var unsubscribeRequest = listener.Subscription?.GetUnsubRequest();
var unsubscribeRequest = subscription?.GetUnsubRequest();
if (unsubscribeRequest != null)
{
await SendAndWaitAsync(unsubscribeRequest, TimeSpan.FromSeconds(10), listener, 0, x =>
{
var (matches, result) = listener.Subscription!.MessageMatchesUnsubRequest(x);
// TODO check result?
return matches;
}).ConfigureAwait(false);
var pendingRequest = PendingRequest.CreateForUnsubRequest(subscription!);
await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false);
_logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {listener.Id} unsubscribed");
_logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription.Id} unsubscribed");
}
}
internal async Task<CallResult<bool>> ResubscribeAsync(MessageListener listener)
internal async Task<CallResult> ResubscribeAsync(Subscription subscription)
{
if (!_socket.IsOpen)
return new CallResult<bool>(new UnknownError("Socket is not connected"));
return new CallResult(new UnknownError("Socket is not connected"));
return await ApiClient.SubscribeAndWaitAsync(this, listener.Subscription!, listener).ConfigureAwait(false);
return await ApiClient.SubscribeAndWaitAsync(this, subscription).ConfigureAwait(false);
}
/// <summary>

View File

@ -1,7 +1,9 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
@ -11,6 +13,12 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public abstract class Subscription
{
public int Id { get; set; }
public bool UserSubscription { get; set; }
public bool Confirmed { get; set; }
public bool Closed { get; set; }
/// <summary>
/// Logger
/// </summary>
@ -26,6 +34,13 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public abstract List<string> Identifiers { get; }
public CancellationTokenRegistration? CancellationTokenRegistration { get; set; }
/// <summary>
/// Exception event
/// </summary>
public event Action<Exception>? Exception;
/// <summary>
/// ctor
/// </summary>
@ -47,7 +62,8 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract (bool, CallResult?) MessageMatchesSubRequest(ParsedMessage message);
public abstract bool MessageMatchesSubRequest(ParsedMessage message);
public abstract CallResult HandleSubResponse(ParsedMessage message);
/// <summary>
/// Get the unsubscribe object to send when unsubscribing
@ -59,7 +75,8 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract (bool, CallResult?) MessageMatchesUnsubRequest(ParsedMessage message);
public abstract bool MessageMatchesUnsubRequest(ParsedMessage message);
public abstract CallResult HandleUnsubResponse(ParsedMessage message);
/// <summary>
/// Handle the update message
@ -67,5 +84,14 @@ namespace CryptoExchange.Net.Sockets
/// <param name="message"></param>
/// <returns></returns>
public abstract Task HandleEventAsync(DataEvent<ParsedMessage> message);
/// <summary>
/// Invoke the exception event
/// </summary>
/// <param name="e"></param>
public void InvokeExceptionHandler(Exception e)
{
Exception?.Invoke(e);
}
}
}

View File

@ -24,11 +24,15 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public override object? GetSubRequest() => null;
/// <inheritdoc />
public override (bool, CallResult?) MessageMatchesSubRequest(ParsedMessage message) => throw new NotImplementedException();
public override bool MessageMatchesSubRequest(ParsedMessage message) => throw new NotImplementedException();
/// <inheritdoc />
public override CallResult HandleSubResponse(ParsedMessage message) => throw new NotImplementedException();
/// <inheritdoc />
public override object? GetUnsubRequest() => null;
/// <inheritdoc />
public override (bool, CallResult?) MessageMatchesUnsubRequest(ParsedMessage message) => throw new NotImplementedException();
public override bool MessageMatchesUnsubRequest(ParsedMessage message) => throw new NotImplementedException();
/// <inheritdoc />
public override CallResult HandleUnsubResponse(ParsedMessage message) => throw new NotImplementedException();
}
}