using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets.Default.Interfaces;
using CryptoExchange.Net.Sockets.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets.Default
{
/// <summary>
/// State of the connection
/// </summary>
/// <param name="Id">The id of the socket connection</param>
/// <param name="Address">The connection URI</param>
/// <param name="Subscriptions">Number of subscriptions on this socket</param>
/// <param name="Status">Socket status</param>
/// <param name="Authenticated">If the connection is authenticated</param>
/// <param name="DownloadSpeed">Download speed over this socket</param>
/// <param name="PendingQueries">Number of non-completed queries</param>
/// <param name="SubscriptionStates">State for each subscription on this socket</param>
public record SocketConnectionState(
int Id,
string Address,
int Subscriptions,
SocketStatus Status,
bool Authenticated,
double DownloadSpeed,
int PendingQueries,
List SubscriptionStates
);
/// <summary>
/// The lifecycle status of a socket connection
/// </summary>
public enum SocketStatus
{
    /// <summary>
    /// Initial status, no status change has been recorded yet
    /// </summary>
    None = 0,

    /// <summary>
    /// The socket is connected
    /// </summary>
    Connected = 1,

    /// <summary>
    /// The socket is reconnecting
    /// </summary>
    Reconnecting = 2,

    /// <summary>
    /// The socket has reconnected and is resubscribing
    /// </summary>
    Resubscribing = 3,

    /// <summary>
    /// The connection is closing
    /// </summary>
    Closing = 4,

    /// <summary>
    /// The connection is closed
    /// </summary>
    Closed = 5,

    /// <summary>
    /// The connection is disposed
    /// </summary>
    Disposed = 6
}
/// <summary>
/// A single socket connection to the server
/// </summary>
public class SocketConnection : ISocketConnection
{
/// <summary>
/// Connection lost event. Raised on a background task when the socket starts reconnecting
/// </summary>
public event Action? ConnectionLost;
/// <summary>
/// Connection closed and no reconnect is happening. Raised on a background task
/// </summary>
public event Action? ConnectionClosed;
/// <summary>
/// Failed to resubscribe all subscriptions on the reconnected socket. Invoked with the error of the failed reconnect processing
/// </summary>
public event Action? ResubscribingFailed;
/// <summary>
/// Connection restored event. Invoked with the time elapsed since <see cref="DisconnectTime"/>
/// </summary>
public event Action? ConnectionRestored;
/// <summary>
/// The connection is paused event. Raised when <see cref="PausedActivity"/> changes to true
/// </summary>
public event Action? ActivityPaused;
/// <summary>
/// The connection is unpaused event. Raised when <see cref="PausedActivity"/> changes to false
/// </summary>
public event Action? ActivityUnpaused;
/// <summary>
/// Connection was rate limited and couldn't be established. When set, the delegate is awaited by <see cref="HandleConnectRateLimitedAsync"/>
/// </summary>
public Func? ConnectRateLimitedAsync;
/// <summary>
/// The amount of user subscriptions on this connection. Counts the listeners flagged as
/// UserSubscription while holding the listeners lock
/// </summary>
public int UserSubscriptionCount
{
get
{
lock (_listenersLock)
return _listeners.OfType().Count(h => h.UserSubscription);
}
}
/// <summary>
/// Get a copy of the current message subscriptions. Only listeners flagged as UserSubscription
/// are included; the array is a snapshot taken while holding the listeners lock
/// </summary>
public Subscription[] Subscriptions
{
get
{
lock (_listenersLock)
return _listeners.OfType().Where(h => h.UserSubscription).ToArray();
}
}
/// <summary>
/// If the connection has been authenticated
/// </summary>
public bool Authenticated { get; set; }
/// <summary>
/// Whether any of the current <see cref="Subscriptions"/> is marked as authenticated
/// </summary>
public bool HasAuthenticatedSubscription => Subscriptions.Any(x => x.Authenticated);
/// <summary>
/// If connection is made; reflects whether the underlying socket is open
/// </summary>
public bool Connected => _socket.IsOpen;
/// <summary>
/// The unique ID of the socket
/// </summary>
public int SocketId => _socket.Id;
/// <summary>
/// The current kilobytes per second of data being received, averaged over the last 3 seconds
/// </summary>
public double IncomingKbps => _socket.IncomingKbps;
/// <summary>
/// The connection uri
/// </summary>
public Uri ConnectionUri => _socket.Uri;
/// <summary>
/// The API client the connection is for
/// </summary>
public SocketApiClient ApiClient { get; set; }
/// <summary>
/// Time of disconnecting. Set when a reconnect starts and cleared again after the connection restored event has been raised
/// </summary>
public DateTime? DisconnectTime { get; set; }
/// <summary>
/// Last timestamp something was received from the server
/// </summary>
public DateTime? LastReceiveTime => _socket.LastReceiveTime;
/// <summary>
/// Tag for identification
/// </summary>
public string Tag { get; set; }
/// <summary>
/// Additional properties for this connection
/// </summary>
public Dictionary Properties { get; set; }
/// <summary>
/// Whether activity on this connection is paused. Assigning a different value logs the change
/// and raises <see cref="ActivityPaused"/> or <see cref="ActivityUnpaused"/> on a background task;
/// assigning the current value does nothing
/// </summary>
public bool PausedActivity
{
    get
    {
        return _pausedActivity;
    }
    set
    {
        // Nothing to report when the value does not actually change
        if (_pausedActivity == value)
            return;

        _pausedActivity = value;
        _logger.ActivityPaused(SocketId, value);

        // Notify listeners without blocking the caller
        _ = _pausedActivity
            ? Task.Run(() => ActivityPaused?.Invoke())
            : Task.Run(() => ActivityUnpaused?.Invoke());
    }
}
/// <summary>
/// Current status of the socket connection. A status change is logged together with the previous status;
/// assigning the current status is a no-op
/// </summary>
public SocketStatus Status
{
    get
    {
        return _status;
    }
    private set
    {
        if (_status != value)
        {
            // Remember the previous value so the transition can be logged
            var previousStatus = _status;
            _status = value;
            _logger.SocketStatusChanged(SocketId, previousStatus, value);
        }
    }
}
/// <summary>
/// Info on whether this connection is a dedicated request connection. Initialized with a new
/// DedicatedConnectionState instance; can only be replaced from within the assembly
/// </summary>
public DedicatedConnectionState DedicatedRequestConnection { get; internal set; } = new DedicatedConnectionState();
/// <summary>
/// Current subscription topics on this connection. Listeners without a topic (null) are left out;
/// the array is a snapshot taken while holding the listeners lock
/// </summary>
public string[] Topics
{
get
{
lock (_listenersLock)
return _listeners.OfType().Select(x => x.Topic).Where(t => t != null).ToArray()!;
}
}
/// <summary>
/// The number of current pending requests, i.e. registered listeners which are not yet completed.
/// Counted while holding the listeners lock
/// </summary>
public int PendingRequests
{
get
{
lock (_listenersLock)
return _listeners.OfType().Where(x => !x.Completed).Count();
}
}
// Backing field for PausedActivity
private bool _pausedActivity;
// Lock object used when reading or modifying _listeners.
// The dedicated Lock type is used when compiling for .NET 9 or later, a plain object otherwise
#if NET9_0_OR_GREATER
private readonly Lock _listenersLock = new Lock();
#else
private readonly object _listenersLock = new object();
#endif
// The subscriptions and pending queries registered on this connection
private readonly List _listeners;
private readonly ILogger _logger;
// Backing field for Status
private SocketStatus _status;
// Serializer obtained from the API client in the constructor, used by SendAsync
private readonly IMessageSerializer _serializer;
// Message converters for binary and text messages, created on first use in HandleStreamMessage2
private ISocketMessageHandler? _byteMessageConverter;
private ISocketMessageHandler? _textMessageConverter;
// Last value passed to UpdateSequenceNumber; 0 means no sequence number has been recorded
private long _lastSequenceNumber;
/// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
/// </summary>
protected Task? periodicTask;
/// <summary>
/// Wait event for the periodicTask
/// </summary>
protected AsyncResetEvent? periodicEvent;
/// <summary>
/// The underlying websocket
/// </summary>
private readonly IWebsocket _socket;
/// <summary>
/// New socket connection. Creates the underlying websocket through the factory and hooks up
/// the handlers for its events
/// </summary>
/// <param name="logger">Logger used by this connection and passed on to the created websocket</param>
/// <param name="socketFactory">Factory used to create the underlying websocket</param>
/// <param name="parameters">Parameters for the websocket, including the uri to connect to</param>
/// <param name="apiClient">The API client this connection is for; also provides the message serializer</param>
/// <param name="tag">Tag for identification</param>
public SocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, string tag)
{
_logger = logger;
ApiClient = apiClient;
Tag = tag;
Properties = new Dictionary();
// Note: `this` is handed to the factory before the remaining fields below are assigned
_socket = socketFactory.CreateWebsocket(logger, this, parameters);
_logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString());
// Route the websocket events to the (overridable) handlers of this connection
_socket.OnRequestSent += HandleRequestSentAsync;
_socket.OnRequestRateLimited += HandleRequestRateLimitedAsync;
_socket.OnConnectRateLimited += HandleConnectRateLimitedAsync;
_socket.OnOpen += HandleOpenAsync;
_socket.OnClose += HandleCloseAsync;
_socket.OnReconnecting += HandleReconnectingAsync;
_socket.OnReconnected += HandleReconnectedAsync;
_socket.OnError += HandleErrorAsync;
_socket.GetReconnectionUrl = GetReconnectionUrlAsync;
_listeners = new List();
_serializer = apiClient.CreateSerializer();
}
/// <summary>
/// Handler for a socket opening. Marks the connection as connected and unpauses activity
/// </summary>
/// <returns>A completed task</returns>
protected virtual Task HandleOpenAsync()
{
Status = SocketStatus.Connected;
// Raises the unpaused event only if activity was paused before
PausedActivity = false;
return Task.CompletedTask;
}
/// <summary>
/// Handler for a socket closing without reconnect. Marks the connection as closed, removes it from
/// the API client, resets the user subscriptions, fails all pending queries and raises
/// <see cref="ConnectionClosed"/> on a background task
/// </summary>
/// <returns>A completed task</returns>
protected virtual Task HandleCloseAsync()
{
    Status = SocketStatus.Closed;
    Authenticated = false;
    _lastSequenceNumber = 0;

    // TryRemove does nothing when the key is not present, so a separate ContainsKey check is not needed
    ApiClient._socketConnections.TryRemove(SocketId, out _);

    lock (_listenersLock)
    {
        // Reset user subscriptions which aren't already flagged as closing the connection
        foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription && !l.IsClosingConnection))
        {
            subscription.IsClosingConnection = true;
            subscription.Reset();
        }

        // Iterate over a copy as the listeners list is modified inside the loop
        foreach (var query in _listeners.OfType().ToList())
        {
            query.Fail(new WebError("Connection interrupted"));
            _listeners.Remove(query);
        }
    }

    // Notify listeners without blocking the socket close handling
    _ = Task.Run(() => ConnectionClosed?.Invoke());
    return Task.CompletedTask;
}
/// <summary>
/// Handler for a socket losing connection and starting reconnect. Records the disconnect time,
/// resets the user subscriptions, fails all pending queries and raises <see cref="ConnectionLost"/>
/// on a background task
/// </summary>
/// <returns>A completed task</returns>
protected virtual Task HandleReconnectingAsync()
{
Status = SocketStatus.Reconnecting;
DisconnectTime = DateTime.UtcNow;
Authenticated = false;
_lastSequenceNumber = 0;
lock (_listenersLock)
{
foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription))
subscription.Reset();
// Iterate over a copy as the listeners list is modified inside the loop
foreach (var query in _listeners.OfType().ToList())
{
query.Fail(new WebError("Connection interrupted"));
_listeners.Remove(query);
}
}
// Notify listeners without blocking the reconnect handling
_ = Task.Run(() => ConnectionLost?.Invoke());
return Task.CompletedTask;
}
/// <summary>
/// Get the url to connect to when reconnecting. Delegates to the API client
/// </summary>
/// <returns>The result of the API client's GetReconnectUriAsync call for this connection</returns>
protected virtual async Task GetReconnectionUrlAsync()
{
return await ApiClient.GetReconnectUriAsync(this).ConfigureAwait(false);
}
/// <summary>
/// Handler for a socket which has reconnected. Fails any queries still registered and then processes
/// the reconnect (re-authentication and resubscribing) on a background task. When that processing
/// fails or throws, another reconnect of the socket is triggered; on success the status is set to
/// connected and <see cref="ConnectionRestored"/> is raised
/// </summary>
/// <returns>A completed task; the reconnect processing itself is not awaited</returns>
protected virtual Task HandleReconnectedAsync()
{
Status = SocketStatus.Resubscribing;
lock (_listenersLock)
{
// Iterate over a copy as the listeners list is modified inside the loop
foreach (var query in _listeners.OfType().ToList())
{
query.Fail(new WebError("Connection interrupted"));
_listeners.Remove(query);
}
}
// Can't wait for this as it would cause a deadlock
_ = Task.Run(async () =>
{
try
{
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectSuccessful)
{
_logger.FailedReconnectProcessing(SocketId, reconnectSuccessful.Error!.ToString());
_ = Task.Run(() => ResubscribingFailed?.Invoke(reconnectSuccessful.Error));
// Fire and forget: start a new reconnect attempt
_ = _socket.ReconnectAsync().ConfigureAwait(false);
}
else
{
Status = SocketStatus.Connected;
_ = Task.Run(() =>
{
// NOTE(review): relies on DisconnectTime having been set by HandleReconnectingAsync; throws if it is null
ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
DisconnectTime = null;
});
}
}
catch (Exception ex)
{
_logger.UnknownExceptionWhileProcessingReconnection(SocketId, ex);
// Fire and forget: start a new reconnect attempt
_ = _socket.ReconnectAsync().ConfigureAwait(false);
}
});
return Task.CompletedTask;
}
/// <summary>
/// Handler for an error on a websocket. Only logs the error; websocket exceptions are logged
/// including their websocket error code
/// </summary>
/// <param name="e">The exception</param>
/// <returns>A completed task</returns>
protected virtual Task HandleErrorAsync(Exception e)
{
    switch (e)
    {
        case WebSocketException socketException:
            _logger.WebSocketErrorCodeAndDetails(SocketId, socketException.WebSocketErrorCode, socketException.Message, socketException);
            break;
        default:
            _logger.WebSocketError(SocketId, e.Message, e);
            break;
    }

    return Task.CompletedTask;
}
/// <summary>
/// Handler for whenever a request is rate limited and rate limit behavior is set to fail.
/// Fails the registered query with the matching id using a rate limit error; does nothing
/// when no such query is registered
/// </summary>
/// <param name="requestId">Id of the request which was rate limited</param>
/// <returns>A completed task</returns>
protected virtual Task HandleRequestRateLimitedAsync(int requestId)
{
Query? query;
// Only the lookup needs the lock; failing the query happens outside of it
lock (_listenersLock)
{
query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId);
}
if (query == null)
return Task.CompletedTask;
query.Fail(new ClientRateLimitError("Connection rate limit reached"));
return Task.CompletedTask;
}
/// <summary>
/// Handler for whenever a connection was rate limited and couldn't be established. Awaits the
/// <see cref="ConnectRateLimitedAsync"/> delegate when one has been set
/// </summary>
/// <returns>A task which completes when the delegate, if any, has completed</returns>
protected async virtual Task HandleConnectRateLimitedAsync()
{
    // No delegate registered, nothing to notify
    if (ConnectRateLimitedAsync is null)
        return;

    await ConnectRateLimitedAsync().ConfigureAwait(false);
}
/// <summary>
/// Handler for whenever a request is sent over the websocket. Marks the registered query with the
/// matching id as sent, passing the timeout to use; does nothing when no such query is registered
/// </summary>
/// <param name="requestId">Id of the request sent</param>
/// <returns>A completed task</returns>
protected virtual Task HandleRequestSentAsync(int requestId)
{
Query? query;
// Only the lookup needs the lock
lock (_listenersLock)
{
query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId);
}
if (query == null)
return Task.CompletedTask;
// The query specific timeout takes precedence over the client wide request timeout
query.IsSend(query.RequestTimeout ?? ApiClient.ClientOptions.RequestTimeout);
return Task.CompletedTask;
}
/// <summary>
/// Handle a message received on the websocket: preprocess the data, determine its type identifier,
/// find the deserialization type from the routes of the registered listeners, deserialize the data
/// and pass the result to every listener with a matching route. Messages which can't be identified,
/// matched or deserialized are logged (or offered to the API client as unhandled message) and dropped
/// </summary>
/// <param name="type">The websocket message type, used to select the binary or text message converter</param>
/// <param name="data">The received message data</param>
/// <exception cref="Exception">Thrown when the listeners list shrinks while the message is being dispatched</exception>
protected internal virtual void HandleStreamMessage2(WebSocketMessageType type, ReadOnlySpan data)
{
var receiveTime = DateTime.UtcNow;
// 1. Decrypt/Preprocess if necessary
data = ApiClient.PreprocessStreamMessage(this, type, data);
// 2. Select the converter for this message type, creating it on first use
ISocketMessageHandler messageConverter;
if (type == WebSocketMessageType.Binary)
messageConverter = _byteMessageConverter ??= ApiClient.CreateMessageConverter(type);
else
messageConverter = _textMessageConverter ??= ApiClient.CreateMessageConverter(type);
// 3. Keep the original data as string if configured; the API level option takes precedence over the client level option
string? originalData = null;
if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData)
{
#if NETSTANDARD2_0
originalData = Encoding.UTF8.GetString(data.ToArray());
#else
originalData = Encoding.UTF8.GetString(data);
#endif
if (_logger.IsEnabled(LogLevel.Trace))
_logger.ReceivedData(SocketId, originalData);
}
// 4. Determine the type identifier of the message
var typeIdentifier = messageConverter.GetTypeIdentifier(data, type);
if (typeIdentifier == null)
{
// Both deserialization type and identifier null, can't process
_logger.LogWarning("Failed to evaluate message. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
return;
}
// 5. Find the type to deserialize to: the first route of any listener matching the type identifier
Type? deserializationType = null;
lock (_listenersLock)
{
foreach (var subscription in _listeners)
{
foreach (var route in subscription.MessageRouter.Routes)
{
if (!route.TypeIdentifier.Equals(typeIdentifier, StringComparison.Ordinal))
continue;
deserializationType = route.DeserializationType;
break;
}
if (deserializationType != null)
break;
}
}
if (deserializationType == null)
{
// No listener route matches; give the API client the chance to handle the message
if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data))
{
// No handler found for identifier either, can't process
_logger.LogWarning("Failed to determine message type for identifier {Identifier}. Data: {Message}", typeIdentifier, Encoding.UTF8.GetString(data.ToArray()));
}
return;
}
// 6. Deserialize; a string target type just gets the UTF8 decoded data
object result;
try
{
if (deserializationType == typeof(string))
{
#if NETSTANDARD2_0
result = Encoding.UTF8.GetString(data.ToArray());
#else
result = Encoding.UTF8.GetString(data);
#endif
}
else
{
result = messageConverter.Deserialize(data, deserializationType);
}
}
catch(Exception ex)
{
_logger.LogWarning(ex, "Deserialization failed. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
return;
}
if (result == null)
{
// Deserialize error
_logger.LogWarning("Deserialization returned null. Data: {Message}", Encoding.UTF8.GetString(data.ToArray()));
return;
}
// 7. Dispatch the result to the listeners with a matching route
var topicFilter = messageConverter.GetTopicFilter(result);
bool processed = false;
lock (_listenersLock)
{
var currentCount = _listeners.Count;
// Index based loop (not foreach) so listeners appended during dispatch don't invalidate the iteration
for(var i = 0; i < _listeners.Count; i++)
{
if (_listeners.Count != currentCount)
{
// Possible a query added or removed. If added it's not a problem, if removed it is
// NOTE(review): the lock is held here, so presumably only a handler invoked below on this thread can change the list
if (_listeners.Count < currentCount)
throw new Exception("Listeners list adjusted, can't continue processing");
}
var processor = _listeners[i];
bool isQuery = false;
Query? query = null;
if (processor is Query cquery)
{
isQuery = true;
query = cquery;
}
var complete = false;
foreach (var route in processor.MessageRouter.Routes)
{
if (route.TypeIdentifier != typeIdentifier)
continue;
// A route matches when either side has no topic filter or the topic filters are equal
if (topicFilter == null
|| route.TopicFilter == null
|| route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal))
{
// Counts as processed even when the matching query is already completed and the message is skipped
processed = true;
if (isQuery && query!.Completed)
continue;
processor.Handle(this, receiveTime, originalData, result, route);
// A query route without multiple readers consumes the message; stop dispatching to other listeners
if (isQuery && !route.MultipleReaders)
{
complete = true;
break;
}
}
}
if (complete)
break;
}
}
if (!processed)
{
lock (_listenersLock)
{
// NOTE(review): topicFilter may be null at this point (e.g. when no route matches the type identifier) despite the null-forgiving operator
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, topicFilter!,
string.Join(",", _listeners.Select(x => string.Join(",", x.MessageRouter.Routes.Select(x => x.TopicFilter != null ? string.Join(",", x.TopicFilter) : "[null]")))));
}
}
}
/// <summary>
/// Connect the websocket
/// </summary>
/// <param name="ct">Cancellation token passed on to the connect call of the underlying websocket</param>
/// <returns>The result of the connect call of the underlying websocket</returns>
public async Task ConnectAsync(CancellationToken ct) => await _socket.ConnectAsync(ct).ConfigureAwait(false);
/// <summary>
/// Retrieve the underlying socket
/// </summary>
/// <returns>The websocket instance this connection wraps</returns>
public IWebsocket GetSocket()
{
    return _socket;
}
/// <summary>
/// Trigger a reconnect of the socket connection
/// </summary>
/// <returns>A task which completes when the reconnect call on the underlying websocket completes</returns>
public async Task TriggerReconnectAsync() => await _socket.ReconnectAsync().ConfigureAwait(false);
/// <summary>
/// Update the proxy setting and reconnect
/// </summary>
/// <param name="proxy">New proxy setting</param>
/// <returns>A task which completes when the triggered reconnect call completes</returns>
public async Task UpdateProxy(ApiProxy? proxy)
{
// The new proxy has to be applied before the reconnect is triggered
_socket.UpdateProxy(proxy);
await TriggerReconnectAsync().ConfigureAwait(false);
}
/// <summary>
/// Close the connection. Removes the connection from the API client, disposes the cancellation
/// token registrations of the subscriptions and then closes and disposes the underlying socket.
/// Does nothing when the connection is already closed or disposed
/// </summary>
/// <returns>A task which completes when the socket has been closed and disposed</returns>
public async Task CloseAsync()
{
    if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
        return;

    // TryRemove does nothing when the key is not present, so a separate ContainsKey check is not needed
    ApiClient._socketConnections.TryRemove(SocketId, out _);

    lock (_listenersLock)
    {
        // Release the cancellation callbacks registered for the subscriptions
        foreach (var subscription in _listeners.OfType())
        {
            if (subscription.CancellationTokenRegistration.HasValue)
                subscription.CancellationTokenRegistration.Value.Dispose();
        }
    }

    await _socket.CloseAsync().ConfigureAwait(false);
    _socket.Dispose();
}
/// <summary>
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
/// </summary>
/// <param name="subscription">Subscription to close</param>
/// <returns>A task which completes when the subscription has been closed</returns>
public async Task CloseAsync(Subscription subscription)
{
// If we are resubscribing this subscription at this moment we'll want to wait for a bit until it is finished to avoid concurrency issues
while (subscription.Status == SubscriptionStatus.Subscribing)
await Task.Delay(50).ConfigureAwait(false);
subscription.Status = SubscriptionStatus.Closing;
// When the connection itself is already closing or closed there is nothing to unsubscribe
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
{
subscription.Status = SubscriptionStatus.Closed;
return;
}
_logger.ClosingSubscription(SocketId, subscription.Id);
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
// Another listener covering the same routes means the server side subscription has to stay
bool anyDuplicateSubscription;
lock (_listenersLock)
anyDuplicateSubscription = _listeners.OfType().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l)));
// The connection is closed when no open user subscription remains, unless it is a dedicated request connection
bool shouldCloseConnection;
lock (_listenersLock)
shouldCloseConnection = _listeners.OfType().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection;
if (!anyDuplicateSubscription)
{
// No need to unsubscribe when the whole connection is going to be closed anyway
bool needUnsub;
lock (_listenersLock)
needUnsub = _listeners.Contains(subscription) && !shouldCloseConnection;
if (needUnsub && _socket.IsOpen)
await UnsubscribeAsync(subscription).ConfigureAwait(false);
}
else
{
_logger.NotUnsubscribingSubscriptionBecauseDuplicateRunning(SocketId);
}
// The status can have changed to closing while the unsubscribe was awaited
// NOTE(review): this path returns without removing the subscription from the listeners list
if (Status == SocketStatus.Closing)
{
subscription.Status = SubscriptionStatus.Closed;
_logger.AlreadyClosing(SocketId);
return;
}
if (shouldCloseConnection)
{
Status = SocketStatus.Closing;
subscription.IsClosingConnection = true;
_logger.ClosingNoMoreSubscriptions(SocketId);
await CloseAsync().ConfigureAwait(false);
}
lock (_listenersLock)
_listeners.Remove(subscription);
subscription.Status = SubscriptionStatus.Closed;
}
/// <summary>
/// Dispose the connection. Marks the connection as disposed, signals the periodic task wait event
/// (if any) and disposes the underlying socket
/// </summary>
public void Dispose()
{
// Set the status first so the periodic task loop sees it when the wait event below is signaled
Status = SocketStatus.Disposed;
periodicEvent?.Set();
_socket.Dispose();
}
/// <summary>
/// Add a subscription to this connection. Subscriptions are only accepted while the connection
/// has its initial status or is connected
/// </summary>
/// <param name="subscription">The subscription to register</param>
/// <returns>True if the subscription was added, false if the connection status doesn't allow it</returns>
public bool AddSubscription(Subscription subscription)
{
    var acceptsSubscriptions = Status == SocketStatus.None || Status == SocketStatus.Connected;
    if (!acceptsSubscriptions)
        return false;

    lock (_listenersLock)
    {
        _listeners.Add(subscription);
    }

    // Only user subscriptions are logged
    if (subscription.UserSubscription)
    {
        _logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount);
    }

    return true;
}
/// <summary>
/// Get a subscription on this connection by id
/// </summary>
/// <param name="id">Id of the subscription to look up</param>
/// <returns>The matching subscription, or null when there is none</returns>
/// <exception cref="InvalidOperationException">Thrown by SingleOrDefault when more than one listener has the given id</exception>
public Subscription? GetSubscription(int id)
{
lock (_listenersLock)
return _listeners.OfType().SingleOrDefault(s => s.Id == id);
}
/// <summary>
/// Get the state of the connection
/// </summary>
/// <param name="includeSubDetails">Whether to include the state of each user subscription; when false an empty list is returned for the subscription states</param>
/// <returns>A snapshot of the current connection state</returns>
public SocketConnectionState GetState(bool includeSubDetails)
{
    // The listeners list can be modified by other threads, so it may only be enumerated while holding the lock
    int pendingQueries;
    lock (_listenersLock)
        pendingQueries = _listeners.OfType().Count(x => !x.Completed);

    return new SocketConnectionState(
        SocketId,
        ConnectionUri.AbsoluteUri,
        UserSubscriptionCount,
        Status,
        Authenticated,
        IncomingKbps,
        PendingQueries: pendingQueries,
        includeSubDetails ? Subscriptions.Select(sub => sub.GetState()).ToList() : new List()
    );
}
/// <summary>
/// Send a query request and wait for an answer
/// </summary>
/// <param name="query">Query to send</param>
/// <param name="ct">Cancellation token</param>
/// <returns>The result of the query, or a result with a timeout error when the query has no result after waiting</returns>
public virtual async Task SendAndWaitQueryAsync(Query query, CancellationToken ct = default)
{
await SendAndWaitIntAsync(query, ct).ConfigureAwait(false);
return query.Result ?? new CallResult(new TimeoutError());
}
/// <summary>
/// Send a query request and wait for an answer. Same as the untyped variant but returns the typed result of the query
/// </summary>
/// <param name="query">Query to send</param>
/// <param name="ct">Cancellation token</param>
/// <returns>The typed result of the query, or a result with a timeout error when the query has no typed result after waiting</returns>
public virtual async Task> SendAndWaitQueryAsync(Query query, CancellationToken ct = default)
{
await SendAndWaitIntAsync(query, ct).ConfigureAwait(false);
return query.TypedResult ?? new CallResult(new TimeoutError());
}
/// <summary>
/// Register the query as listener, send its request and wait until the query is completed, the
/// socket is no longer open or cancellation is requested. The query is always removed from the
/// listeners again when this method exits, including when sending throws
/// </summary>
/// <param name="query">Query to send</param>
/// <param name="ct">Cancellation token; when cancelled the query is failed with a cancellation error</param>
/// <returns>A task which completes when waiting for the query has finished; the outcome is available on the query itself</returns>
private async Task SendAndWaitIntAsync(Query query, CancellationToken ct = default)
{
    // Register before sending so a fast response can be matched to the query
    lock (_listenersLock)
        _listeners.Add(query);

    try
    {
        var sendResult = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false);
        if (!sendResult)
        {
            query.Fail(sendResult.Error!);
            return;
        }

        while (!ct.IsCancellationRequested)
        {
            if (!_socket.IsOpen)
            {
                query.Fail(new WebError("Socket not open"));
                return;
            }

            if (query.Completed)
                return;

            // Wake up regularly to re-check the socket state and the cancellation token
            await query.WaitAsync(TimeSpan.FromMilliseconds(500), ct).ConfigureAwait(false);

            if (query.Completed)
                return;
        }

        // The loop is only left without returning when cancellation was requested
        query.Fail(new CancellationRequestedError());
    }
    finally
    {
        // Runs on every exit path, so the query can't be left behind in the listeners list
        lock (_listenersLock)
            _listeners.Remove(query);
    }
}
/// <summary>
/// Send data over the websocket connection. The object is serialized with the serializer of the
/// API client: a byte serializer results in a binary send, a string serializer in a text send.
/// With a string serializer a string object is sent as-is without serializing
/// </summary>
/// <param name="requestId">The request id</param>
/// <param name="obj">The object to send</param>
/// <param name="weight">The weight of the message</param>
/// <returns>The result of the send operation</returns>
/// <exception cref="Exception">Thrown when the serializer is neither a byte nor a string message serializer</exception>
public virtual ValueTask SendAsync(int requestId, T obj, int weight)
{
if (_serializer is IByteMessageSerializer byteSerializer)
{
return SendBytesAsync(requestId, byteSerializer.Serialize(obj), weight);
}
else if (_serializer is IStringMessageSerializer stringSerializer)
{
if (obj is string str)
return SendStringAsync(requestId, str, weight);
str = stringSerializer.Serialize(obj);
// Calls this method again with the serialized string, which then takes the string branch above
return SendAsync(requestId, str, weight);
}
throw new Exception("Unknown serializer when sending message");
}
/// <summary>
/// Send byte data over the websocket connection. Fails without sending when the data exceeds the
/// message size limit of the API client or when the socket is not open
/// </summary>
/// <param name="requestId">The id of the request</param>
/// <param name="data">The data to send</param>
/// <param name="weight">The weight of the message</param>
/// <returns>A success result when the data was handed to the socket, otherwise a result containing the error</returns>
public virtual async ValueTask SendBytesAsync(int requestId, byte[] data, int weight)
{
// The size limit is only enforced when the API client specifies one
if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
{
var info = $"Message to send exceeds the max server message size ({data.Length} vs {ApiClient.MessageSendSizeLimit.Value} bytes). Split the request into batches to keep below this limit";
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
return new CallResult(new InvalidOperationError(info));
}
if (!_socket.IsOpen)
{
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
return new CallResult(new WebError("Failed to send message, socket no longer open"));
}
_logger.SendingByteData(SocketId, requestId, data.Length);
try
{
if (!_socket.Send(requestId, data, weight))
return new CallResult(new WebError("Failed to send message, connection not open"));
return CallResult.SuccessResult;
}
catch (Exception ex)
{
// Exceptions from the socket are converted into an error result instead of being thrown to the caller
return new CallResult(new WebError("Failed to send message: " + ex.Message, exception: ex));
}
}
/// <summary>
/// Send string data over the websocket connection. Fails without sending when the data exceeds the
/// message size limit of the API client or when the socket is not open
/// </summary>
/// <param name="requestId">The id of the request</param>
/// <param name="data">The data to send</param>
/// <param name="weight">The weight of the message</param>
/// <returns>A success result when the data was handed to the socket, otherwise a result containing the error</returns>
public virtual async ValueTask SendStringAsync(int requestId, string data, int weight)
{
// The size limit is only enforced when the API client specifies one
// NOTE(review): data.Length is the number of characters while the message below reports bytes; these differ for non-ASCII content - confirm the intended unit
if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
{
var info = $"Message to send exceeds the max server message size ({data.Length} vs {ApiClient.MessageSendSizeLimit.Value} bytes). Split the request into batches to keep below this limit";
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
return new CallResult(new InvalidOperationError(info));
}
if (!_socket.IsOpen)
{
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
return new CallResult(new WebError("Failed to send message, socket no longer open"));
}
_logger.SendingData(SocketId, requestId, data);
try
{
if (!_socket.Send(requestId, data, weight))
return new CallResult(new WebError("Failed to send message, connection not open"));
return CallResult.SuccessResult;
}
catch (Exception ex)
{
// Exceptions from the socket are converted into an error result instead of being thrown to the caller
return new CallResult(new WebError("Failed to send message: " + ex.Message, exception: ex));
}
}
/// <summary>
/// Process a reconnected socket: close the connection when there is nothing to resubscribe,
/// re-authenticate when needed and resend the subscription requests of the active subscriptions
/// in batches
/// </summary>
/// <returns>A success result when everything was resubscribed (or nothing needed to be), otherwise the first error encountered</returns>
private async Task ProcessReconnectAsync()
{
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket not connected"));
// A dedicated request connection is kept open even without subscriptions
if (!DedicatedRequestConnection.IsDedicatedRequestConnection)
{
bool anySubscriptions;
lock (_listenersLock)
anySubscriptions = _listeners.OfType().Any(s => s.UserSubscription);
if (!anySubscriptions)
{
// No need to resubscribe anything
_logger.NothingToResubscribeCloseConnection(SocketId);
// Fire and forget, the close is not awaited
_ = _socket.CloseAsync();
return CallResult.SuccessResult;
}
}
bool anyAuthenticated;
lock (_listenersLock)
{
anyAuthenticated = _listeners.OfType().Any(s => s.Authenticated)
|| DedicatedRequestConnection.IsDedicatedRequestConnection && DedicatedRequestConnection.Authenticated;
}
if (anyAuthenticated)
{
// If we reconnected a authenticated connection we need to re-authenticate
var authResult = await ApiClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
if (!authResult)
{
_logger.FailedAuthenticationDisconnectAndRecoonect(SocketId);
return authResult;
}
Authenticated = true;
_logger.AuthenticationSucceeded(SocketId);
}
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
int batch = 0;
int batchSize = ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket;
while (true)
{
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket not connected"));
// Take the next batch of active subscriptions; the snapshot is taken under the lock
List subList;
lock (_listenersLock)
subList = _listeners.OfType().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList();
if (subList.Count == 0)
break;
var taskList = new List>();
foreach (var subscription in subList)
{
subscription.ConnectionInvocations = 0;
if (!subscription.Active)
// Can be closed during resubscribing
continue;
subscription.Status = SubscriptionStatus.Subscribing;
var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
if (!result)
{
_logger.FailedRequestRevitalization(SocketId, result.Error?.ToString());
subscription.Status = SubscriptionStatus.Pending;
return result;
}
var subQuery = subscription.CreateSubscriptionQuery(this);
if (subQuery == null)
{
// No subscribe request to send for this subscription, so it counts as subscribed
subscription.Status = SubscriptionStatus.Subscribed;
continue;
}
subQuery.OnComplete = () =>
{
subscription.Status = subQuery.Result!.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending;
subscription.HandleSubQueryResponse(this, subQuery.Response);
};
// The queries of one batch are sent without awaiting each other and awaited together below
taskList.Add(SendAndWaitQueryAsync(subQuery));
if (!subQuery.ExpectsResponse)
{
// If there won't be an answer we can immediately set this
subscription.Status = SubscriptionStatus.Subscribed;
subscription.HandleSubQueryResponse(this, null);
}
}
await Task.WhenAll(taskList).ConfigureAwait(false);
// All tasks have completed at this point, so reading Result does not block
if (taskList.Any(t => !t.Result.Success))
return taskList.First(t => !t.Result.Success).Result;
batch++;
}
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket not connected"));
_logger.AllSubscriptionResubscribed(SocketId);
return CallResult.SuccessResult;
}
/// <summary>
/// Send the unsubscribe request for a subscription and pass the response to the subscription.
/// Does nothing when the subscription doesn't provide an unsubscribe query
/// </summary>
/// <param name="subscription">The subscription to unsubscribe</param>
/// <returns>A task which completes when the unsubscribe query has been handled</returns>
internal async Task UnsubscribeAsync(Subscription subscription)
{
    var unsubQuery = subscription.CreateUnsubscriptionQuery(this);
    if (unsubQuery != null)
    {
        await SendAndWaitQueryAsync(unsubQuery).ConfigureAwait(false);

        // The response is handed to the subscription regardless of the query outcome
        subscription.HandleUnsubQueryResponse(this, unsubQuery.Response);
        _logger.SubscriptionUnsubscribed(SocketId, subscription.Id);
    }
}
/// <summary>
/// Send the subscribe request for a subscription again and pass the response to the subscription
/// </summary>
/// <param name="subscription">The subscription to resubscribe</param>
/// <returns>An error result when the socket is not open, a success result when the subscription has no subscribe query, otherwise the result of the subscribe query</returns>
internal async Task ResubscribeAsync(Subscription subscription)
{
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket is not connected"));
var subQuery = subscription.CreateSubscriptionQuery(this);
if (subQuery == null)
return CallResult.SuccessResult;
var result = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
// The response is handed to the subscription regardless of the query outcome
subscription.HandleSubQueryResponse(this, subQuery.Response);
return result;
}
/// <summary>
/// Update the sequence number for this connection. When the API client enforces sequence numbers
/// and the new number is neither equal to nor directly following the last recorded one, a warning
/// is logged and a reconnect is triggered. The new number is recorded in all cases
/// </summary>
/// <param name="sequenceNumber">The sequence number of the received update</param>
public void UpdateSequenceNumber(long sequenceNumber)
{
    var lastRecorded = _lastSequenceNumber;

    // 0 is the initial value, meaning there is nothing to compare against yet.
    // The same number can be reported more than once when multiple listeners handle the same message.
    var outOfSequence = lastRecorded != 0
        && lastRecorded != sequenceNumber
        && lastRecorded + 1 != sequenceNumber;

    if (ApiClient.EnforceSequenceNumbers && outOfSequence)
    {
        _logger.LogWarning("[Sckt {SocketId}] update not in sequence. Last recorded sequence number: {LastSequence}, update sequence number: {UpdateSequence}. Reconnecting", SocketId, lastRecorded, sequenceNumber);

        // Fire and forget, the reconnect is not awaited
        _ = TriggerReconnectAsync();
    }

    _lastSequenceNumber = sequenceNumber;
}
/// <summary>
/// Periodically sends data over a socket connection. Starts a background task which, until the
/// connection is closing, closed or disposed, waits for the interval (or for the wait event to be
/// signaled) and then sends the query returned by the delegate
/// </summary>
/// <param name="identifier">Identifier for the periodic send</param>
/// <param name="interval">How often</param>
/// <param name="queryDelegate">Method returning the query to send</param>
/// <param name="callback">The callback for processing the response</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="queryDelegate"/> is null</exception>
public virtual void QueryPeriodic(string identifier, TimeSpan interval, Func queryDelegate, Action? callback)
{
if (queryDelegate == null)
throw new ArgumentNullException(nameof(queryDelegate));
// NOTE(review): calling this method more than once replaces periodicEvent and periodicTask while the previously started task keeps running
periodicEvent = new AsyncResetEvent();
periodicTask = Task.Run(async () =>
{
while (Status != SocketStatus.Disposed
&& Status != SocketStatus.Closed
&& Status != SocketStatus.Closing)
{
await periodicEvent.WaitAsync(interval).ConfigureAwait(false);
// Check again after waiting, the status can have changed in the meantime (Dispose signals the wait event)
if (Status == SocketStatus.Disposed
|| Status == SocketStatus.Closed
|| Status == SocketStatus.Closing)
{
break;
}
// Skip this round while the socket is not open, for example during a reconnect
if (!Connected)
continue;
var query = queryDelegate(this);
if (query == null)
continue;
_logger.SendingPeriodic(SocketId, identifier);
try
{
var result = await SendAndWaitQueryAsync(query).ConfigureAwait(false);
callback?.Invoke(this, result);
}
catch (Exception ex)
{
// Exceptions from sending or from the callback are logged, the loop continues with the next interval
_logger.PeriodicSendFailed(SocketId, identifier, ex.Message, ex);
}
}
});
}
}
}