diff --git a/CryptoExchange.Net/Clients/BaseSocketClient.cs b/CryptoExchange.Net/Clients/BaseSocketClient.cs
index a8cd86a..ea81cad 100644
--- a/CryptoExchange.Net/Clients/BaseSocketClient.cs
+++ b/CryptoExchange.Net/Clients/BaseSocketClient.cs
@@ -3,7 +3,6 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
-using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Authentication;
@@ -11,7 +10,6 @@ using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
-using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace CryptoExchange.Net
@@ -120,10 +118,7 @@ namespace CryptoExchange.Net
/// The options for this client
protected BaseSocketClient(string name, BaseSocketClientOptions options) : base(name, options)
{
- if (options == null)
- throw new ArgumentNullException(nameof(options));
-
- ClientOptions = options;
+ ClientOptions = options ?? throw new ArgumentNullException(nameof(options));
}
///
@@ -240,6 +235,7 @@ namespace CryptoExchange.Net
var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false);
if (!subResult)
{
+ log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
return new CallResult(subResult.Error!);
}
@@ -250,7 +246,6 @@ namespace CryptoExchange.Net
subscription.Confirmed = true;
}
- socketConnection.ShouldReconnect = true;
if (ct != default)
{
subscription.CancellationTokenRegistration = ct.Register(async () =>
@@ -260,7 +255,7 @@ namespace CryptoExchange.Net
}, false);
}
- log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription completed");
+ log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully");
return new CallResult(new UpdateSubscription(socketConnection, subscription));
}
@@ -334,8 +329,6 @@ namespace CryptoExchange.Net
}
finally
{
- //When the task is ready, release the semaphore. It is vital to ALWAYS release the semaphore when we are ready, or else we will end up with a Semaphore that is forever locked.
- //This is why it is important to do the Release within a try...finally clause; program execution may crash or take a different path, this way you are guaranteed execution
if (!released)
semaphoreSlim.Release();
}
@@ -585,6 +578,23 @@ namespace CryptoExchange.Net
return new CallResult(new CantConnectError());
}
+ ///
+ /// Get parameters for the websocket connection
+ ///
+ /// The address to connect to
+ ///
+ protected virtual WebSocketParameters GetWebSocketParameters(string address)
+ => new (new Uri(address), ClientOptions.AutoReconnect)
+ {
+ DataInterpreterBytes = dataInterpreterBytes,
+ DataInterpreterString = dataInterpreterString,
+ KeepAliveInterval = KeepAliveInterval,
+ ReconnectInterval = ClientOptions.ReconnectInterval,
+ RatelimitPerSecond = RateLimitPerSocketPerSecond,
+ Proxy = ClientOptions.Proxy,
+ Timeout = ClientOptions.SocketNoDataTimeout
+ };
+
///
/// Create a socket for an address
///
@@ -592,24 +602,8 @@ namespace CryptoExchange.Net
///
protected virtual IWebsocket CreateSocket(string address)
{
- var socket = SocketFactory.CreateWebsocket(log, address);
+ var socket = SocketFactory.CreateWebsocket(log, GetWebSocketParameters(address));
log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address);
-
- if (ClientOptions.Proxy != null)
- socket.SetProxy(ClientOptions.Proxy);
-
- socket.KeepAliveInterval = KeepAliveInterval;
- socket.Timeout = ClientOptions.SocketNoDataTimeout;
- socket.DataInterpreterBytes = dataInterpreterBytes;
- socket.DataInterpreterString = dataInterpreterString;
- socket.RatelimitPerSecond = RateLimitPerSocketPerSecond;
- socket.OnError += e =>
- {
- if(e is WebSocketException wse)
- log.Write(LogLevel.Warning, $"Socket {socket.Id} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
- else
- log.Write(LogLevel.Warning, $"Socket {socket.Id} error: " + e.ToLogString());
- };
return socket;
}
@@ -667,7 +661,6 @@ namespace CryptoExchange.Net
///
public virtual async Task UnsubscribeAsync(int subscriptionId)
{
-
SocketSubscription? subscription = null;
SocketConnection? connection = null;
foreach(var socket in socketConnections.Values.ToList())
@@ -683,7 +676,7 @@ namespace CryptoExchange.Net
if (subscription == null || connection == null)
return;
- log.Write(LogLevel.Information, "Closing subscription " + subscriptionId);
+ log.Write(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId);
await connection.CloseAsync(subscription).ConfigureAwait(false);
}
@@ -697,7 +690,7 @@ namespace CryptoExchange.Net
if (subscription == null)
throw new ArgumentNullException(nameof(subscription));
- log.Write(LogLevel.Information, "Closing subscription " + subscription.Id);
+ log.Write(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id);
await subscription.CloseAsync().ConfigureAwait(false);
}
diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs
index 0b8eebc..a28df4c 100644
--- a/CryptoExchange.Net/Interfaces/IWebsocket.cs
+++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs
@@ -41,30 +41,10 @@ namespace CryptoExchange.Net.Interfaces
///
int Id { get; }
///
- /// Origin header
- ///
- string? Origin { get; set; }
- ///
- /// Encoding to use for sending/receiving string data
- ///
- Encoding? Encoding { get; set; }
- ///
- /// The max amount of outgoing messages per second
- ///
- int? RatelimitPerSecond { get; set; }
- ///
/// The current kilobytes per second of data being received, averaged over the last 3 seconds
///
double IncomingKbps { get; }
///
- /// Handler for byte data
- ///
- Func? DataInterpreterBytes { get; set; }
- ///
- /// Handler for string data
- ///
- Func? DataInterpreterString { get; set; }
- ///
/// The uri the socket connects to
///
Uri Uri { get; }
@@ -77,23 +57,6 @@ namespace CryptoExchange.Net.Interfaces
///
bool IsOpen { get; }
///
- /// Supported ssl protocols
- ///
- SslProtocols SSLProtocols { get; set; }
- ///
- /// The max time for no data being received before the connection is considered lost
- ///
- TimeSpan Timeout { get; set; }
- ///
- /// The interval at which to send a ping frame to the server
- ///
- TimeSpan KeepAliveInterval { get; set; }
- ///
- /// Set a proxy to use when connecting
- ///
- ///
- void SetProxy(ApiProxy proxy);
- ///
/// Connect the socket
///
///
diff --git a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs
index 809c624..7d638a3 100644
--- a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs
+++ b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs
@@ -1,5 +1,5 @@
-using System.Collections.Generic;
-using CryptoExchange.Net.Logging;
+using CryptoExchange.Net.Logging;
+using CryptoExchange.Net.Sockets;
namespace CryptoExchange.Net.Interfaces
{
@@ -12,17 +12,8 @@ namespace CryptoExchange.Net.Interfaces
/// Create a websocket for an url
///
/// The logger
- /// The url the socket is fo
+ /// The parameters to use for the connection
///
- IWebsocket CreateWebsocket(Log log, string url);
- ///
- /// Create a websocket for an url
- ///
- /// The logger
- /// The url the socket is fo
- /// Cookies to be send in the initial request
- /// Headers to be send in the initial request
- ///
- IWebsocket CreateWebsocket(Log log, string url, IDictionary cookies, IDictionary headers);
+ IWebsocket CreateWebsocket(Log log, WebSocketParameters parameters);
}
}
diff --git a/CryptoExchange.Net/Objects/Options.cs b/CryptoExchange.Net/Objects/Options.cs
index 0494e66..5c727cc 100644
--- a/CryptoExchange.Net/Objects/Options.cs
+++ b/CryptoExchange.Net/Objects/Options.cs
@@ -176,16 +176,6 @@ namespace CryptoExchange.Net.Objects
///
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
- ///
- /// The maximum number of times to try to reconnect, default null will retry indefinitely
- ///
- public int? MaxReconnectTries { get; set; }
-
- ///
- /// The maximum number of times to try to resubscribe after reconnecting
- ///
- public int? MaxResubscribeTries { get; set; }
-
///
/// Max number of concurrent resubscription tasks per socket after reconnecting a socket
///
@@ -232,8 +222,6 @@ namespace CryptoExchange.Net.Objects
AutoReconnect = baseOptions.AutoReconnect;
ReconnectInterval = baseOptions.ReconnectInterval;
- MaxReconnectTries = baseOptions.MaxReconnectTries;
- MaxResubscribeTries = baseOptions.MaxResubscribeTries;
MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket;
SocketResponseTimeout = baseOptions.SocketResponseTimeout;
SocketNoDataTimeout = baseOptions.SocketNoDataTimeout;
@@ -244,7 +232,7 @@ namespace CryptoExchange.Net.Objects
///
public override string ToString()
{
- return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxReconnectTries: {MaxReconnectTries}, MaxResubscribeTries: {MaxResubscribeTries}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}";
+ return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}";
}
}
diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
index c5cc984..b4b8285 100644
--- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
+++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
@@ -5,13 +5,10 @@ using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
-using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.WebSockets;
-using System.Security.Authentication;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -22,7 +19,6 @@ namespace CryptoExchange.Net.Sockets
///
public class CryptoExchangeWebSocketClient : IWebsocket
{
- // TODO keep the same ID's for subscriptions/sockets when reconnecting
enum ProcessState
{
Idle,
@@ -41,23 +37,21 @@ namespace CryptoExchange.Net.Sockets
internal static int lastStreamId;
private static readonly object streamIdLock = new();
- private ClientWebSocket _socket;
private readonly AsyncResetEvent _sendEvent;
private readonly ConcurrentQueue _sendBuffer;
- private readonly IDictionary cookies;
- private readonly IDictionary headers;
- private CancellationTokenSource _ctsSource;
- private ApiProxy? _proxy;
-
+ private readonly SemaphoreSlim _closeSem;
+ private readonly WebSocketParameters _parameters;
private readonly List _outgoingMessages;
+
+ private ClientWebSocket _socket;
+ private CancellationTokenSource _ctsSource;
private DateTime _lastReceivedMessagesUpdate;
private Task? _processTask;
private Task? _closeTask;
private bool _stopRequested;
private bool _disposed;
private ProcessState _processState;
- //private CloseState _closeState;
- private SemaphoreSlim _closeSem;
+
///
/// Received messages, the size and the timstamp
@@ -72,31 +66,18 @@ namespace CryptoExchange.Net.Sockets
///
/// Log
///
- protected Log log;
+ protected Log _log;
///
public int Id { get; }
- ///
- public string? Origin { get; set; }
-
///
/// The timestamp this socket has been active for the last time
///
public DateTime LastActionTime { get; private set; }
-
- ///
- /// Delegate used for processing byte data received from socket connections before it is processed by handlers
- ///
- public Func? DataInterpreterBytes { get; set; }
-
- ///
- /// Delegate used for processing string data received from socket connections before it is processed by handlers
- ///
- public Func? DataInterpreterString { get; set; }
-
+
///
- public Uri Uri { get; }
+ public Uri Uri => _parameters.Uri;
///
public bool IsClosed => _socket.State == WebSocketState.Closed;
@@ -104,34 +85,6 @@ namespace CryptoExchange.Net.Sockets
///
public bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested;
- ///
- /// Ssl protocols supported. NOT USED BY THIS IMPLEMENTATION
- ///
- public SslProtocols SSLProtocols { get; set; }
-
- private Encoding _encoding = Encoding.UTF8;
- ///
- public Encoding? Encoding
- {
- get => _encoding;
- set
- {
- if(value != null)
- _encoding = value;
- }
- }
-
- ///
- /// The max amount of outgoing messages per second
- ///
- public int? RatelimitPerSecond { get; set; }
-
- ///
- public TimeSpan Timeout { get; set; }
-
- ///
- public TimeSpan KeepAliveInterval { get; set; }
-
///
public double IncomingKbps
{
@@ -166,26 +119,13 @@ namespace CryptoExchange.Net.Sockets
/// ctor
///
/// The log object to use
- /// The uri the socket should connect to
- public CryptoExchangeWebSocketClient(Log log, Uri uri) : this(log, uri, new Dictionary(), new Dictionary())
- {
- }
-
- ///
- /// ctor
- ///
- /// The log object to use
- /// The uri the socket should connect to
- /// Cookies to sent in the socket connection request
- /// Headers to sent in the socket connection request
- public CryptoExchangeWebSocketClient(Log log, Uri uri, IDictionary cookies, IDictionary headers)
+ /// The parameters for this socket
+ public CryptoExchangeWebSocketClient(Log log, WebSocketParameters websocketParameters)
{
Id = NextStreamId();
- this.log = log;
- Uri = uri;
- this.cookies = cookies;
- this.headers = headers;
+ _log = log;
+ _parameters = websocketParameters;
_outgoingMessages = new List();
_receivedMessages = new List();
_sendEvent = new AsyncResetEvent();
@@ -197,25 +137,6 @@ namespace CryptoExchange.Net.Sockets
_socket = CreateSocket();
}
- ///
- public virtual void SetProxy(ApiProxy proxy)
- {
- _proxy = proxy;
-
- if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri))
- throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy));
-
- _socket.Options.Proxy = uri?.Scheme == null
- ? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port)
- : _socket.Options.Proxy = new WebProxy
- {
- Address = uri
- };
-
- if (proxy.Login != null)
- _socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
- }
-
///
public virtual async Task ConnectAsync()
{
@@ -227,9 +148,29 @@ namespace CryptoExchange.Net.Sockets
return true;
}
+ ///
+ /// Create the socket object
+ ///
+ private ClientWebSocket CreateSocket()
+ {
+ var cookieContainer = new CookieContainer();
+ foreach (var cookie in _parameters.Cookies)
+ cookieContainer.Add(new Cookie(cookie.Key, cookie.Value));
+
+ var socket = new ClientWebSocket();
+ socket.Options.Cookies = cookieContainer;
+ foreach (var header in _parameters.Headers)
+ socket.Options.SetRequestHeader(header.Key, header.Value);
+ socket.Options.KeepAliveInterval = _parameters.KeepAliveInterval ?? TimeSpan.Zero;
+ socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
+ if (_parameters.Proxy != null)
+ SetProxy(_parameters.Proxy);
+ return socket;
+ }
+
private async Task ConnectInternalAsync()
{
- log.Write(LogLevel.Debug, $"Socket {Id} connecting");
+ _log.Write(LogLevel.Debug, $"Socket {Id} connecting");
try
{
using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
@@ -237,11 +178,11 @@ namespace CryptoExchange.Net.Sockets
}
catch (Exception e)
{
- log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString());
+ _log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString());
return false;
}
- log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}");
+ _log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}");
return true;
}
@@ -250,13 +191,13 @@ namespace CryptoExchange.Net.Sockets
{
while (!_stopRequested)
{
- log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started");
+ _log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started");
_processState = ProcessState.Processing;
var sendTask = SendLoopAsync();
var receiveTask = ReceiveLoopAsync();
- var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask;
+ var timeoutTask = _parameters.Timeout != null && _parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
- log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished");
+ _log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished");
_processState = ProcessState.WaitingForClose;
while (_closeTask == null)
@@ -264,7 +205,13 @@ namespace CryptoExchange.Net.Sockets
await _closeTask.ConfigureAwait(false);
_closeTask = null;
- //_closeState = CloseState.Idle;
+
+ if (!_parameters.AutoReconnect)
+ {
+ _processState = ProcessState.Idle;
+ OnClose?.Invoke();
+ return;
+ }
if (!_stopRequested)
{
@@ -274,7 +221,7 @@ namespace CryptoExchange.Net.Sockets
while (!_stopRequested)
{
- log.Write(LogLevel.Trace, $"Socket {Id} attempting to reconnect");
+ _log.Write(LogLevel.Trace, $"Socket {Id} attempting to reconnect");
_socket = CreateSocket();
_ctsSource.Dispose();
_ctsSource = new CancellationTokenSource();
@@ -283,7 +230,7 @@ namespace CryptoExchange.Net.Sockets
var connected = await ConnectInternalAsync().ConfigureAwait(false);
if (!connected)
{
- await Task.Delay(5000).ConfigureAwait(false);
+ await Task.Delay(_parameters.ReconnectInterval).ConfigureAwait(false);
continue;
}
@@ -299,10 +246,10 @@ namespace CryptoExchange.Net.Sockets
public virtual void Send(string data)
{
if (_ctsSource.IsCancellationRequested)
- throw new InvalidOperationException($"Socket {Id} Can't send data when socket is not connected");
+ return;
- var bytes = _encoding.GetBytes(data);
- log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer");
+ var bytes = _parameters.Encoding.GetBytes(data);
+ _log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer");
_sendBuffer.Enqueue(bytes);
_sendEvent.Set();
}
@@ -313,7 +260,7 @@ namespace CryptoExchange.Net.Sockets
if (_processState != ProcessState.Processing)
return;
- log.Write(LogLevel.Debug, $"Socket {Id} reconnecting");
+ _log.Write(LogLevel.Debug, $"Socket {Id} reconnecting");
_closeTask = CloseInternalAsync();
await _closeTask.ConfigureAwait(false);
}
@@ -326,20 +273,20 @@ namespace CryptoExchange.Net.Sockets
{
if (_closeTask != null && !_closeTask.IsCompleted)
{
- log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task");
+ _log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task");
await _closeTask.ConfigureAwait(false);
return;
}
+ _stopRequested = true;
+
if (!IsOpen)
{
- log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open");
+ _log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open");
return;
}
- log.Write(LogLevel.Debug, $"Socket {Id} closing");
- _stopRequested = true;
-
+ _log.Write(LogLevel.Debug, $"Socket {Id} closing");
_closeTask = CloseInternalAsync();
}
finally
@@ -348,9 +295,10 @@ namespace CryptoExchange.Net.Sockets
}
await _closeTask.ConfigureAwait(false);
- await _processTask!.ConfigureAwait(false);
+ if(_processTask != null)
+ await _processTask.ConfigureAwait(false);
OnClose?.Invoke();
- log.Write(LogLevel.Debug, $"Socket {Id} closed");
+ _log.Write(LogLevel.Debug, $"Socket {Id} closed");
}
///
@@ -370,19 +318,29 @@ namespace CryptoExchange.Net.Sockets
{
try
{
+ _log.Write(LogLevel.Trace, $"Socket {Id} normal closure 1");
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
- catch(Exception)
- { } // Can sometimes throw an exception when socket is in aborted state due to timing
+ catch (Exception)
+ {
+ // Can sometimes throw an exception when socket is in aborted state due to timing
+ // Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
+ // So socket might go to aborted state, might still be open
+ }
}
else if(_socket.State == WebSocketState.CloseReceived)
{
try
{
+ _log.Write(LogLevel.Trace, $"Socket {Id} normal closure 2");
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
catch (Exception)
- { } // Can sometimes throw an exception when socket is in aborted state due to timing
+ {
+ // Can sometimes throw an exception when socket is in aborted state due to timing
+ // Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
+ // So socket might go to aborted state, might still be open
+ }
}
}
@@ -394,31 +352,11 @@ namespace CryptoExchange.Net.Sockets
if (_disposed)
return;
- log.Write(LogLevel.Debug, $"Socket {Id} disposing");
+ _log.Write(LogLevel.Debug, $"Socket {Id} disposing");
_disposed = true;
_socket.Dispose();
_ctsSource.Dispose();
- log.Write(LogLevel.Trace, $"Socket {Id} disposed");
- }
-
- ///
- /// Create the socket object
- ///
- private ClientWebSocket CreateSocket()
- {
- var cookieContainer = new CookieContainer();
- foreach (var cookie in cookies)
- cookieContainer.Add(new Cookie(cookie.Key, cookie.Value));
-
- var socket = new ClientWebSocket();
- socket.Options.Cookies = cookieContainer;
- foreach (var header in headers)
- socket.Options.SetRequestHeader(header.Key, header.Value);
- socket.Options.KeepAliveInterval = KeepAliveInterval;
- socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
- if (_proxy != null)
- SetProxy(_proxy);
- return socket;
+ _log.Write(LogLevel.Trace, $"Socket {Id} disposed");
}
///
@@ -441,25 +379,25 @@ namespace CryptoExchange.Net.Sockets
while (_sendBuffer.TryDequeue(out var data))
{
- if (RatelimitPerSecond != null)
+ if (_parameters.RatelimitPerSecond != null)
{
// Wait for rate limit
DateTime? start = null;
- while (MessagesSentLastSecond() >= RatelimitPerSecond)
+ while (MessagesSentLastSecond() >= _parameters.RatelimitPerSecond)
{
start ??= DateTime.UtcNow;
await Task.Delay(50).ConfigureAwait(false);
}
if (start != null)
- log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit");
+ _log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit");
}
try
{
await _socket.SendAsync(new ArraySegment(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
_outgoingMessages.Add(DateTime.UtcNow);
- log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes");
+ _log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes");
}
catch (OperationCanceledException)
{
@@ -486,7 +424,7 @@ namespace CryptoExchange.Net.Sockets
}
finally
{
- log.Write(LogLevel.Trace, $"Socket {Id} Send loop finished");
+ _log.Write(LogLevel.Trace, $"Socket {Id} Send loop finished");
}
}
@@ -533,7 +471,7 @@ namespace CryptoExchange.Net.Sockets
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed unexpectedly
- log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
+ _log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
_closeTask = CloseInternalAsync();
break;
}
@@ -543,7 +481,7 @@ namespace CryptoExchange.Net.Sockets
// We received data, but it is not complete, write it to a memory stream for reassembling
multiPartMessage = true;
memoryStream ??= new MemoryStream();
- log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
+ _log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
}
else
@@ -551,13 +489,13 @@ namespace CryptoExchange.Net.Sockets
if (!multiPartMessage)
{
// Received a complete message and it's not multi part
- log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
+ _log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
HandleMessage(buffer.Array!, buffer.Offset, receiveResult.Count, receiveResult.MessageType);
}
else
{
// Received the end of a multipart message, write to memory stream for reassembling
- log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
+ _log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
}
break;
@@ -585,12 +523,12 @@ namespace CryptoExchange.Net.Sockets
if (receiveResult?.EndOfMessage == true)
{
// Reassemble complete message from memory stream
- log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
+ _log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType);
memoryStream.Dispose();
}
else
- log.Write(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes");
+ _log.Write(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes");
}
}
}
@@ -604,7 +542,7 @@ namespace CryptoExchange.Net.Sockets
}
finally
{
- log.Write(LogLevel.Trace, $"Socket {Id} Receive loop finished");
+ _log.Write(LogLevel.Trace, $"Socket {Id} Receive loop finished");
}
}
@@ -620,54 +558,92 @@ namespace CryptoExchange.Net.Sockets
string strData;
if (messageType == WebSocketMessageType.Binary)
{
- if (DataInterpreterBytes == null)
+ if (_parameters.DataInterpreterBytes == null)
throw new Exception("Byte interpreter not set while receiving byte data");
try
{
var relevantData = new byte[count];
Array.Copy(data, offset, relevantData, 0, count);
- strData = DataInterpreterBytes(relevantData);
+ strData = _parameters.DataInterpreterBytes(relevantData);
}
catch(Exception e)
{
- log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString());
+ _log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString());
return;
}
}
else
- strData = _encoding.GetString(data, offset, count);
+ strData = _parameters.Encoding.GetString(data, offset, count);
- if (DataInterpreterString != null)
+ if (_parameters.DataInterpreterString != null)
{
try
{
- strData = DataInterpreterString(strData);
+ strData = _parameters.DataInterpreterString(strData);
}
catch(Exception e)
{
- log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString());
+ _log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString());
return;
}
}
try
{
+ LastActionTime = DateTime.UtcNow;
OnMessage?.Invoke(strData);
}
catch(Exception e)
{
- log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString());
+ _log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString());
}
}
+ ///
+ /// Trigger the OnMessage event
+ ///
+ ///
+ protected void TriggerOnMessage(string data)
+ {
+ LastActionTime = DateTime.UtcNow;
+ OnMessage?.Invoke(data);
+ }
+
+ ///
+ /// Trigger the OnError event
+ ///
+ ///
+ protected void TriggerOnError(Exception ex) => OnError?.Invoke(ex);
+
+ ///
+ /// Trigger the OnError event
+ ///
+ protected void TriggerOnOpen() => OnOpen?.Invoke();
+
+ ///
+ /// Trigger the OnError event
+ ///
+ protected void TriggerOnClose() => OnClose?.Invoke();
+
+ ///
+ /// Trigger the OnReconnecting event
+ ///
+ protected void TriggerOnReconnecting() => OnReconnecting?.Invoke();
+
+ ///
+ /// Trigger the OnReconnected event
+ ///
+ protected void TriggerOnReconnected() => OnReconnected?.Invoke();
+
///
/// Checks if there is no data received for a period longer than the specified timeout
///
///
protected async Task CheckTimeoutAsync()
{
- log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Timeout}");
+ _log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {_parameters.Timeout}");
+ LastActionTime = DateTime.UtcNow;
try
{
while (true)
@@ -675,9 +651,9 @@ namespace CryptoExchange.Net.Sockets
if (_ctsSource.IsCancellationRequested)
return;
- if (DateTime.UtcNow - LastActionTime > Timeout)
+ if (DateTime.UtcNow - LastActionTime > _parameters.Timeout)
{
- log.Write(LogLevel.Warning, $"Socket {Id} No data received for {Timeout}, reconnecting socket");
+ _log.Write(LogLevel.Warning, $"Socket {Id} No data received for {_parameters.Timeout}, reconnecting socket");
_ = CloseAsync().ConfigureAwait(false);
return;
}
@@ -737,6 +713,27 @@ namespace CryptoExchange.Net.Sockets
_lastReceivedMessagesUpdate = checkTime;
}
}
+
+ ///
+ /// Set proxy on socket
+ ///
+ ///
+ ///
+ protected virtual void SetProxy(ApiProxy proxy)
+ {
+ if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri))
+ throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy));
+
+ _socket.Options.Proxy = uri?.Scheme == null
+ ? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port)
+ : _socket.Options.Proxy = new WebProxy
+ {
+ Address = uri
+ };
+
+ if (proxy.Login != null)
+ _socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
+ }
}
///
diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs
index a143448..3e2e275 100644
--- a/CryptoExchange.Net/Sockets/SocketConnection.cs
+++ b/CryptoExchange.Net/Sockets/SocketConnection.cs
@@ -3,13 +3,13 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
-using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects;
+using System.Net.WebSockets;
namespace CryptoExchange.Net.Sockets
{
@@ -87,21 +87,6 @@ namespace CryptoExchange.Net.Sockets
///
public SocketApiClient ApiClient { get; set; }
- ///
- /// If the socket should be reconnected upon closing
- ///
- public bool ShouldReconnect { get; set; }
-
- ///
- /// Current reconnect try, reset when a successful connection is made
- ///
- public int ReconnectTry { get; set; }
-
- ///
- /// Current resubscribe try, reset when a successful connection is made
- ///
- public int ResubscribeTry { get; set; }
-
///
/// Time of disconnecting
///
@@ -124,23 +109,12 @@ namespace CryptoExchange.Net.Sockets
{
pausedActivity = value;
log.Write(LogLevel.Information, $"Socket {SocketId} Paused activity: " + value);
- if(pausedActivity) ActivityPaused?.Invoke();
- else ActivityUnpaused?.Invoke();
+ if(pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke());
+ else _ = Task.Run(() => ActivityUnpaused?.Invoke());
}
}
}
- private bool pausedActivity;
- private readonly List subscriptions;
- private readonly object subscriptionLock = new();
-
- private readonly Log log;
- private readonly BaseSocketClient socketClient;
-
- private readonly List pendingRequests;
-
- private SocketStatus _status;
-
///
/// Status of the socket connection
///
@@ -158,6 +132,17 @@ namespace CryptoExchange.Net.Sockets
}
}
+ private bool pausedActivity;
+ private readonly List subscriptions;
+ private readonly object subscriptionLock = new();
+
+ private readonly Log log;
+ private readonly BaseSocketClient socketClient;
+
+ private readonly List pendingRequests;
+
+ private SocketStatus _status;
+
///
/// The underlying websocket
///
@@ -176,36 +161,51 @@ namespace CryptoExchange.Net.Sockets
ApiClient = apiClient;
pendingRequests = new List();
-
subscriptions = new List();
+
_socket = socket;
-
-
- _socket.Timeout = client.ClientOptions.SocketNoDataTimeout;
- _socket.OnMessage += ProcessMessage;
- _socket.OnOpen += SocketOnOpen;
+ _socket.OnMessage += HandleMessage;
+ _socket.OnOpen += HandleOpen;
_socket.OnClose += HandleClose;
_socket.OnReconnecting += HandleReconnecting;
_socket.OnReconnected += HandleReconnected;
+ _socket.OnError += HandleError;
}
- private void HandleClose()
+ ///
+ /// Handler for a socket opening
+ ///
+ protected virtual void HandleOpen()
+ {
+ Status = SocketStatus.Connected;
+ PausedActivity = false;
+ }
+
+ ///
+ /// Handler for a socket closing without reconnect
+ ///
+ protected virtual void HandleClose()
{
Status = SocketStatus.Closed;
- ConnectionClosed?.Invoke();
+ Task.Run(() => ConnectionClosed?.Invoke());
}
- private void HandleReconnecting()
+ ///
+ /// Handler for a socket losing conenction and starting reconnect
+ ///
+ protected virtual void HandleReconnecting()
{
Status = SocketStatus.Reconnecting;
DisconnectTime = DateTime.UtcNow;
Task.Run(() => ConnectionLost?.Invoke());
}
- private async void HandleReconnected()
+ ///
+ /// Handler for a socket which has reconnected
+ ///
+ protected virtual async void HandleReconnected()
{
- log.Write(LogLevel.Debug, "Socket reconnected, processing");
-
+ Status = SocketStatus.Resubscribing;
lock (pendingRequests)
{
foreach (var pendingRequest in pendingRequests.ToList())
@@ -215,44 +215,112 @@ namespace CryptoExchange.Net.Sockets
}
}
- // TODO Track amount of failed reconencts and failed resubscriptions
-
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectSuccessful)
await _socket.ReconnectAsync().ConfigureAwait(false);
else
{
Status = SocketStatus.Connected;
- ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
+ _ = Task.Run(() =>
+ {
+ ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
+ DisconnectTime = null;
+ });
}
}
-
+
///
- /// Connect the websocket and start processing
+ /// Handler for an error on a websocket
+ ///
+ /// The exception
+ protected virtual void HandleError(Exception e)
+ {
+ if (e is WebSocketException wse)
+ log.Write(LogLevel.Warning, $"Socket {SocketId} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
+ else
+ log.Write(LogLevel.Warning, $"Socket {SocketId} error: " + e.ToLogString());
+ }
+
+ ///
+ /// Process a message received by the socket
+ ///
+ /// The received data
+ protected virtual void HandleMessage(string data)
+ {
+ var timestamp = DateTime.UtcNow;
+ log.Write(LogLevel.Trace, $"Socket {SocketId} received data: " + data);
+ if (string.IsNullOrEmpty(data)) return;
+
+ var tokenData = data.ToJToken(log);
+ if (tokenData == null)
+ {
+ data = $"\"{data}\"";
+ tokenData = data.ToJToken(log);
+ if (tokenData == null)
+ return;
+ }
+
+ var handledResponse = false;
+
+ // Remove any timed out requests
+ PendingRequest[] requests;
+ lock (pendingRequests)
+ {
+ pendingRequests.RemoveAll(r => r.Completed);
+ requests = pendingRequests.ToArray();
+ }
+
+ // Check if this message is an answer on any pending requests
+ foreach (var pendingRequest in requests)
+ {
+ if (pendingRequest.CheckData(tokenData))
+ {
+ lock (pendingRequests)
+ pendingRequests.Remove(pendingRequest);
+
+ if (!socketClient.ContinueOnQueryResponse)
+ return;
+
+ handledResponse = true;
+ break;
+ }
+ }
+
+ // Message was not a request response, check data handlers
+ var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data : null, timestamp);
+ var (handled, userProcessTime, subscription) = HandleData(messageEvent);
+ if (!handled && !handledResponse)
+ {
+ if (!socketClient.UnhandledMessageExpected)
+ log.Write(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + tokenData);
+ UnhandledMessage?.Invoke(tokenData);
+ }
+
+ var total = DateTime.UtcNow - timestamp;
+ if (userProcessTime.TotalMilliseconds > 500)
+ log.Write(LogLevel.Debug, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " +
+ "Data from this socket may arrive late or not at all if message processing is continuously slow.");
+
+ log.Write(LogLevel.Trace, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)");
+ }
+
+ ///
+ /// Connect the websocket
///
///
- public async Task ConnectAsync()
- {
- return await _socket.ConnectAsync().ConfigureAwait(false);
- }
+ public async Task ConnectAsync() => await _socket.ConnectAsync().ConfigureAwait(false);
///
/// Retrieve the underlying socket
///
///
- public IWebsocket GetSocket()
- {
- return _socket;
- }
+ public IWebsocket GetSocket() => _socket;
///
/// Trigger a reconnect of the socket connection
///
///
- public async Task TriggerReconnectAsync()
- {
- await _socket.ReconnectAsync().ConfigureAwait(false);
- }
+ public async Task TriggerReconnectAsync() => await _socket.ReconnectAsync().ConfigureAwait(false);
///
/// Close the connection
@@ -263,7 +331,6 @@ namespace CryptoExchange.Net.Sockets
if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
- ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(SocketId, out _);
@@ -276,10 +343,6 @@ namespace CryptoExchange.Net.Sockets
}
}
- while (Status == SocketStatus.Reconnecting)
- // Wait for reconnecting to finish
- await Task.Delay(100).ConfigureAwait(false);
-
await _socket.CloseAsync().ConfigureAwait(false);
_socket.Dispose();
}
@@ -291,6 +354,14 @@ namespace CryptoExchange.Net.Sockets
///
public async Task CloseAsync(SocketSubscription subscription)
{
+ lock (subscriptionLock)
+ {
+ if (!subscriptions.Contains(subscription))
+ return;
+
+ subscriptions.Remove(subscription);
+ }
+
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
@@ -310,7 +381,7 @@ namespace CryptoExchange.Net.Sockets
return;
}
- shouldCloseConnection = subscriptions.All(r => !r.UserSubscription || r == subscription);
+ shouldCloseConnection = subscriptions.All(r => !r.UserSubscription);
if (shouldCloseConnection)
Status = SocketStatus.Closing;
}
@@ -320,139 +391,8 @@ namespace CryptoExchange.Net.Sockets
log.Write(LogLevel.Trace, $"Socket {SocketId} closing as there are no more subscriptions");
await CloseAsync().ConfigureAwait(false);
}
-
- lock (subscriptionLock)
- subscriptions.Remove(subscription);
}
- //private async Task ReconnectAsync()
- //{
- // // Fail all pending requests
- // lock (pendingRequests)
- // {
- // foreach (var pendingRequest in pendingRequests.ToList())
- // {
- // pendingRequest.Fail();
- // pendingRequests.Remove(pendingRequest);
- // }
- // }
-
- // if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
- // {
- // // Should reconnect
- // DisconnectTime = DateTime.UtcNow;
- // log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect");
- // if (!lostTriggered)
- // {
- // lostTriggered = true;
- // _ = Task.Run(() => ConnectionLost?.Invoke());
- // }
-
- // while (ShouldReconnect)
- // {
- // if (ReconnectTry > 0)
- // {
- // // Wait a bit before attempting reconnect
- // await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false);
- // }
-
- // if (!ShouldReconnect)
- // {
- // // Should reconnect changed to false while waiting to reconnect
- // return;
- // }
-
- // _socket.Reset();
- // if (!await _socket.ConnectAsync().ConfigureAwait(false))
- // {
- // // Reconnect failed
- // ReconnectTry++;
- // ResubscribeTry = 0;
- // if (socketClient.ClientOptions.MaxReconnectTries != null
- // && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries)
- // {
- // log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing");
- // ShouldReconnect = false;
-
- // if (socketClient.socketConnections.ContainsKey(SocketId))
- // socketClient.socketConnections.TryRemove(SocketId, out _);
-
- // _ = Task.Run(() => ConnectionClosed?.Invoke());
- // // Reached max tries, break loop and leave connection closed
- // break;
- // }
-
- // // Continue to try again
- // log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}");
- // continue;
- // }
-
- // // Successfully reconnected, start processing
- // Status = SocketStatus.Connected;
-
- // ReconnectTry = 0;
- // var time = DisconnectTime;
- // DisconnectTime = null;
-
- // log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}");
-
- // var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
- // if (!reconnectResult)
- // {
- // // Failed to resubscribe everything
- // ResubscribeTry++;
- // DisconnectTime = time;
-
- // if (socketClient.ClientOptions.MaxResubscribeTries != null &&
- // ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries)
- // {
- // log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}");
- // ShouldReconnect = false;
-
- // if (socketClient.socketConnections.ContainsKey(SocketId))
- // socketClient.socketConnections.TryRemove(SocketId, out _);
-
- // _ = Task.Run(() => ConnectionClosed?.Invoke());
- // }
- // else
- // log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting.");
-
- // // Failed resubscribe, close socket if it is still open
- // if (_socket.IsOpen)
- // await _socket.CloseAsync().ConfigureAwait(false);
- // else
- // DisconnectTime = DateTime.UtcNow;
-
- // // Break out of the loop, the new processing task should reconnect again
- // break;
- // }
- // else
- // {
- // // Succesfully reconnected
- // log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored.");
- // ResubscribeTry = 0;
- // if (lostTriggered)
- // {
- // lostTriggered = false;
- // _ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0)));
- // }
-
- // break;
- // }
- // }
- // }
- // else
- // {
- // if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
- // _ = Task.Run(() => ConnectionClosed?.Invoke());
-
- // // No reconnecting needed
- // log.Write(LogLevel.Information, $"Socket {SocketId} closed");
- // if (socketClient.socketConnections.ContainsKey(SocketId))
- // socketClient.socketConnections.TryRemove(SocketId, out _);
- // }
- //}
-
///
/// Dispose the connection
///
@@ -462,71 +402,6 @@ namespace CryptoExchange.Net.Sockets
_socket.Dispose();
}
- ///
- /// Process a message received by the socket
- ///
- /// The received data
- private void ProcessMessage(string data)
- {
- var timestamp = DateTime.UtcNow;
- log.Write(LogLevel.Trace, $"Socket {SocketId} received data: " + data);
- if (string.IsNullOrEmpty(data)) return;
-
- var tokenData = data.ToJToken(log);
- if (tokenData == null)
- {
- data = $"\"{data}\"";
- tokenData = data.ToJToken(log);
- if (tokenData == null)
- return;
- }
-
- var handledResponse = false;
- PendingRequest[] requests;
- lock(pendingRequests)
- requests = pendingRequests.ToArray();
-
- // Remove any timed out requests
- foreach (var request in requests.Where(r => r.Completed))
- {
- lock (pendingRequests)
- pendingRequests.Remove(request);
- }
-
- // Check if this message is an answer on any pending requests
- foreach (var pendingRequest in requests)
- {
- if (pendingRequest.CheckData(tokenData))
- {
- lock (pendingRequests)
- pendingRequests.Remove(pendingRequest);
-
- if (!socketClient.ContinueOnQueryResponse)
- return;
-
- handledResponse = true;
- break;
- }
- }
-
- // Message was not a request response, check data handlers
- var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data: null, timestamp);
- var (handled, userProcessTime) = HandleData(messageEvent);
- if (!handled && !handledResponse)
- {
- if (!socketClient.UnhandledMessageExpected)
- log.Write(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + tokenData);
- UnhandledMessage?.Invoke(tokenData);
- }
-
- var total = DateTime.UtcNow - timestamp;
- if (userProcessTime.TotalMilliseconds > 500)
- log.Write(LogLevel.Debug, $"Socket {SocketId} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " +
- "Data from this socket may arrive late or not at all if message processing is continuously slow.");
-
- log.Write(LogLevel.Trace, $"Socket {SocketId} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)");
- }
-
///
/// Add a subscription to this connection
///
@@ -570,7 +445,7 @@ namespace CryptoExchange.Net.Sockets
///
///
/// True if the data was successfully handled
- private (bool, TimeSpan) HandleData(MessageEvent messageEvent)
+ private (bool, TimeSpan, SocketSubscription?) HandleData(MessageEvent messageEvent)
{
SocketSubscription? currentSubscription = null;
try
@@ -611,13 +486,13 @@ namespace CryptoExchange.Net.Sockets
}
}
- return (handled, userCodeDuration);
+ return (handled, userCodeDuration, currentSubscription);
}
catch (Exception ex)
{
log.Write(LogLevel.Error, $"Socket {SocketId} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}");
currentSubscription?.InvokeExceptionHandler(ex);
- return (false, TimeSpan.Zero);
+ return (false, TimeSpan.Zero, null);
}
}
@@ -675,37 +550,23 @@ namespace CryptoExchange.Net.Sockets
}
}
- ///
- /// Handler for a socket opening
- ///
- protected virtual void SocketOnOpen()
- {
- Status = SocketStatus.Connected;
- ReconnectTry = 0;
- PausedActivity = false;
- }
-
- //private async Task ReconnectWatcherAsync()
- //{
- // while (true)
- // {
- // await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false);
- // if (!ShouldReconnect)
- // return;
-
- // Status = SocketStatus.Reconnecting;
- // await ReconnectAsync().ConfigureAwait(false);
-
- // if (!ShouldReconnect)
- // return;
- // }
- //}
-
private async Task> ProcessReconnectAsync()
{
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket not connected"));
+ bool anySubscriptions = false;
+ lock (subscriptionLock)
+ anySubscriptions = subscriptions.Any(s => s.UserSubscription);
+
+ if (!anySubscriptions)
+ {
+ // No need to resubscribe anything
+ log.Write(LogLevel.Debug, $"Socket {SocketId} Nothing to resubscribe, closing connection");
+ _ = _socket.CloseAsync();
+ return new CallResult(true);
+ }
+
if (Authenticated)
{
// If we reconnected a authenticated connection we need to re-authenticate
@@ -777,6 +638,10 @@ namespace CryptoExchange.Net.Sockets
///
Reconnecting,
///
+ /// Resubscribing on reconnected socket
+ ///
+ Resubscribing,
+ ///
/// Closing
///
Closing,
diff --git a/CryptoExchange.Net/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Sockets/WebSocketParameters.cs
new file mode 100644
index 0000000..7fe0330
--- /dev/null
+++ b/CryptoExchange.Net/Sockets/WebSocketParameters.cs
@@ -0,0 +1,79 @@
+using CryptoExchange.Net.Objects;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace CryptoExchange.Net.Sockets
+{
+ ///
+ /// Parameters for a websocket
+ ///
+ public class WebSocketParameters
+ {
+ ///
+ /// The uri to connect to
+ ///
+ public Uri Uri { get; set; }
+ ///
+ /// Headers to send in the connection handshake
+ ///
+ public IDictionary Headers { get; set; } = new Dictionary();
+ ///
+ /// Cookies to send in the connection handshake
+ ///
+ public IDictionary Cookies { get; set; } = new Dictionary();
+ ///
+ /// The time to wait between reconnect attempts
+ ///
+ public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
+ ///
+ /// Proxy for the connection
+ ///
+ public ApiProxy? Proxy { get; set; }
+ ///
+ /// Whether the socket should automatically reconnect when connection is lost
+ ///
+ public bool AutoReconnect { get; set; }
+ ///
+ /// The maximum time of no data received before considering the connection lost and closting/reconnecting the socket
+ ///
+ public TimeSpan? Timeout { get; set; }
+ ///
+ /// Interval at which to send ping frames
+ ///
+ public TimeSpan? KeepAliveInterval { get; set; }
+ ///
+ /// The max amount of messages to send per second
+ ///
+ public int? RatelimitPerSecond { get; set; }
+ ///
+ /// Origin header value to send in the connection handshake
+ ///
+ public string? Origin { get; set; }
+ ///
+ /// Delegate used for processing byte data received from socket connections before it is processed by handlers
+ ///
+ public Func? DataInterpreterBytes { get; set; }
+
+ ///
+ /// Delegate used for processing string data received from socket connections before it is processed by handlers
+ ///
+ public Func? DataInterpreterString { get; set; }
+
+ ///
+ /// Encoding for sending/receiving data
+ ///
+ public Encoding Encoding { get; set; } = Encoding.UTF8;
+
+ ///
+ /// ctor
+ ///
+ /// Uri
+ /// Auto reconnect
+ public WebSocketParameters(Uri uri, bool autoReconnect)
+ {
+ Uri = uri;
+ AutoReconnect = autoReconnect;
+ }
+ }
+}
diff --git a/CryptoExchange.Net/Sockets/WebsocketFactory.cs b/CryptoExchange.Net/Sockets/WebsocketFactory.cs
index 06384cd..cd54111 100644
--- a/CryptoExchange.Net/Sockets/WebsocketFactory.cs
+++ b/CryptoExchange.Net/Sockets/WebsocketFactory.cs
@@ -1,6 +1,4 @@
-using System;
-using System.Collections.Generic;
-using CryptoExchange.Net.Interfaces;
+using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging;
namespace CryptoExchange.Net.Sockets
@@ -11,15 +9,9 @@ namespace CryptoExchange.Net.Sockets
public class WebsocketFactory : IWebsocketFactory
{
///
- public IWebsocket CreateWebsocket(Log log, string url)
+ public IWebsocket CreateWebsocket(Log log, WebSocketParameters parameters)
{
- return new CryptoExchangeWebSocketClient(log, new Uri(url));
- }
-
- ///
- public IWebsocket CreateWebsocket(Log log, string url, IDictionary cookies, IDictionary headers)
- {
- return new CryptoExchangeWebSocketClient(log, new Uri(url), cookies, headers);
+ return new CryptoExchangeWebSocketClient(log, parameters);
}
}
}