1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00
This commit is contained in:
Jan Korf 2022-04-24 09:29:08 +02:00
parent 1739769f87
commit f514e172d7
7 changed files with 308 additions and 307 deletions

View File

@ -30,7 +30,7 @@ namespace CryptoExchange.Net
/// <summary>
/// List of socket connections currently connecting/connected
/// </summary>
protected internal ConcurrentDictionary<int, SocketConnection> sockets = new();
protected internal ConcurrentDictionary<int, SocketConnection> socketConnections = new();
/// <summary>
/// Semaphore used while creating sockets
/// </summary>
@ -85,10 +85,10 @@ namespace CryptoExchange.Net
{
get
{
if (!sockets.Any())
if (!socketConnections.Any())
return 0;
return sockets.Sum(s => s.Value.Socket.IncomingKbps);
return socketConnections.Sum(s => s.Value.IncomingKbps);
}
}
@ -205,7 +205,7 @@ namespace CryptoExchange.Net
if (socketConnection.PausedActivity)
{
log.Write(LogLevel.Warning, $"Socket {socketConnection.Socket.Id} has been paused, can't subscribe at this moment");
log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't subscribe at this moment");
return new CallResult<UpdateSubscription>( new ServerError("Socket is paused"));
}
@ -230,12 +230,12 @@ namespace CryptoExchange.Net
{
subscription.CancellationTokenRegistration = ct.Register(async () =>
{
log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} Cancellation token set, closing subscription");
log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription");
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
}, false);
}
log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} subscription completed");
log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription completed");
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
}
@ -317,7 +317,7 @@ namespace CryptoExchange.Net
if (socketConnection.PausedActivity)
{
log.Write(LogLevel.Warning, $"Socket {socketConnection.Socket.Id} has been paused, can't send query at this moment");
log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't send query at this moment");
return new CallResult<T>(new ServerError("Socket is paused"));
}
@ -368,7 +368,7 @@ namespace CryptoExchange.Net
if (!result)
{
await socket.CloseAsync().ConfigureAwait(false);
log.Write(LogLevel.Warning, $"Socket {socket.Socket.Id} authentication failed");
log.Write(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed");
result.Error!.Message = "Authentication failed: " + result.Error.Message;
return new CallResult<bool>(result.Error);
}
@ -471,7 +471,7 @@ namespace CryptoExchange.Net
var desResult = Deserialize<T>(messageEvent.JsonData);
if (!desResult)
{
log.Write(LogLevel.Warning, $"Socket {connection.Socket.Id} Failed to deserialize data into type {typeof(T)}: {desResult.Error}");
log.Write(LogLevel.Warning, $"Socket {connection.SocketId} Failed to deserialize data into type {typeof(T)}: {desResult.Error}");
return;
}
@ -494,7 +494,7 @@ namespace CryptoExchange.Net
{
genericHandlers.Add(identifier, action);
var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, action);
foreach (var connection in sockets.Values)
foreach (var connection in socketConnections.Values)
connection.AddSubscription(subscription);
}
@ -507,13 +507,13 @@ namespace CryptoExchange.Net
/// <returns></returns>
protected virtual SocketConnection GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated)
{
var socketResult = sockets.Where(s => s.Value.Socket.Url.TrimEnd('/') == address.TrimEnd('/')
var socketResult = socketConnections.Where(s => s.Value.Uri.ToString().TrimEnd('/') == address.TrimEnd('/')
&& (s.Value.ApiClient.GetType() == apiClient.GetType())
&& (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault();
var result = socketResult.Equals(default(KeyValuePair<int, SocketConnection>)) ? null : socketResult.Value;
if (result != null)
{
if (result.SubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (sockets.Count >= MaxSocketConnections && sockets.All(s => s.Value.SubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget)))
if (result.SubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= MaxSocketConnections && socketConnections.All(s => s.Value.SubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget)))
{
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
return result;
@ -548,13 +548,13 @@ namespace CryptoExchange.Net
/// <returns></returns>
protected virtual async Task<CallResult<bool>> ConnectSocketAsync(SocketConnection socketConnection)
{
if (await socketConnection.Socket.ConnectAsync().ConfigureAwait(false))
if (await socketConnection.ConnectAsync().ConfigureAwait(false))
{
sockets.TryAdd(socketConnection.Socket.Id, socketConnection);
socketConnections.TryAdd(socketConnection.SocketId, socketConnection);
return new CallResult<bool>(true);
}
socketConnection.Socket.Dispose();
socketConnection.Dispose();
return new CallResult<bool>(new CantConnectError());
}
@ -605,19 +605,19 @@ namespace CryptoExchange.Net
if (disposing)
break;
foreach (var socket in sockets.Values)
foreach (var socket in socketConnections.Values)
{
if (disposing)
break;
if (!socket.Socket.IsOpen)
if (!socket.Connected)
continue;
var obj = objGetter(socket);
if (obj == null)
continue;
log.Write(LogLevel.Trace, $"Socket {socket.Socket.Id} sending periodic {identifier}");
log.Write(LogLevel.Trace, $"Socket {socket.SocketId} sending periodic {identifier}");
try
{
@ -625,7 +625,7 @@ namespace CryptoExchange.Net
}
catch (Exception ex)
{
log.Write(LogLevel.Warning, $"Socket {socket.Socket.Id} Periodic send {identifier} failed: " + ex.ToLogString());
log.Write(LogLevel.Warning, $"Socket {socket.SocketId} Periodic send {identifier} failed: " + ex.ToLogString());
}
}
}
@ -642,7 +642,7 @@ namespace CryptoExchange.Net
SocketSubscription? subscription = null;
SocketConnection? connection = null;
foreach(var socket in sockets.Values.ToList())
foreach(var socket in socketConnections.Values.ToList())
{
subscription = socket.GetSubscription(subscriptionId);
if (subscription != null)
@ -679,13 +679,13 @@ namespace CryptoExchange.Net
/// <returns></returns>
public virtual async Task UnsubscribeAllAsync()
{
log.Write(LogLevel.Information, $"Closing all {sockets.Sum(s => s.Value.SubscriptionCount)} subscriptions");
log.Write(LogLevel.Information, $"Closing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions");
await Task.Run(async () =>
{
var tasks = new List<Task>();
{
var socketList = sockets.Values;
var socketList = socketConnections.Values;
foreach (var sub in socketList)
tasks.Add(sub.CloseAsync());
}

View File

@ -41,10 +41,6 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
Encoding? Encoding { get; set; }
/// <summary>
/// Whether socket is in the process of reconnecting
/// </summary>
bool Reconnecting { get; set; }
/// <summary>
/// The max amount of outgoing messages per second
/// </summary>
int? RatelimitPerSecond { get; set; }
@ -61,9 +57,9 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
Func<string, string>? DataInterpreterString { get; set; }
/// <summary>
/// The url the socket connects to
/// The uri the socket connects to
/// </summary>
string Url { get; }
Uri Uri { get; }
/// <summary>
/// Whether the socket connection is closed
/// </summary>
@ -90,6 +86,7 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
/// <returns></returns>
Task<bool> ConnectAsync();
Task ProcessAsync();
/// <summary>
/// Send data
/// </summary>

View File

@ -483,7 +483,7 @@ namespace CryptoExchange.Net.OrderBook
/// <param name="timeout">Max wait time</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
protected async Task<CallResult<bool>> WaitForSetOrderBookAsync(int timeout, CancellationToken ct)
protected async Task<CallResult<bool>> WaitForSetOrderBookAsync(TimeSpan timeout, CancellationToken ct)
{
var startWait = DateTime.UtcNow;
while (!bookSet && Status == OrderBookStatus.Syncing)
@ -491,12 +491,12 @@ namespace CryptoExchange.Net.OrderBook
if(ct.IsCancellationRequested)
return new CallResult<bool>(new CancellationRequestedError());
if ((DateTime.UtcNow - startWait).TotalMilliseconds > timeout)
if (DateTime.UtcNow - startWait > timeout)
return new CallResult<bool>(new ServerError("Timeout while waiting for data"));
try
{
await Task.Delay(10, ct).ConfigureAwait(false);
await Task.Delay(50, ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{ }

View File

@ -26,17 +26,11 @@ namespace CryptoExchange.Net.Sockets
private static readonly object streamIdLock = new object();
private ClientWebSocket _socket;
private Task? _sendTask;
private Task? _receiveTask;
private Task? _timeoutTask;
private readonly AsyncResetEvent _sendEvent;
private readonly ConcurrentQueue<byte[]> _sendBuffer;
private readonly IDictionary<string, string> cookies;
private readonly IDictionary<string, string> headers;
private CancellationTokenSource _ctsSource;
private bool _closing;
private bool _startedSent;
private bool _startedReceive;
private readonly List<DateTime> _outgoingMessages;
private DateTime _lastReceivedMessagesUpdate;
@ -78,9 +72,6 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public string? Origin { get; set; }
/// <inheritdoc />
public bool Reconnecting { get; set; }
/// <summary>
/// The timestamp this socket has been active for the last time
/// </summary>
@ -97,13 +88,13 @@ namespace CryptoExchange.Net.Sockets
public Func<string, string>? DataInterpreterString { get; set; }
/// <inheritdoc />
public string Url { get; }
public Uri Uri { get; }
/// <inheritdoc />
public bool IsClosed => _socket.State == WebSocketState.Closed;
/// <inheritdoc />
public bool IsOpen => _socket.State == WebSocketState.Open && !_closing;
public bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested;
/// <summary>
/// Ssl protocols supported. NOT USED BY THIS IMPLEMENTATION
@ -179,8 +170,8 @@ namespace CryptoExchange.Net.Sockets
/// ctor
/// </summary>
/// <param name="log">The log object to use</param>
/// <param name="url">The url the socket should connect to</param>
public CryptoExchangeWebSocketClient(Log log, string url) : this(log, url, new Dictionary<string, string>(), new Dictionary<string, string>())
/// <param name="uri">The uri the socket should connect to</param>
public CryptoExchangeWebSocketClient(Log log, Uri uri) : this(log, uri, new Dictionary<string, string>(), new Dictionary<string, string>())
{
}
@ -188,14 +179,14 @@ namespace CryptoExchange.Net.Sockets
/// ctor
/// </summary>
/// <param name="log">The log object to use</param>
/// <param name="url">The url the socket should connect to</param>
/// <param name="uri">The uri the socket should connect to</param>
/// <param name="cookies">Cookies to sent in the socket connection request</param>
/// <param name="headers">Headers to sent in the socket connection request</param>
public CryptoExchangeWebSocketClient(Log log, string url, IDictionary<string, string> cookies, IDictionary<string, string> headers)
public CryptoExchangeWebSocketClient(Log log, Uri uri, IDictionary<string, string> cookies, IDictionary<string, string> headers)
{
Id = NextStreamId();
this.log = log;
Url = url;
Uri = uri;
this.cookies = cookies;
this.headers = headers;
@ -212,13 +203,16 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public virtual void SetProxy(ApiProxy proxy)
{
Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri);
if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri))
throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy));
_socket.Options.Proxy = uri?.Scheme == null
? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port)
: _socket.Options.Proxy = new WebProxy
{
Address = uri
};
if (proxy.Login != null)
_socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
}
@ -230,7 +224,7 @@ namespace CryptoExchange.Net.Sockets
try
{
using CancellationTokenSource tcs = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await _socket.ConnectAsync(new Uri(Url), tcs.Token).ConfigureAwait(false);
await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false);
Handle(openHandlers);
}
@ -240,34 +234,24 @@ namespace CryptoExchange.Net.Sockets
return false;
}
log.Write(LogLevel.Trace, $"Socket {Id} connection succeeded, starting communication");
_sendTask = Task.Factory.StartNew(SendLoopAsync, default, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
_receiveTask = Task.Factory.StartNew(ReceiveLoopAsync, default, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
if (Timeout != default)
_timeoutTask = Task.Run(CheckTimeoutAsync);
var sw = Stopwatch.StartNew();
while (!_startedSent || !_startedReceive)
{
// Wait for the tasks to have actually started
await Task.Delay(10).ConfigureAwait(false);
if(sw.ElapsedMilliseconds > 5000)
{
_ = _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", default);
log.Write(LogLevel.Debug, $"Socket {Id} startup interupted");
return false;
}
}
log.Write(LogLevel.Debug, $"Socket {Id} connected to {Url}");
log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}");
return true;
}
/// <inheritdoc />
public virtual async Task ProcessAsync()
{
var sendTask = SendLoopAsync();
var receiveTask = ReceiveLoopAsync();
var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask;
log.Write(LogLevel.Debug, $"Socket {Id} processing started");
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
}
/// <inheritdoc />
public virtual void Send(string data)
{
if (_closing)
if (_ctsSource.IsCancellationRequested)
throw new InvalidOperationException($"Socket {Id} Can't send data when socket is not connected");
var bytes = _encoding.GetBytes(data);
@ -280,36 +264,21 @@ namespace CryptoExchange.Net.Sockets
public virtual async Task CloseAsync()
{
log.Write(LogLevel.Debug, $"Socket {Id} closing");
await CloseInternalAsync(true, true).ConfigureAwait(false);
await CloseInternalAsync().ConfigureAwait(false);
}
/// <summary>
/// Internal close method, will wait for each task to complete to gracefully close
/// Internal close method
/// </summary>
/// <param name="waitSend"></param>
/// <param name="waitReceive"></param>
/// <returns></returns>
private async Task CloseInternalAsync(bool waitSend, bool waitReceive)
private async Task CloseInternalAsync()
{
if (_closing)
return;
_closing = true;
var tasksToAwait = new List<Task>();
if (_socket.State == WebSocketState.Open)
tasksToAwait.Add(_socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default));
_ctsSource.Cancel();
_sendEvent.Set();
if (waitSend)
tasksToAwait.Add(_sendTask!);
if (waitReceive)
tasksToAwait.Add(_receiveTask!);
if (_timeoutTask != null)
tasksToAwait.Add(_timeoutTask);
log.Write(LogLevel.Trace, $"Socket {Id} waiting for communication loops to finish");
await Task.WhenAll(tasksToAwait).ConfigureAwait(false);
if (_socket.State == WebSocketState.Open)
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
log.Write(LogLevel.Debug, $"Socket {Id} closed");
Handle(closeHandlers);
}
@ -335,7 +304,6 @@ namespace CryptoExchange.Net.Sockets
{
log.Write(LogLevel.Debug, $"Socket {Id} resetting");
_ctsSource = new CancellationTokenSource();
_closing = false;
while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer
@ -366,17 +334,16 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
private async Task SendLoopAsync()
{
_startedSent = true;
try
{
while (true)
{
if (_closing)
if (_ctsSource.IsCancellationRequested)
break;
await _sendEvent.WaitAsync().ConfigureAwait(false);
if (_closing)
if (_ctsSource.IsCancellationRequested)
break;
while (_sendBuffer.TryDequeue(out var data))
@ -388,7 +355,7 @@ namespace CryptoExchange.Net.Sockets
while (MessagesSentLastSecond() >= RatelimitPerSecond)
{
start ??= DateTime.UtcNow;
await Task.Delay(10).ConfigureAwait(false);
await Task.Delay(50).ConfigureAwait(false);
}
if (start != null)
@ -410,7 +377,7 @@ namespace CryptoExchange.Net.Sockets
{
// Connection closed unexpectedly, .NET framework
Handle(errorHandlers, ioe);
_ = Task.Run(async () => await CloseInternalAsync(false, true).ConfigureAwait(false));
await CloseInternalAsync().ConfigureAwait(false);
break;
}
}
@ -427,7 +394,6 @@ namespace CryptoExchange.Net.Sockets
finally
{
log.Write(LogLevel.Trace, $"Socket {Id} Send loop finished");
_startedSent = false;
}
}
@ -437,14 +403,13 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
private async Task ReceiveLoopAsync()
{
_startedReceive = true;
var buffer = new ArraySegment<byte>(new byte[65536]);
var received = 0;
try
{
while (true)
{
if (_closing)
if (_ctsSource.IsCancellationRequested)
break;
MemoryStream? memoryStream = null;
@ -468,7 +433,7 @@ namespace CryptoExchange.Net.Sockets
{
// Connection closed unexpectedly
Handle(errorHandlers, wse);
_ = Task.Run(async () => await CloseInternalAsync(true, true).ConfigureAwait(false));
await CloseInternalAsync().ConfigureAwait(false);
break;
}
@ -476,7 +441,7 @@ namespace CryptoExchange.Net.Sockets
{
// Connection closed unexpectedly
log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
_ = Task.Run(async () => await CloseInternalAsync(true, true).ConfigureAwait(false));
await CloseInternalAsync().ConfigureAwait(false);
break;
}
@ -515,7 +480,7 @@ namespace CryptoExchange.Net.Sockets
break;
}
if (receiveResult == null || _closing)
if (receiveResult == null || _ctsSource.IsCancellationRequested)
{
// Error during receiving or cancellation requested, stop.
break;
@ -547,7 +512,6 @@ namespace CryptoExchange.Net.Sockets
finally
{
log.Write(LogLevel.Trace, $"Socket {Id} Receive loop finished");
_startedReceive = false;
}
}
@ -615,7 +579,7 @@ namespace CryptoExchange.Net.Sockets
{
while (true)
{
if (_closing)
if (_ctsSource.IsCancellationRequested)
return;
if (DateTime.UtcNow - LastActionTime > Timeout)

View File

@ -65,12 +65,16 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// If connection is made
/// </summary>
public bool Connected { get; private set; }
public bool Connected => Socket.IsOpen;
/// <summary>
/// The underlying websocket
/// </summary>
public IWebsocket Socket { get; set; }
private IWebsocket Socket { get; set; }
public int SocketId => Socket.Id;
public double IncomingKbps => Socket.IncomingKbps;
public Uri Uri => Socket.Uri;
/// <summary>
/// The API client the connection is for
@ -129,6 +133,7 @@ namespace CryptoExchange.Net.Sockets
private readonly BaseSocketClient socketClient;
private readonly List<PendingRequest> pendingRequests;
private Task _socketProcessReconnectTask;
/// <summary>
/// New socket connection
@ -149,10 +154,215 @@ namespace CryptoExchange.Net.Sockets
Socket.Timeout = client.ClientOptions.SocketNoDataTimeout;
Socket.OnMessage += ProcessMessage;
Socket.OnClose += SocketOnClose;
Socket.OnOpen += SocketOnOpen;
}
public async Task<bool> ConnectAsync()
{
var connected = await Socket.ConnectAsync().ConfigureAwait(false);
if (connected)
StartProcessingTask();
return connected;
}
public async Task TriggerReconnectAsync()
{
await Socket.CloseAsync().ConfigureAwait(false);
}
/// <summary>
/// Close the connection
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(Socket.Id))
socketClient.socketConnections.TryRemove(Socket.Id, out _);
lock (subscriptionLock)
{
foreach (var subscription in subscriptions)
{
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
}
}
await Socket.CloseAsync().ConfigureAwait(false);
Socket.Dispose();
await _socketProcessReconnectTask.ConfigureAwait(false);
}
/// <summary>
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
/// </summary>
/// <param name="subscription">Subscription to close</param>
/// <returns></returns>
public async Task CloseAsync(SocketSubscription subscription)
{
if (!Socket.IsOpen)
return;
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
if (subscription.Confirmed)
await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
bool shouldCloseConnection;
lock (subscriptionLock)
shouldCloseConnection = !subscriptions.Any(r => r.UserSubscription && subscription != r);
if (shouldCloseConnection)
await CloseAsync().ConfigureAwait(false);
lock (subscriptionLock)
subscriptions.Remove(subscription);
}
private void StartProcessingTask()
{
_socketProcessReconnectTask = Task.Run(async () =>
{
await Socket.ProcessAsync().ConfigureAwait(false);
await ReconnectAsync().ConfigureAwait(false);
});
}
private async Task ReconnectAsync()
{
// Fail all pending requests
lock (pendingRequests)
{
foreach (var pendingRequest in pendingRequests.ToList())
{
pendingRequest.Fail();
pendingRequests.Remove(pendingRequest);
}
}
if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
{
// Should reconnect
DisconnectTime = DateTime.UtcNow;
log.Write(LogLevel.Warning, $"Socket {Socket.Id} Connection lost, will try to reconnect");
if (!lostTriggered)
{
lostTriggered = true;
ConnectionLost?.Invoke();
}
while (ShouldReconnect)
{
if (ReconnectTry > 0)
{
// Wait a bit before attempting reconnect
await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false);
}
if (!ShouldReconnect)
{
// Should reconnect changed to false while waiting to reconnect
return;
}
Socket.Reset();
if (!await Socket.ConnectAsync().ConfigureAwait(false))
{
// Reconnect failed
ReconnectTry++;
ResubscribeTry = 0;
if (socketClient.ClientOptions.MaxReconnectTries != null
&& ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries)
{
log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing");
ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(Socket.Id))
socketClient.socketConnections.TryRemove(Socket.Id, out _);
_ = Task.Run(() => ConnectionClosed?.Invoke());
// Reached max tries, break loop and leave connection closed
break;
}
// Continue to try again
log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}");
continue;
}
// Successfully reconnected, start processing
StartProcessingTask();
ReconnectTry = 0;
var time = DisconnectTime;
DisconnectTime = null;
log.Write(LogLevel.Information, $"Socket {Socket.Id} reconnected after {DateTime.UtcNow - time}");
var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectResult)
{
// Failed to resubscribe everything
ResubscribeTry++;
DisconnectTime = time;
if (socketClient.ClientOptions.MaxResubscribeTries != null &&
ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries)
{
log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}");
ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(Socket.Id))
socketClient.socketConnections.TryRemove(Socket.Id, out _);
_ = Task.Run(() => ConnectionClosed?.Invoke());
}
else
log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting.");
// Failed resubscribe, close socket if it is still open
if (Socket.IsOpen)
await Socket.CloseAsync().ConfigureAwait(false);
else
DisconnectTime = DateTime.UtcNow;
// Break out of the loop, the new processing task should reconnect again
break;
}
else
{
// Succesfully reconnected
log.Write(LogLevel.Information, $"Socket {Socket.Id} data connection restored.");
ResubscribeTry = 0;
if (lostTriggered)
{
lostTriggered = false;
_ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0))).ConfigureAwait(false);
}
break;
}
}
}
else
{
if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
_ = Task.Run(() => ConnectionClosed?.Invoke());
// No reconnecting needed
log.Write(LogLevel.Information, $"Socket {Socket.Id} closed");
if (socketClient.socketConnections.ContainsKey(Socket.Id))
socketClient.socketConnections.TryRemove(Socket.Id, out _);
}
}
public void Dispose()
{
Socket.Dispose();
}
/// <summary>
/// Process a message received by the socket
/// </summary>
@ -202,12 +412,20 @@ namespace CryptoExchange.Net.Sockets
// Message was not a request response, check data handlers
var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data: null, timestamp);
if (!HandleData(messageEvent) && !handledResponse)
var (handled, userProcessTime) = HandleData(messageEvent);
if (!handled && !handledResponse)
{
if (!socketClient.UnhandledMessageExpected)
log.Write(LogLevel.Warning, $"Socket {Socket.Id} Message not handled: " + tokenData);
UnhandledMessage?.Invoke(tokenData);
}
var total = DateTime.UtcNow - timestamp;
if (userProcessTime.TotalMilliseconds > 500)
log.Write(LogLevel.Debug, $"Socket {Socket.Id} message processing slow ({(int)total.TotalMilliseconds}ms), consider offloading data handling to another thread. " +
"Data from this socket may arrive late or not at all if message processing is continuously slow.");
log.Write(LogLevel.Trace, $"Socket {Socket.Id} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)");
}
/// <summary>
@ -246,13 +464,13 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="messageEvent"></param>
/// <returns>True if the data was successfully handled</returns>
private bool HandleData(MessageEvent messageEvent)
private (bool, TimeSpan) HandleData(MessageEvent messageEvent)
{
SocketSubscription? currentSubscription = null;
try
{
var handled = false;
var sw = Stopwatch.StartNew();
TimeSpan userCodeDuration = TimeSpan.Zero;
// Loop the subscriptions to check if any of them signal us that the message is for them
List<SocketSubscription> subscriptionsCopy;
@ -267,7 +485,10 @@ namespace CryptoExchange.Net.Sockets
if (socketClient.MessageMatchesHandler(this, messageEvent.JsonData, subscription.Identifier!))
{
handled = true;
var userSw = Stopwatch.StartNew();
subscription.MessageHandler(messageEvent);
userSw.Stop();
userCodeDuration = userSw.Elapsed;
}
}
else
@ -276,24 +497,21 @@ namespace CryptoExchange.Net.Sockets
{
handled = true;
messageEvent.JsonData = socketClient.ProcessTokenData(messageEvent.JsonData);
var userSw = Stopwatch.StartNew();
subscription.MessageHandler(messageEvent);
userSw.Stop();
userCodeDuration = userSw.Elapsed;
}
}
}
sw.Stop();
if (sw.ElapsedMilliseconds > 500)
log.Write(LogLevel.Debug, $"Socket {Socket.Id} message processing slow ({sw.ElapsedMilliseconds}ms), consider offloading data handling to another thread. " +
"Data from this socket may arrive late or not at all if message processing is continuously slow.");
else
log.Write(LogLevel.Trace, $"Socket {Socket.Id} message processed in {sw.ElapsedMilliseconds}ms");
return handled;
return (handled, userCodeDuration);
}
catch (Exception ex)
{
log.Write(LogLevel.Error, $"Socket {Socket.Id} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}");
currentSubscription?.InvokeExceptionHandler(ex);
return false;
return (false, TimeSpan.Zero);
}
}
@ -347,135 +565,6 @@ namespace CryptoExchange.Net.Sockets
{
ReconnectTry = 0;
PausedActivity = false;
Connected = true;
}
/// <summary>
/// Handler for a socket closing. Reconnects the socket if needed, or removes it from the active socket list if not
/// </summary>
protected virtual void SocketOnClose()
{
lock (pendingRequests)
{
foreach(var pendingRequest in pendingRequests.ToList())
{
pendingRequest.Fail();
pendingRequests.Remove(pendingRequest);
}
}
if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
{
if (Socket.Reconnecting)
return; // Already reconnecting
Socket.Reconnecting = true;
DisconnectTime = DateTime.UtcNow;
log.Write(LogLevel.Warning, $"Socket {Socket.Id} Connection lost, will try to reconnect");
if (!lostTriggered)
{
lostTriggered = true;
ConnectionLost?.Invoke();
}
Task.Run(async () =>
{
while (ShouldReconnect)
{
if (ReconnectTry > 0)
{
// Wait a bit before attempting reconnect
await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false);
}
if (!ShouldReconnect)
{
// Should reconnect changed to false while waiting to reconnect
Socket.Reconnecting = false;
return;
}
Socket.Reset();
if (!await Socket.ConnectAsync().ConfigureAwait(false))
{
ReconnectTry++;
ResubscribeTry = 0;
if (socketClient.ClientOptions.MaxReconnectTries != null
&& ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries)
{
log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing");
ShouldReconnect = false;
if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _);
_ = Task.Run(() => ConnectionClosed?.Invoke());
break;
}
log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}": "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}");
continue;
}
// Successfully reconnected
var time = DisconnectTime;
DisconnectTime = null;
log.Write(LogLevel.Information, $"Socket {Socket.Id} reconnected after {DateTime.UtcNow - time}");
var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectResult)
{
ResubscribeTry++;
DisconnectTime = time;
if (socketClient.ClientOptions.MaxResubscribeTries != null &&
ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries)
{
log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}");
ShouldReconnect = false;
if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _);
_ = Task.Run(() => ConnectionClosed?.Invoke());
}
else
log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting.");
if (Socket.IsOpen)
await Socket.CloseAsync().ConfigureAwait(false);
else
DisconnectTime = DateTime.UtcNow;
}
else
{
log.Write(LogLevel.Information, $"Socket {Socket.Id} data connection restored.");
ResubscribeTry = 0;
if (lostTriggered)
{
lostTriggered = false;
_ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0))).ConfigureAwait(false);
}
break;
}
}
Socket.Reconnecting = false;
});
}
else
{
if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
_ = Task.Run(() => ConnectionClosed?.Invoke());
// No reconnecting needed
log.Write(LogLevel.Information, $"Socket {Socket.Id} closed");
if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _);
}
}
private async Task<CallResult<bool>> ProcessReconnectAsync()
@ -535,56 +624,6 @@ namespace CryptoExchange.Net.Sockets
return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
}
/// <summary>
/// Close the connection
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
Connected = false;
ShouldReconnect = false;
if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _);
lock (subscriptionLock)
{
foreach (var subscription in subscriptions)
{
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
}
}
await Socket.CloseAsync().ConfigureAwait(false);
Socket.Dispose();
}
/// <summary>
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
/// </summary>
/// <param name="subscription">Subscription to close</param>
/// <returns></returns>
public async Task CloseAsync(SocketSubscription subscription)
{
if (!Socket.IsOpen)
return;
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
if (subscription.Confirmed)
await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
bool shouldCloseConnection;
lock (subscriptionLock)
shouldCloseConnection = !subscriptions.Any(r => r.UserSubscription && subscription != r);
if (shouldCloseConnection)
await CloseAsync().ConfigureAwait(false);
lock (subscriptionLock)
subscriptions.Remove(subscription);
}
}
internal class PendingRequest

View File

@ -72,7 +72,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// The id of the socket
/// </summary>
public int SocketId => connection.Socket.Id;
public int SocketId => connection.SocketId;
/// <summary>
/// The id of the subscription
@ -103,9 +103,9 @@ namespace CryptoExchange.Net.Sockets
/// Close the socket to cause a reconnect
/// </summary>
/// <returns></returns>
internal Task ReconnectAsync()
public Task ReconnectAsync()
{
return connection.Socket.CloseAsync();
return connection.TriggerReconnectAsync();
}
/// <summary>

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging;
@ -12,13 +13,13 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public IWebsocket CreateWebsocket(Log log, string url)
{
return new CryptoExchangeWebSocketClient(log, url);
return new CryptoExchangeWebSocketClient(log, new Uri(url));
}
/// <inheritdoc />
public IWebsocket CreateWebsocket(Log log, string url, IDictionary<string, string> cookies, IDictionary<string, string> headers)
{
return new CryptoExchangeWebSocketClient(log, url, cookies, headers);
return new CryptoExchangeWebSocketClient(log, new Uri(url), cookies, headers);
}
}
}