mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2026-04-13 00:22:22 +00:00
Updated WebSocket message routing, improving performance for scenarios with multiple different subscriptions and topics. Added AddCommaSeparated helper for Enum value arrays to ParameterCollection. Improved EnumConverter performance and removed string allocation for the happy path. Fixed CreateParamString extension method for ArrayParametersSerialization.Json. Fixed Shared GetOrderBookOptions and GetRecentTradeOptions base validations not being called.
1208 lines
45 KiB
C#
1208 lines
45 KiB
C#
using CryptoExchange.Net.Clients;
|
|
using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
|
|
using CryptoExchange.Net.Interfaces;
|
|
using CryptoExchange.Net.Logging.Extensions;
|
|
using CryptoExchange.Net.Objects;
|
|
using CryptoExchange.Net.Objects.Sockets;
|
|
using CryptoExchange.Net.Sockets.Default.Interfaces;
|
|
using CryptoExchange.Net.Sockets.Default.Routing;
|
|
using CryptoExchange.Net.Sockets.Interfaces;
|
|
using Microsoft.Extensions.Logging;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Collections.ObjectModel;
|
|
using System.Linq;
|
|
using System.Net.WebSockets;
|
|
using System.Text;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace CryptoExchange.Net.Sockets.Default
|
|
{
|
|
/// <summary>
/// Snapshot of the state of a socket connection
/// </summary>
/// <param name="Id">Id of the socket connection</param>
/// <param name="Address">URI the connection is made to</param>
/// <param name="Subscriptions">Number of subscriptions on this socket</param>
/// <param name="Status">Status of the socket</param>
/// <param name="Authenticated">Whether the connection is authenticated</param>
/// <param name="DownloadSpeed">Download speed over this socket</param>
/// <param name="PendingQueries">Number of queries which have not completed yet</param>
/// <param name="SubscriptionStates">State of each subscription on this socket</param>
public record SocketConnectionState(
    int Id,
    string Address,
    int Subscriptions,
    SocketStatus Status,
    bool Authenticated,
    double DownloadSpeed,
    int PendingQueries,
    List<Subscription.SubscriptionState> SubscriptionStates
);
|
|
|
|
/// <summary>
/// The lifecycle status of a socket connection
/// </summary>
public enum SocketStatus
{
    /// <summary>
    /// Initial status, nothing has happened yet
    /// </summary>
    None = 0,

    /// <summary>
    /// The socket is connected
    /// </summary>
    Connected = 1,

    /// <summary>
    /// The socket is reconnecting
    /// </summary>
    Reconnecting = 2,

    /// <summary>
    /// The socket has reconnected and is resubscribing
    /// </summary>
    Resubscribing = 3,

    /// <summary>
    /// The socket is closing
    /// </summary>
    Closing = 4,

    /// <summary>
    /// The socket is closed
    /// </summary>
    Closed = 5,

    /// <summary>
    /// The socket is disposed
    /// </summary>
    Disposed = 6
}
|
|
|
|
/// <summary>
|
|
/// A single socket connection to the server
|
|
/// </summary>
|
|
public class SocketConnection : ISocketConnection
|
|
{
|
|
|
|
/// <summary>
/// Raised when the connection is lost and the socket starts reconnecting
/// </summary>
public event Action? ConnectionLost;

/// <summary>
/// Raised when the connection is closed and no reconnect is happening
/// </summary>
public event Action? ConnectionClosed;

/// <summary>
/// Raised when processing the reconnect (re-authenticating / resubscribing) failed on the reconnected socket
/// </summary>
public event Action<Error>? ResubscribingFailed;

/// <summary>
/// Raised when the connection is restored; the argument is the time elapsed since <see cref="DisconnectTime"/>
/// </summary>
public event Action<TimeSpan>? ConnectionRestored;

/// <summary>
/// Raised when activity on the connection is paused
/// </summary>
public event Action? ActivityPaused;

/// <summary>
/// Raised when activity on the connection is unpaused
/// </summary>
public event Action? ActivityUnpaused;

/// <summary>
/// Callback awaited when connecting was rate limited and the connection couldn't be established
/// </summary>
public Func<Task>? ConnectRateLimitedAsync;
|
|
|
|
/// <summary>
/// The number of user subscriptions currently registered on this connection
/// </summary>
public int UserSubscriptionCount => _listeners.OfType<Subscription>().Count(sub => sub.UserSubscription);
|
|
|
|
/// <summary>
/// A copy of the user subscriptions currently registered on this connection
/// </summary>
public Subscription[] Subscriptions => _listeners.OfType<Subscription>().Where(sub => sub.UserSubscription).ToArray();
|
|
|
|
/// <summary>
/// Whether the connection has been authenticated
/// </summary>
public bool Authenticated { get; set; }

/// <inheritdoc />
public bool HasAuthenticatedSubscription =>
    _listeners.OfType<Subscription>().Any(sub => sub.UserSubscription && sub.Authenticated);

/// <summary>
/// Whether the underlying socket is open
/// </summary>
public bool Connected => _socket.IsOpen;

/// <summary>
/// The unique id of the underlying socket
/// </summary>
public int SocketId => _socket.Id;

/// <summary>
/// The current kilobytes per second of data being received, averaged over the last 3 seconds
/// </summary>
public double IncomingKbps => _socket.IncomingKbps;

/// <summary>
/// The URI of the underlying socket
/// </summary>
public Uri ConnectionUri => _socket.Uri;

/// <summary>
/// The API client this connection belongs to
/// </summary>
public SocketApiClient ApiClient { get; set; }

/// <summary>
/// The time the connection was lost, set when reconnecting starts and cleared once the connection is restored
/// </summary>
public DateTime? DisconnectTime { get; set; }

/// <summary>
/// Timestamp of the last time something was received from the server
/// </summary>
public DateTime? LastReceiveTime => _socket.LastReceiveTime;

/// <summary>
/// Tag used for identifying the connection
/// </summary>
public string Tag { get; set; }

/// <summary>
/// Additional properties stored for this connection
/// </summary>
public Dictionary<string, object> Properties { get; set; }
|
|
|
|
/// <summary>
/// Whether activity is paused. Changing the value logs the change and raises
/// <see cref="ActivityPaused"/> or <see cref="ActivityUnpaused"/> on a background task
/// </summary>
public bool PausedActivity
{
    get => _pausedActivity;
    set
    {
        // Nothing to do when the value doesn't actually change
        if (_pausedActivity == value)
            return;

        _pausedActivity = value;
        _logger.ActivityPaused(SocketId, value);

        // The events are raised on a background task so subscribers can't block the setter
        if (_pausedActivity)
        {
            _ = Task.Run(() => ActivityPaused?.Invoke());
        }
        else
        {
            _ = Task.Run(() => ActivityUnpaused?.Invoke());
        }
    }
}
|
|
|
|
/// <summary>
/// Status of the socket connection. A status change is logged together with the previous status
/// </summary>
public SocketStatus Status
{
    get => _status;
    private set
    {
        if (_status != value)
        {
            var previous = _status;
            _status = value;
            _logger.SocketStatusChanged(SocketId, previous, value);
        }
    }
}
|
|
|
|
/// <summary>
/// State describing whether this connection is a dedicated request connection
/// </summary>
public DedicatedConnectionState DedicatedRequestConnection { get; internal set; } = new();
|
|
|
|
/// <summary>
/// The topics of the subscriptions on this connection; subscriptions without a topic are left out
/// </summary>
public string[] Topics =>
    _listeners.OfType<Subscription>()
        .Select(sub => sub.Topic)
        .Where(topic => topic != null)
        .ToArray()!;
|
|
|
|
/// <summary>
/// The number of queries on this connection which have not completed yet
/// </summary>
public int PendingRequests => _listeners.OfType<Query>().Count(query => !query.Completed);
|
|
|
|
|
|
// Backing field for the PausedActivity property
private bool _pausedActivity;

// NOTE(review): presumably guards replacing _listeners; its usage is outside the visible part of the file
#if NET9_0_OR_GREATER
private readonly Lock _listenersLock = new();
#else
private readonly object _listenersLock = new();
#endif

// Used when handling a message to look up the route entry for a message type identifier
private RoutingTable _routingTable = new();

// The message processors (subscriptions and queries) currently registered on this connection
private ReadOnlyCollection<IMessageProcessor> _listeners;
private readonly ILogger _logger;

// Backing field for the Status property
private SocketStatus _status;

// Serializer for outgoing messages, created by the API client
private readonly IMessageSerializer _serializer;

// Message converters for binary and text messages, created when the first message of that type is handled
private ISocketMessageHandler? _byteMessageConverter;
private ISocketMessageHandler? _textMessageConverter;

// Reset to 0 when the connection closes or starts reconnecting
private long _lastSequenceNumber;

/// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
/// </summary>
protected Task? periodicTask;

/// <summary>
/// Wait event for the periodicTask
/// </summary>
protected AsyncResetEvent? periodicEvent;

/// <summary>
/// The underlying websocket
/// </summary>
private readonly IWebsocket _socket;
|
|
|
|
/// <summary>
/// Create a new socket connection: creates the underlying websocket through the factory and hooks up its events
/// </summary>
/// <param name="logger">Logger used by the connection and passed on to the websocket</param>
/// <param name="socketFactory">Factory creating the underlying websocket</param>
/// <param name="parameters">Parameters for the websocket</param>
/// <param name="apiClient">The API client this connection is for; also creates the message serializer</param>
/// <param name="tag">Tag for identifying the connection</param>
public SocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, string tag)
{
    _logger = logger;
    ApiClient = apiClient;
    Tag = tag;
    Properties = new Dictionary<string, object>();

    // Create the websocket for this connection
    _socket = socketFactory.CreateWebsocket(logger, this, parameters);
    _logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString());

    // Request / rate limit notifications
    _socket.OnRequestSent += HandleRequestSentAsync;
    _socket.OnRequestRateLimited += HandleRequestRateLimitedAsync;
    _socket.OnConnectRateLimited += HandleConnectRateLimitedAsync;

    // Connection lifecycle notifications
    _socket.OnOpen += HandleOpenAsync;
    _socket.OnClose += HandleCloseAsync;
    _socket.OnReconnecting += HandleReconnectingAsync;
    _socket.OnReconnected += HandleReconnectedAsync;
    _socket.OnError += HandleErrorAsync;
    _socket.GetReconnectionUrl = GetReconnectionUrlAsync;

    // Start without any registered message processors
    _listeners = new ReadOnlyCollection<IMessageProcessor>([]);

    _serializer = apiClient.CreateSerializer();
}
|
|
|
|
/// <summary>
/// Handler for the socket opening; marks the connection as connected and unpauses activity
/// </summary>
/// <returns>A completed task</returns>
protected virtual Task HandleOpenAsync()
{
    Status = SocketStatus.Connected;
    PausedActivity = false;

    return Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Handler for the socket closing without reconnect. Resets the connection state, removes the connection
/// from the API client, resets the user subscriptions, fails all queries and raises <see cref="ConnectionClosed"/>
/// </summary>
/// <returns>A completed task</returns>
protected virtual Task HandleCloseAsync()
{
    Status = SocketStatus.Closed;
    Authenticated = false;
    _lastSequenceNumber = 0;

    // TryRemove is a no-op when the connection is no longer registered
    ApiClient._socketConnections.TryRemove(SocketId, out _);

    // Reset each user subscription which isn't already flagged as closing the connection
    foreach (var subscription in _listeners.OfType<Subscription>())
    {
        if (!subscription.UserSubscription || subscription.IsClosingConnection)
            continue;

        subscription.IsClosingConnection = true;
        subscription.Reset();
    }

    // No response can arrive anymore for queries still registered, so fail and remove them
    var openQueries = _listeners.OfType<Query>().ToList();
    foreach (var openQuery in openQueries)
        openQuery.Fail(new WebError("Connection interrupted"));

    RemoveMessageProcessors(openQueries);

    // Raise the event on a background task so subscribers can't block the handler
    _ = Task.Run(() => ConnectionClosed?.Invoke());
    return Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Handler for the socket losing connection and starting to reconnect. Records the disconnect time, resets the
/// connection state and user subscriptions, fails all queries and raises <see cref="ConnectionLost"/>
/// </summary>
/// <returns>A completed task</returns>
protected virtual Task HandleReconnectingAsync()
{
    Status = SocketStatus.Reconnecting;
    DisconnectTime = DateTime.UtcNow;
    Authenticated = false;
    _lastSequenceNumber = 0;

    foreach (var subscription in _listeners.OfType<Subscription>())
    {
        if (subscription.UserSubscription)
            subscription.Reset();
    }

    // No response can arrive anymore for queries still registered, so fail and remove them
    var openQueries = _listeners.OfType<Query>().ToList();
    foreach (var openQuery in openQueries)
        openQuery.Fail(new WebError("Connection interrupted"));

    RemoveMessageProcessors(openQueries);

    // Raise the event on a background task so subscribers can't block the handler
    _ = Task.Run(() => ConnectionLost?.Invoke());
    return Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Get the URI to connect to when reconnecting, as provided by the API client
/// </summary>
/// <returns>The URI returned by the API client for this connection</returns>
protected virtual async Task<Uri?> GetReconnectionUrlAsync()
    => await ApiClient.GetReconnectUriAsync(this).ConfigureAwait(false);
|
|
|
|
/// <summary>
/// Handler for a socket which has reconnected. Fails the queries still registered and starts processing the
/// reconnect in the background. On success <see cref="ConnectionRestored"/> is raised; on failure
/// <see cref="ResubscribingFailed"/> is raised and another reconnect is triggered
/// </summary>
/// <returns>A completed task; the reconnect processing is not awaited</returns>
protected virtual Task HandleReconnectedAsync()
{
    Status = SocketStatus.Resubscribing;

    var openQueries = _listeners.OfType<Query>().ToList();
    foreach (var openQuery in openQueries)
        openQuery.Fail(new WebError("Connection interrupted"));

    RemoveMessageProcessors(openQueries);

    // Can't wait for this as it would cause a deadlock
    _ = Task.Run(async () =>
    {
        try
        {
            var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
            if (!reconnectResult)
            {
                // Re-authenticating / resubscribing failed; report it and reconnect again
                _logger.FailedReconnectProcessing(SocketId, reconnectResult.Error!.ToString());
                _ = Task.Run(() => ResubscribingFailed?.Invoke(reconnectResult.Error));
                _ = _socket.ReconnectAsync().ConfigureAwait(false);
                return;
            }

            Status = SocketStatus.Connected;
            _ = Task.Run(() =>
            {
                // Report how long the connection was down, then clear the disconnect time
                ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
                DisconnectTime = null;
            });
        }
        catch (Exception ex)
        {
            _logger.UnknownExceptionWhileProcessingReconnection(SocketId, ex);
            _ = _socket.ReconnectAsync().ConfigureAwait(false);
        }
    });

    return Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Handler for an error on the websocket; logs the error, including the websocket error code when available
/// </summary>
/// <param name="e">The exception which occurred</param>
/// <returns>A completed task</returns>
protected virtual Task HandleErrorAsync(Exception e)
{
    switch (e)
    {
        case WebSocketException socketException:
            _logger.WebSocketErrorCodeAndDetails(SocketId, socketException.WebSocketErrorCode, socketException.Message, socketException);
            break;
        default:
            _logger.WebSocketError(SocketId, e.Message, e);
            break;
    }

    return Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Handler for whenever a request is rate limited and rate limit behavior is set to fail.
/// Fails the query with the given id; does nothing if no such query is registered
/// </summary>
/// <param name="requestId">Id of the request which was rate limited</param>
/// <returns>A completed task</returns>
protected virtual Task HandleRequestRateLimitedAsync(int requestId)
{
    var limitedQuery = _listeners.OfType<Query>().FirstOrDefault(q => q.Id == requestId);
    if (limitedQuery != null)
        limitedQuery.Fail(new ClientRateLimitError("Connection rate limit reached"));

    return Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Handler for whenever connecting was rate limited and the connection couldn't be established.
/// Awaits the <see cref="ConnectRateLimitedAsync"/> callback when one is set
/// </summary>
/// <returns>Task completing when the callback has finished</returns>
protected async virtual Task HandleConnectRateLimitedAsync()
{
    if (ConnectRateLimitedAsync is null)
        return;

    await ConnectRateLimitedAsync().ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Handler for whenever a request is sent over the websocket. Marks the matching query as sent, using the
/// timeout of the query or else the request timeout from the client options
/// </summary>
/// <param name="requestId">Id of the request sent</param>
/// <returns>A completed task</returns>
protected virtual Task HandleRequestSentAsync(int requestId)
{
    var sentQuery = _listeners.OfType<Query>().FirstOrDefault(q => q.Id == requestId);
    if (sentQuery != null)
        sentQuery.IsSend(sentQuery.RequestTimeout ?? ApiClient.ClientOptions.RequestTimeout);

    return Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Handle a message received on the websocket: preprocess the data, determine its type identifier, look up the
/// route entry for that identifier, deserialize the message and offer it to each handler of the route.
/// Messages which can't be identified, routed, deserialized or handled are logged
/// </summary>
/// <param name="type">The websocket message type</param>
/// <param name="data">The received message data</param>
protected internal virtual void HandleStreamMessage2(WebSocketMessageType type, ReadOnlySpan<byte> data)
{
    // Forward message rules:
    // | Message Topic | Route Topic Filter | Topics Match | Forward | Description
    // |       N       |         N          |      -       |    Y    | No topic filter applied
    // |       N       |         Y          |      -       |    N    | Route only listens to specific topic
    // |       Y       |         N          |      -       |    Y    | Route listens to all message regardless of topic
    // |       Y       |         Y          |      Y       |    Y    | Route listens to specific message topic
    // |       Y       |         Y          |      N       |    N    | Route listens to different topic

    var receiveTime = DateTime.UtcNow;

    // 1. Decrypt/Preprocess if necessary
    data = ApiClient.PreprocessStreamMessage(this, type, data);

    // 2. Pick the converter for this message type, creating it on first use
    ISocketMessageHandler messageConverter = type == WebSocketMessageType.Binary
        ? (_byteMessageConverter ??= ApiClient.CreateMessageConverter(type))
        : (_textMessageConverter ??= ApiClient.CreateMessageConverter(type));

    // 3. Keep the original text only when outputting original data is enabled
    string? originalData = null;
    var outputOriginalData = ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData;
    if (outputOriginalData)
    {
#if NETSTANDARD2_0
        originalData = Encoding.UTF8.GetString(data.ToArray());
#else
        originalData = Encoding.UTF8.GetString(data);
#endif

        if (_logger.IsEnabled(LogLevel.Trace))
            _logger.ReceivedData(SocketId, originalData);
    }

    // 4. Determine the type identifier of the message
    var typeIdentifier = messageConverter.GetTypeIdentifier(data, type);
    if (typeIdentifier == null)
    {
        // Both deserialization type and identifier null, can't process
        _logger.LogWarning("Failed to evaluate message. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
        return;
    }

    // 5. Find the route entry for the identifier
    var route = _routingTable.GetRouteTableEntry(typeIdentifier);
    if (route == null)
    {
        var handledByClient = ApiClient.HandleUnhandledMessage(this, typeIdentifier, data);
        if (!handledByClient)
        {
            // No handler found for identifier either, can't process
            _logger.LogWarning("Failed to determine message type for identifier {Identifier}. Data: {Message}", typeIdentifier, Encoding.UTF8.GetString(data.ToArray()));
        }

        return;
    }

    // 6. Deserialize, or pass the message on as a plain string when the route asks for string output
    object message;
    try
    {
        if (!route.IsStringOutput)
        {
            message = messageConverter.Deserialize(data, route.DeserializationType);
        }
        else
        {
#if NETSTANDARD2_0
            message = Encoding.UTF8.GetString(data.ToArray());
#else
            message = Encoding.UTF8.GetString(data);
#endif
        }
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "Deserialization failed. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
        return;
    }

    if (message == null)
    {
        // Deserialize error
        _logger.LogWarning("Deserialization returned null. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
        return;
    }

    // 7. Offer the message to every handler of the route; each handler is always invoked
    var topicFilter = messageConverter.GetTopicFilter(message);
    var handledByAny = false;
    foreach (var handler in route.Handlers)
    {
        if (handler.Handle(typeIdentifier, topicFilter, this, receiveTime, originalData, message))
            handledByAny = true;
    }

    if (handledByAny)
        return;

    // 8. Nobody handled it; give the client a chance before logging the mismatch
    if (ApiClient.HandleUnhandledMessage(this, typeIdentifier, data))
        return;

    // Topic filters of all routes registered for this type identifier, for diagnostics
    var listeningTopics = string.Join(",", _listeners.Select(listener => string.Join(",", listener.MessageRouter.Routes.Where(r => r.TypeIdentifier == typeIdentifier).Select(r => r.TopicFilter != null ? string.Join(",", r.TopicFilter) : "[null]"))));
    _logger.ReceivedMessageNotMatchedToAnyListener(
        SocketId,
        typeIdentifier,
        topicFilter!,
        listeningTopics);
}
|
|
|
|
/// <summary>
/// Connect the underlying websocket
/// </summary>
/// <param name="ct">Cancellation token passed to the websocket connect</param>
/// <returns>The result of the websocket connect</returns>
public async Task<CallResult> ConnectAsync(CancellationToken ct)
{
    return await _socket.ConnectAsync(ct).ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Retrieve the underlying websocket
/// </summary>
/// <returns>The websocket used by this connection</returns>
public IWebsocket GetSocket()
{
    return _socket;
}
|
|
|
|
/// <summary>
/// Trigger a reconnect of the underlying websocket
/// </summary>
/// <returns>Task completing when the websocket reconnect call has finished</returns>
public async Task TriggerReconnectAsync()
{
    await _socket.ReconnectAsync().ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Update the proxy setting of the websocket and reconnect so the new setting is used
/// </summary>
/// <param name="proxy">New proxy setting</param>
/// <returns>Task completing when the websocket reconnect call has finished</returns>
public async Task UpdateProxy(ApiProxy? proxy)
{
    _socket.UpdateProxy(proxy);

    // Same as TriggerReconnectAsync
    await _socket.ReconnectAsync().ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Close the connection. Removes the connection from the API client, disposes the cancellation token
/// registrations of the subscriptions and closes and disposes the websocket.
/// Does nothing when the connection is already closed or disposed
/// </summary>
/// <returns>Task completing when the websocket has been closed</returns>
public async Task CloseAsync()
{
    if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
        return;

    // TryRemove is a no-op when the connection is no longer registered
    ApiClient._socketConnections.TryRemove(SocketId, out _);

    foreach (var subscription in _listeners.OfType<Subscription>())
        subscription.CancellationTokenRegistration?.Dispose();

    await _socket.CloseAsync().ConfigureAwait(false);
    _socket.Dispose();
}
|
|
|
|
/// <summary>
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
/// </summary>
/// <param name="subscription">Subscription to close</param>
/// <returns>Task completing when the subscription has been closed</returns>
public async Task CloseAsync(Subscription subscription)
{
    // If we are resubscribing this subscription at this moment we'll want to wait for a bit until it is finished to avoid concurrency issues
    while (subscription.Status == SubscriptionStatus.Subscribing)
        await Task.Delay(50).ConfigureAwait(false);

    subscription.Status = SubscriptionStatus.Closing;

    // The connection itself is already going away, nothing to unsubscribe
    if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
    {
        subscription.Status = SubscriptionStatus.Closed;
        return;
    }

    _logger.ClosingSubscription(SocketId, subscription.Id);
    subscription.CancellationTokenRegistration?.Dispose();

    // Another subscription whose routes are all contained in this one means the server side subscription must stay
    var hasDuplicate = _listeners.OfType<Subscription>()
        .Any(other => other != subscription && other.MessageRouter.Routes.All(route => subscription.MessageRouter.ContainsCheck(route)));

    // The connection can be closed when no user subscription remains open and it isn't a dedicated request connection
    var noOpenUserSubscriptions = _listeners.OfType<Subscription>()
        .All(sub => !sub.UserSubscription || sub.Status == SubscriptionStatus.Closing || sub.Status == SubscriptionStatus.Closed);
    var shouldCloseConnection = noOpenUserSubscriptions && !DedicatedRequestConnection.IsDedicatedRequestConnection;

    if (hasDuplicate)
    {
        _logger.NotUnsubscribingSubscriptionBecauseDuplicateRunning(SocketId);
    }
    else
    {
        // No need to unsubscribe when the whole connection is about to be closed
        var needUnsub = _listeners.Contains(subscription) && !shouldCloseConnection;
        if (needUnsub && _socket.IsOpen)
            await UnsubscribeAsync(subscription).ConfigureAwait(false);
    }

    // The status may have changed while unsubscribing
    if (Status == SocketStatus.Closing)
    {
        subscription.Status = SubscriptionStatus.Closed;
        _logger.AlreadyClosing(SocketId);
        return;
    }

    if (shouldCloseConnection)
    {
        Status = SocketStatus.Closing;
        subscription.IsClosingConnection = true;
        _logger.ClosingNoMoreSubscriptions(SocketId);
        await CloseAsync().ConfigureAwait(false);
    }

    RemoveMessageProcessor(subscription);

    subscription.Status = SubscriptionStatus.Closed;
}
|
|
|
|
/// <summary>
/// Dispose the connection: marks it as disposed, signals the periodic wait event and disposes the websocket
/// </summary>
public void Dispose()
{
    Status = SocketStatus.Disposed;

    // Signal the wait event of the periodic task, if there is one
    periodicEvent?.Set();

    _socket.Dispose();
}
|
|
|
|
/// <summary>
/// Add a subscription to this connection. Only possible while the connection status is
/// <see cref="SocketStatus.None"/> or <see cref="SocketStatus.Connected"/>
/// </summary>
/// <param name="subscription">The subscription to add</param>
/// <returns>True if the subscription was added, false if the connection status doesn't allow it</returns>
public bool AddSubscription(Subscription subscription)
{
    var canAdd = Status == SocketStatus.None || Status == SocketStatus.Connected;
    if (!canAdd)
        return false;

    AddMessageProcessor(subscription);

    if (subscription.UserSubscription)
        _logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount);

    return true;
}
|
|
|
|
/// <summary>
/// Get a subscription on this connection by id
/// </summary>
/// <param name="id">Id of the subscription</param>
/// <returns>The subscription with the id, or null if there is none</returns>
public Subscription? GetSubscription(int id)
    => _listeners.OfType<Subscription>().SingleOrDefault(sub => sub.Id == id);
|
|
|
|
/// <summary>
/// Get the current state of the connection
/// </summary>
/// <param name="includeSubDetails">Whether to include the state of each user subscription; when false the list of subscription states is empty</param>
/// <returns>Snapshot of the connection state</returns>
public SocketConnectionState GetState(bool includeSubDetails)
{
    return new SocketConnectionState(
        Id: SocketId,
        Address: ConnectionUri.AbsoluteUri,
        Subscriptions: UserSubscriptionCount,
        Status: Status,
        Authenticated: Authenticated,
        DownloadSpeed: IncomingKbps,
        PendingQueries: _listeners.OfType<Query>().Count(query => !query.Completed),
        SubscriptionStates: includeSubDetails
            ? Subscriptions.Select(sub => sub.GetState()).ToList()
            : new List<Subscription.SubscriptionState>()
    );
}
|
|
|
|
/// <summary>
/// Send a query request and wait for it to complete
/// </summary>
/// <param name="query">Query to send</param>
/// <param name="ct">Cancellation token</param>
/// <returns>The result of the query, or a timeout error if the query has no result after waiting</returns>
public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query, CancellationToken ct = default)
{
    await SendAndWaitIntAsync(query, ct).ConfigureAwait(false);

    var queryResult = query.Result;
    if (queryResult != null)
        return queryResult;

    return new CallResult(new TimeoutError());
}
|
|
|
|
/// <summary>
/// Send a query request and wait for it to complete
/// </summary>
/// <typeparam name="THandlerResponse">Expected result type</typeparam>
/// <param name="query">Query to send</param>
/// <param name="ct">Cancellation token</param>
/// <returns>The typed result of the query, or a timeout error if the query has no result after waiting</returns>
public virtual async Task<CallResult<THandlerResponse>> SendAndWaitQueryAsync<THandlerResponse>(Query<THandlerResponse> query, CancellationToken ct = default)
{
    await SendAndWaitIntAsync(query, ct).ConfigureAwait(false);

    var typedResult = query.TypedResult;
    if (typedResult != null)
        return typedResult;

    return new CallResult<THandlerResponse>(new TimeoutError());
}
|
|
|
|
/// <summary>
/// Register the query as message processor, send its request and wait until the query completes,
/// the socket is no longer open or cancellation is requested. The query is always removed from the
/// message processors again, also when sending throws
/// </summary>
/// <param name="query">Query to send</param>
/// <param name="ct">Cancellation token</param>
private async Task SendAndWaitIntAsync(Query query, CancellationToken ct = default)
{
    AddMessageProcessor(query);

    try
    {
        // Sending is inside the try so an exception thrown while serializing/sending
        // can't leave the query registered as a listener
        var sendResult = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false);
        if (!sendResult)
        {
            query.Fail(sendResult.Error!);
            return;
        }

        while (!ct.IsCancellationRequested)
        {
            if (!_socket.IsOpen)
            {
                query.Fail(new WebError("Socket not open"));
                return;
            }

            if (query.Completed)
                return;

            // Wait in short intervals so a closed socket is noticed while waiting
            await query.WaitAsync(TimeSpan.FromMilliseconds(500), ct).ConfigureAwait(false);

            if (query.Completed)
                return;
        }

        // The loop only ends without returning when cancellation was requested
        query.Fail(new CancellationRequestedError());
    }
    finally
    {
        RemoveMessageProcessor(query);
    }
}
|
|
|
|
/// <summary>
/// Send data over the websocket connection, serialized with the serializer of this connection.
/// With a string serializer, an object which already is a string is sent as-is
/// </summary>
/// <typeparam name="T">The type of the object to send</typeparam>
/// <param name="requestId">The request id</param>
/// <param name="obj">The object to send</param>
/// <param name="weight">The weight of the message</param>
/// <returns>The result of sending the message</returns>
/// <exception cref="Exception">The serializer is neither a byte nor a string serializer</exception>
public virtual ValueTask<CallResult> SendAsync<T>(int requestId, T obj, int weight)
{
    switch (_serializer)
    {
        case IByteMessageSerializer byteSerializer:
            return SendBytesAsync(requestId, byteSerializer.Serialize(obj), weight);

        case IStringMessageSerializer stringSerializer:
            // Strings are sent without serializing them again
            var payload = obj is string text ? text : stringSerializer.Serialize(obj);
            return SendStringAsync(requestId, payload, weight);

        default:
            throw new Exception("Unknown serializer when sending message");
    }
}
|
|
|
|
/// <summary>
/// Send byte data over the websocket connection. Fails without sending when the data exceeds the
/// message size limit of the API client or when the socket is not open
/// </summary>
/// <param name="requestId">The id of the request</param>
/// <param name="data">The data to send</param>
/// <param name="weight">The weight of the message</param>
/// <returns>Success if the data was passed to the websocket, otherwise the error</returns>
public virtual async ValueTask<CallResult> SendBytesAsync(int requestId, byte[] data, int weight)
{
    var sizeLimit = ApiClient.MessageSendSizeLimit;
    if (sizeLimit != null && data.Length > sizeLimit.Value)
    {
        var info = $"Message to send exceeds the max server message size ({data.Length} vs {sizeLimit.Value} bytes). Split the request into batches to keep below this limit";
        _logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
        return new CallResult(new InvalidOperationError(info));
    }

    if (!_socket.IsOpen)
    {
        _logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
        return new CallResult(new WebError("Failed to send message, socket no longer open"));
    }

    _logger.SendingByteData(SocketId, requestId, data.Length);
    try
    {
        if (_socket.Send(requestId, data, weight))
            return CallResult.SuccessResult;

        return new CallResult(new WebError("Failed to send message, connection not open"));
    }
    catch (Exception ex)
    {
        return new CallResult(new WebError($"Failed to send message: {ex.Message}", exception: ex));
    }
}
|
|
|
|
/// <summary>
/// Send string data over the websocket connection. Fails without sending when the UTF-8 encoded size of
/// the data exceeds the message size limit of the API client or when the socket is not open
/// </summary>
/// <param name="requestId">The id of the request</param>
/// <param name="data">The data to send</param>
/// <param name="weight">The weight of the message</param>
/// <returns>Success if the data was passed to the websocket, otherwise the error</returns>
public virtual async ValueTask<CallResult> SendStringAsync(int requestId, string data, int weight)
{
    var sizeLimit = ApiClient.MessageSendSizeLimit;
    if (sizeLimit != null)
    {
        // The limit is a size in bytes; string.Length counts UTF-16 chars and under-counts non-ASCII text,
        // so measure the UTF-8 size instead (websocket text frames are UTF-8 encoded)
        var byteCount = Encoding.UTF8.GetByteCount(data);
        if (byteCount > sizeLimit.Value)
        {
            var info = $"Message to send exceeds the max server message size ({byteCount} vs {sizeLimit.Value} bytes). Split the request into batches to keep below this limit";
            _logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
            return new CallResult(new InvalidOperationError(info));
        }
    }

    if (!_socket.IsOpen)
    {
        _logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
        return new CallResult(new WebError("Failed to send message, socket no longer open"));
    }

    _logger.SendingData(SocketId, requestId, data);
    try
    {
        if (!_socket.Send(requestId, data, weight))
            return new CallResult(new WebError("Failed to send message, connection not open"));

        return CallResult.SuccessResult;
    }
    catch (Exception ex)
    {
        return new CallResult(new WebError("Failed to send message: " + ex.Message, exception: ex));
    }
}
|
|
|
|
/// <summary>
/// Process a reconnected socket: close it when there is nothing to resubscribe, re-authenticate when
/// needed and resubscribe the active subscriptions in batches
/// </summary>
/// <returns>Success when everything was processed, otherwise the first error encountered</returns>
private async Task<CallResult> ProcessReconnectAsync()
{
    if (!_socket.IsOpen)
        return new CallResult(new WebError("Socket not connected"));

    var isDedicated = DedicatedRequestConnection.IsDedicatedRequestConnection;
    if (!isDedicated && !_listeners.OfType<Subscription>().Any(s => s.UserSubscription))
    {
        // No need to resubscribe anything
        _logger.NothingToResubscribeCloseConnection(SocketId);
        _ = _socket.CloseAsync();
        return CallResult.SuccessResult;
    }

    var needsAuthentication = _listeners.OfType<Subscription>().Any(s => s.Authenticated)
        || (DedicatedRequestConnection.IsDedicatedRequestConnection && DedicatedRequestConnection.Authenticated);
    if (needsAuthentication)
    {
        // If we reconnected a authenticated connection we need to re-authenticate
        var authResult = await ApiClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
        if (!authResult)
        {
            _logger.FailedAuthenticationDisconnectAndRecoonect(SocketId);
            return authResult;
        }

        Authenticated = true;
        _logger.AuthenticationSucceeded(SocketId);
    }

    // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
    var batchSize = ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket;
    for (var batch = 0; ; batch++)
    {
        if (!_socket.IsOpen)
            return new CallResult(new WebError("Socket not connected"));

        var subList = _listeners.OfType<Subscription>()
            .Where(x => x.Active)
            .Skip(batch * batchSize)
            .Take(batchSize)
            .ToList();
        if (subList.Count == 0)
            break;

        // Resubscribe the whole batch concurrently
        var taskList = subList.Select(subscription => TrySubscribeAsync(subscription, false, default)).ToList();
        await Task.WhenAll(taskList).ConfigureAwait(false);

        // Stop at the first failed resubscription of the batch
        var failedTask = taskList.FirstOrDefault(t => !t.Result.Success);
        if (failedTask != null)
            return failedTask.Result;
    }

    if (!_socket.IsOpen)
        return new CallResult(new WebError("Socket not connected"));

    _logger.AllSubscriptionResubscribed(SocketId);
    return CallResult.SuccessResult;
}
|
|
|
|
/// <summary>
/// Try to subscribe a new subscription by sending the subscribe query and wait for the result as needed
/// </summary>
/// <param name="subscription">The subscription</param>
/// <param name="newSubscription">Whether this is a new subscription, or an existing subscription (resubscribing on reconnected socket)</param>
/// <param name="subCancelToken">Cancellation token</param>
/// <returns>
/// A successful result when the subscription is no longer active (resubscribe only), when no subscribe query is needed or when the subscribe query succeeded.
/// Otherwise a result carrying the error of the failed request revitalization or subscribe query.
/// </returns>
protected internal async Task<CallResult> TrySubscribeAsync(Subscription subscription, bool newSubscription, CancellationToken subCancelToken)
{
    subscription.ConnectionInvocations = 0;

    if (!newSubscription)
    {
        if (!subscription.Active)
        {
            // Can be closed during resubscribing
            return CallResult.SuccessResult;
        }

        var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
        if (!result)
        {
            _logger.FailedRequestRevitalization(SocketId, result.Error?.ToString());
            subscription.Status = SubscriptionStatus.Pending;
            return result;
        }
    }

    subscription.Status = SubscriptionStatus.Subscribing;
    var subQuery = subscription.CreateSubscriptionQuery(this);
    if (subQuery == null)
    {
        // No sub query, so successful
        subscription.Status = SubscriptionStatus.Subscribed;
        return CallResult.SuccessResult;
    }

    var subCompleteHandler = () =>
    {
        subscription.Status = subQuery.Result!.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending;
        subscription.HandleSubQueryResponse(this, subQuery.Response);
        if (newSubscription && subQuery.Result.Success && subCancelToken != default)
        {
            // Register only accepts a synchronous Action, which makes this async lambda `async void`.
            // An exception escaping an async void method is unhandled and can take down the process,
            // so any failure while closing the subscription is caught and logged here.
            subscription.CancellationTokenRegistration = subCancelToken.Register(async () =>
            {
                _logger.CancellationTokenSetClosingSubscription(SocketId, subscription.Id);
                try
                {
                    await CloseAsync(subscription).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex, "[Sckt {SocketId}] failed to close subscription {SubscriptionId} after the cancellation token was set", SocketId, subscription.Id);
                }
            }, false);
        }
    };
    subQuery.OnComplete = subCompleteHandler;

    var subQueryResult = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
    if (!subQueryResult)
    {
        _logger.FailedToSubscribe(SocketId, subQueryResult.Error?.ToString());
        // If this was a server process error or timeout we still send an unsubscribe to prevent messages coming in later
        if (newSubscription)
            await CloseAsync(subscription).ConfigureAwait(false);

        return new CallResult<UpdateSubscription>(subQueryResult.Error!);
    }

    // Run the completion handler manually when the query does not expect a response
    if (!subQuery.ExpectsResponse)
        subCompleteHandler();

    return subQueryResult;
}
|
|
|
|
/// <summary>
/// Send the unsubscribe query for a subscription, if the subscription has one, and pass the response back to the subscription
/// </summary>
/// <param name="subscription">The subscription to unsubscribe</param>
internal async Task UnsubscribeAsync(Subscription subscription)
{
    var unsubQuery = subscription.CreateUnsubscriptionQuery(this);
    if (unsubQuery != null)
    {
        await SendAndWaitQueryAsync(unsubQuery).ConfigureAwait(false);
        subscription.HandleUnsubQueryResponse(this, unsubQuery.Response);
        _logger.SubscriptionUnsubscribed(SocketId, subscription.Id);
    }
}
|
|
|
|
/// <summary>
/// Resend the subscribe query of a subscription over this connection
/// </summary>
/// <param name="subscription">The subscription to resubscribe</param>
/// <returns>An error result when the socket is not open, a successful result when there is no query to send, otherwise the result of the sent query</returns>
internal async Task<CallResult> ResubscribeAsync(Subscription subscription)
{
    if (_socket.IsOpen)
    {
        var query = subscription.CreateSubscriptionQuery(this);
        if (query == null)
        {
            // Nothing needs to be sent for this subscription
            return CallResult.SuccessResult;
        }

        var sendResult = await SendAndWaitQueryAsync(query).ConfigureAwait(false);
        subscription.HandleSubQueryResponse(this, query.Response);
        return sendResult;
    }

    return new CallResult(new WebError("Socket is not connected"));
}
|
|
|
|
/// <summary>
/// Record the sequence number of the latest update on this connection, triggering a reconnect when sequence numbers are enforced and the number is not the expected one
/// </summary>
/// <param name="sequenceNumber">The sequence number of the update</param>
public void UpdateSequenceNumber(long sequenceNumber)
{
    var previous = _lastSequenceNumber;

    // Acceptable values are:
    // - anything when nothing has been recorded yet (initial value is 0)
    // - the same number again; with multiple listeners for the same message it can be recorded multiple times
    // - the previous number + 1, which is the expected value
    var inSequence = previous == 0
        || previous == sequenceNumber
        || previous + 1 == sequenceNumber;

    if (ApiClient.EnforceSequenceNumbers && !inSequence)
    {
        _logger.LogWarning("[Sckt {SocketId}] update not in sequence. Last recorded sequence number: {LastSequence}, update sequence number: {UpdateSequence}. Reconnecting", SocketId, previous, sequenceNumber);
        _ = TriggerReconnectAsync();
    }

    _lastSequenceNumber = sequenceNumber;
}
|
|
|
|
/// <summary>
/// Start a background task which repeatedly sends a query over this socket connection
/// </summary>
/// <param name="identifier">Identifier of the periodic send, used in logging</param>
/// <param name="interval">The interval passed to the wait between two sends</param>
/// <param name="queryDelegate">Method producing the query to send; a round is skipped when it returns null</param>
/// <param name="callback">Optional callback invoked with the result of each sent query</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="queryDelegate"/> is null</exception>
public virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, Query> queryDelegate, Action<SocketConnection, CallResult>? callback)
{
    if (queryDelegate == null)
        throw new ArgumentNullException(nameof(queryDelegate));

    periodicEvent = new AsyncResetEvent();
    periodicTask = Task.Run(async () =>
    {
        while (!IsShuttingDown())
        {
            await periodicEvent.WaitAsync(interval).ConfigureAwait(false);

            // The status can have changed while waiting
            if (IsShuttingDown())
                break;

            // Skip this round when not connected (without invoking the delegate) or when there is nothing to send
            var periodicQuery = Connected ? queryDelegate(this) : null;
            if (periodicQuery == null)
                continue;

            _logger.SendingPeriodic(SocketId, identifier);

            try
            {
                var sendResult = await SendAndWaitQueryAsync(periodicQuery).ConfigureAwait(false);
                callback?.Invoke(this, sendResult);
            }
            catch (Exception ex)
            {
                _logger.PeriodicSendFailed(SocketId, identifier, ex.Message, ex);
            }
        }
    });

    // The loop ends once the connection is disposed, closed or closing
    bool IsShuttingDown()
    {
        return Status == SocketStatus.Disposed
            || Status == SocketStatus.Closed
            || Status == SocketStatus.Closing;
    }
}
|
|
|
|
/// <summary>
/// Update the routing table with the current list of listeners
/// </summary>
private void UpdateRoutingTable() => _routingTable.Update(_listeners);
|
|
|
|
/// <summary>
/// Add a message processor to the listeners, updating the routing table when the processor has routes
/// </summary>
/// <param name="processor">The processor to add</param>
private void AddMessageProcessor(IMessageProcessor processor)
{
    lock (_listenersLock)
    {
        // Build a new list and swap the reference instead of mutating the current list
        var listeners = new List<IMessageProcessor>(_listeners) { processor };
        processor.OnMessageRouterUpdated += UpdateRoutingTable;
        _listeners = listeners.AsReadOnly();

        if (processor.MessageRouter.Routes.Length <= 0)
            return; // No routes, so the routing table is unaffected

        UpdateRoutingTable();
#if DEBUG
        _logger.LogTrace("Processor added, new routing table:\r\n" + _routingTable.ToString());
#endif
    }
}
|
|
|
|
/// <summary>
/// Remove a message processor from the listeners, updating the routing table when it was actually removed
/// </summary>
/// <param name="processor">The processor to remove</param>
private void RemoveMessageProcessor(IMessageProcessor processor)
{
    lock (_listenersLock)
    {
        // The handler is detached regardless of whether the processor is in the list
        processor.OnMessageRouterUpdated -= UpdateRoutingTable;

        var remaining = new List<IMessageProcessor>(_listeners);
        if (remaining.Remove(processor))
        {
            // Only swap the list and rebuild the routes when something changed
            _listeners = remaining.AsReadOnly();
            UpdateRoutingTable();
#if DEBUG
            _logger.LogTrace("Processor removed, new routing table:\r\n" + _routingTable.ToString());
#endif
        }
    }
}
|
|
|
|
/// <summary>
/// Remove multiple message processors from the listeners, updating the routing table once when at least one was removed
/// </summary>
/// <param name="processors">The processors to remove</param>
private void RemoveMessageProcessors(IEnumerable<IMessageProcessor> processors)
{
    lock (_listenersLock)
    {
        var remaining = new List<IMessageProcessor>(_listeners);
        var removedCount = 0;
        foreach (var candidate in processors)
        {
            // The handler is detached regardless of whether the processor is in the list
            candidate.OnMessageRouterUpdated -= UpdateRoutingTable;
            if (remaining.Remove(candidate))
                removedCount++;
        }

        if (removedCount > 0)
        {
            // Only swap the list and rebuild the routes when something changed
            _listeners = remaining.AsReadOnly();
            UpdateRoutingTable();
#if DEBUG
            _logger.LogTrace("Processors removed, new routing table:\r\n" + _routingTable.ToString());
#endif
        }
    }
}
|
|
}
|
|
}
|
|
|