using CryptoExchange.Net.Interfaces;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects;
using System.Net.WebSockets;
using CryptoExchange.Net.Objects.Sockets;
using System.Diagnostics;
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Logging.Extensions;
using System.Threading;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Authentication;
namespace CryptoExchange.Net.Sockets
{
///
/// A single socket connection to the server
///
public class SocketConnection
{
///
/// State of a the connection
///
/// The id of the socket connection
/// The connection URI
/// Number of subscriptions on this socket
/// Socket status
/// If the connection is authenticated
/// Download speed over this socket
/// Number of non-completed queries
/// State for each subscription on this socket
public record SocketConnectionState(
int Id,
string Address,
int Subscriptions,
SocketStatus Status,
bool Authenticated,
double DownloadSpeed,
int PendingQueries,
List SubscriptionStates
);
///
/// Connection lost event
///
public event Action? ConnectionLost;
///
/// Connection closed and no reconnect is happening
///
public event Action? ConnectionClosed;
///
/// Failed to resubscribe all subscription on the reconnected socket
///
public event Action? ResubscribingFailed;
///
/// Connecting restored event
///
public event Action? ConnectionRestored;
///
/// The connection is paused event
///
public event Action? ActivityPaused;
///
/// The connection is unpaused event
///
public event Action? ActivityUnpaused;
///
/// Unhandled message event
///
public event Action? UnhandledMessage;
///
/// Connection was rate limited and couldn't be established
///
public Func? ConnectRateLimitedAsync;
///
/// The amount of subscriptions on this connection
///
public int UserSubscriptionCount
{
get
{
lock(_listenersLock)
return _listeners.OfType().Count(h => h.UserSubscription);
}
}
///
/// Get a copy of the current message subscriptions
///
public Subscription[] Subscriptions
{
get
{
lock(_listenersLock)
return _listeners.OfType().Where(h => h.UserSubscription).ToArray();
}
}
///
/// If the connection has been authenticated
///
public bool Authenticated { get; set; }
///
/// If connection is made
///
public bool Connected => _socket.IsOpen;
///
/// The unique ID of the socket
///
public int SocketId => _socket.Id;
///
/// The current kilobytes per second of data being received, averaged over the last 3 seconds
///
public double IncomingKbps => _socket.IncomingKbps;
///
/// The connection uri
///
public Uri ConnectionUri => _socket.Uri;
///
/// The API client the connection is for
///
public SocketApiClient ApiClient { get; set; }
///
/// Time of disconnecting
///
public DateTime? DisconnectTime { get; set; }
///
/// Tag for identification
///
public string Tag { get; set; }
///
/// Additional properties for this connection
///
public Dictionary Properties { get; set; }
///
/// If activity is paused
///
public bool PausedActivity
{
get => _pausedActivity;
set
{
if (_pausedActivity != value)
{
_pausedActivity = value;
_logger.ActivityPaused(SocketId, value);
if(_pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke());
else _ = Task.Run(() => ActivityUnpaused?.Invoke());
}
}
}
///
/// Status of the socket connection
///
public SocketStatus Status
{
get => _status;
private set
{
if (_status == value)
return;
var oldStatus = _status;
_status = value;
_logger.SocketStatusChanged(SocketId, oldStatus, value);
}
}
///
/// Info on whether this connection is a dedicated request connection
///
public DedicatedConnectionState DedicatedRequestConnection { get; internal set; } = new DedicatedConnectionState();
///
/// Current subscription topics on this connection
///
public string[] Topics
{
get
{
lock (_listenersLock)
return _listeners.OfType().Select(x => x.Topic).Where(t => t != null).ToArray()!;
}
}
private bool _pausedActivity;
private readonly object _listenersLock;
private readonly List _listeners;
private readonly ILogger _logger;
private SocketStatus _status;
private readonly IMessageSerializer _serializer;
private readonly IByteMessageAccessor _accessor;
///
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
///
protected Task? periodicTask;
///
/// Wait event for the periodicTask
///
protected AsyncResetEvent? periodicEvent;
///
/// The underlying websocket
///
private readonly IWebsocket _socket;
///
/// New socket connection
///
/// The logger
/// The api client
/// The socket
///
public SocketConnection(ILogger logger, SocketApiClient apiClient, IWebsocket socket, string tag)
{
_logger = logger;
ApiClient = apiClient;
Tag = tag;
Properties = new Dictionary();
_socket = socket;
_socket.OnStreamMessage += HandleStreamMessage;
_socket.OnRequestSent += HandleRequestSentAsync;
_socket.OnRequestRateLimited += HandleRequestRateLimitedAsync;
_socket.OnConnectRateLimited += HandleConnectRateLimitedAsync;
_socket.OnOpen += HandleOpenAsync;
_socket.OnClose += HandleCloseAsync;
_socket.OnReconnecting += HandleReconnectingAsync;
_socket.OnReconnected += HandleReconnectedAsync;
_socket.OnError += HandleErrorAsync;
_socket.GetReconnectionUrl = GetReconnectionUrlAsync;
_listenersLock = new object();
_listeners = new List();
_serializer = apiClient.CreateSerializer();
_accessor = apiClient.CreateAccessor();
}
///
/// Handler for a socket opening
///
protected virtual Task HandleOpenAsync()
{
Status = SocketStatus.Connected;
PausedActivity = false;
return Task.CompletedTask;
}
///
/// Handler for a socket closing without reconnect
///
protected virtual Task HandleCloseAsync()
{
Status = SocketStatus.Closed;
Authenticated = false;
lock (_listenersLock)
{
foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription))
subscription.Reset();
foreach (var query in _listeners.OfType().ToList())
{
query.Fail(new WebError("Connection interrupted"));
_listeners.Remove(query);
}
}
_ = Task.Run(() => ConnectionClosed?.Invoke());
return Task.CompletedTask;
}
///
/// Handler for a socket losing connection and starting reconnect
///
protected virtual Task HandleReconnectingAsync()
{
Status = SocketStatus.Reconnecting;
DisconnectTime = DateTime.UtcNow;
Authenticated = false;
lock (_listenersLock)
{
foreach (var subscription in _listeners.OfType().Where(l => l.UserSubscription))
subscription.Reset();
foreach (var query in _listeners.OfType().ToList())
{
query.Fail(new WebError("Connection interrupted"));
_listeners.Remove(query);
}
}
_ = Task.Run(() => ConnectionLost?.Invoke());
return Task.CompletedTask;
}
///
/// Get the url to connect to when reconnecting
///
///
protected virtual async Task GetReconnectionUrlAsync()
{
return await ApiClient.GetReconnectUriAsync(this).ConfigureAwait(false);
}
///
/// Handler for a socket which has reconnected
///
protected virtual Task HandleReconnectedAsync()
{
Status = SocketStatus.Resubscribing;
lock (_listenersLock)
{
foreach (var query in _listeners.OfType().ToList())
{
query.Fail(new WebError("Connection interrupted"));
_listeners.Remove(query);
}
}
// Can't wait for this as it would cause a deadlock
_ = Task.Run(async () =>
{
try
{
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectSuccessful)
{
_logger.FailedReconnectProcessing(SocketId, reconnectSuccessful.Error!.ToString());
_ = Task.Run(() => ResubscribingFailed?.Invoke(reconnectSuccessful.Error));
_ = _socket.ReconnectAsync().ConfigureAwait(false);
}
else
{
Status = SocketStatus.Connected;
_ = Task.Run(() =>
{
ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
DisconnectTime = null;
});
}
}
catch(Exception ex)
{
_logger.UnknownExceptionWhileProcessingReconnection(SocketId, ex);
_ = _socket.ReconnectAsync().ConfigureAwait(false);
}
});
return Task.CompletedTask;
}
///
/// Handler for an error on a websocket
///
/// The exception
protected virtual Task HandleErrorAsync(Exception e)
{
if (e is WebSocketException wse)
_logger.WebSocketErrorCodeAndDetails(SocketId, wse.WebSocketErrorCode, wse.Message, wse);
else
_logger.WebSocketError(SocketId, e.Message, e);
return Task.CompletedTask;
}
///
/// Handler for whenever a request is rate limited and rate limit behavior is set to fail
///
///
///
protected virtual Task HandleRequestRateLimitedAsync(int requestId)
{
Query? query;
lock (_listenersLock)
{
query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId);
}
if (query == null)
return Task.CompletedTask;
query.Fail(new ClientRateLimitError("Connection rate limit reached"));
return Task.CompletedTask;
}
///
/// Handler for whenever a connection was rate limited and couldn't be established
///
///
protected async virtual Task HandleConnectRateLimitedAsync()
{
if (ConnectRateLimitedAsync is not null)
await ConnectRateLimitedAsync().ConfigureAwait(false);
}
///
/// Handler for whenever a request is sent over the websocket
///
/// Id of the request sent
protected virtual Task HandleRequestSentAsync(int requestId)
{
Query? query;
lock (_listenersLock)
{
query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId);
}
if (query == null)
{
_logger.MessageSentNotPending(SocketId, requestId);
return Task.CompletedTask;
}
query.IsSend(query.RequestTimeout ?? ApiClient.ClientOptions.RequestTimeout);
return Task.CompletedTask;
}
///
/// Handle a message
///
///
///
///
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory data)
{
var sw = Stopwatch.StartNew();
var receiveTime = DateTime.UtcNow;
string? originalData = null;
// 1. Decrypt/Preprocess if necessary
data = ApiClient.PreprocessStreamMessage(this, type, data);
// 2. Read data into accessor
_accessor.Read(data);
try
{
bool outputOriginalData = ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData;
if (outputOriginalData)
{
originalData = _accessor.GetOriginalString();
_logger.ReceivedData(SocketId, originalData);
}
// 3. Determine the identifying properties of this message
var listenId = ApiClient.GetListenerIdentifier(_accessor);
if (listenId == null)
{
originalData = outputOriginalData ? _accessor.GetOriginalString() : "[OutputOriginalData is false]";
if (!ApiClient.UnhandledMessageExpected)
_logger.FailedToEvaluateMessage(SocketId, originalData);
UnhandledMessage?.Invoke(_accessor);
return;
}
// 4. Get the listeners interested in this message
List processors;
lock (_listenersLock)
processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList();
if (processors.Count == 0)
{
if (!ApiClient.UnhandledMessageExpected)
{
List listenerIds;
lock (_listenersLock)
listenerIds = _listeners.SelectMany(l => l.ListenerIdentifiers).ToList();
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
UnhandledMessage?.Invoke(_accessor);
}
return;
}
_logger.ProcessorMatched(SocketId, processors.Count, listenId);
var totalUserTime = 0;
Dictionary? desCache = null;
if (processors.Count > 1)
{
// Only instantiate a cache if there are multiple processors
desCache = new Dictionary();
}
foreach (var processor in processors)
{
// 5. Determine the type to deserialize to for this processor
var messageType = processor.GetMessageType(_accessor);
if (messageType == null)
{
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
continue;
}
if (processor is Subscription subscriptionProcessor && !subscriptionProcessor.Confirmed)
{
// If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed
subscriptionProcessor.Confirmed = true;
// This doesn't trigger a waiting subscribe query, should probably also somehow set the wait event for that
}
// 6. Deserialize the message
object? deserialized = null;
desCache?.TryGetValue(messageType, out deserialized);
if (deserialized == null)
{
var desResult = processor.Deserialize(_accessor, messageType);
if (!desResult)
{
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
continue;
}
deserialized = desResult.Data;
desCache?.Add(messageType, deserialized);
}
// 7. Hand of the message to the subscription
try
{
var innerSw = Stopwatch.StartNew();
await processor.Handle(this, new DataEvent