mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-06-08 16:36:15 +00:00
wip
This commit is contained in:
parent
91e33cc42c
commit
89b517c936
@ -3,7 +3,6 @@ using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.Linq;
|
||||
using System.Net.WebSockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using CryptoExchange.Net.Authentication;
|
||||
@ -11,7 +10,6 @@ using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.Sockets;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
|
||||
namespace CryptoExchange.Net
|
||||
@ -120,10 +118,7 @@ namespace CryptoExchange.Net
|
||||
/// <param name="options">The options for this client</param>
|
||||
protected BaseSocketClient(string name, BaseSocketClientOptions options) : base(name, options)
|
||||
{
|
||||
if (options == null)
|
||||
throw new ArgumentNullException(nameof(options));
|
||||
|
||||
ClientOptions = options;
|
||||
ClientOptions = options ?? throw new ArgumentNullException(nameof(options));
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
@ -240,6 +235,7 @@ namespace CryptoExchange.Net
|
||||
var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false);
|
||||
if (!subResult)
|
||||
{
|
||||
log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
|
||||
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
return new CallResult<UpdateSubscription>(subResult.Error!);
|
||||
}
|
||||
@ -250,7 +246,6 @@ namespace CryptoExchange.Net
|
||||
subscription.Confirmed = true;
|
||||
}
|
||||
|
||||
socketConnection.ShouldReconnect = true;
|
||||
if (ct != default)
|
||||
{
|
||||
subscription.CancellationTokenRegistration = ct.Register(async () =>
|
||||
@ -260,7 +255,7 @@ namespace CryptoExchange.Net
|
||||
}, false);
|
||||
}
|
||||
|
||||
log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription completed");
|
||||
log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully");
|
||||
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
|
||||
}
|
||||
|
||||
@ -334,8 +329,6 @@ namespace CryptoExchange.Net
|
||||
}
|
||||
finally
|
||||
{
|
||||
//When the task is ready, release the semaphore. It is vital to ALWAYS release the semaphore when we are ready, or else we will end up with a Semaphore that is forever locked.
|
||||
//This is why it is important to do the Release within a try...finally clause; program execution may crash or take a different path, this way you are guaranteed execution
|
||||
if (!released)
|
||||
semaphoreSlim.Release();
|
||||
}
|
||||
@ -585,6 +578,23 @@ namespace CryptoExchange.Net
|
||||
return new CallResult<bool>(new CantConnectError());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get parameters for the websocket connection
|
||||
/// </summary>
|
||||
/// <param name="address">The address to connect to</param>
|
||||
/// <returns></returns>
|
||||
protected virtual WebSocketParameters GetWebSocketParameters(string address)
|
||||
=> new (new Uri(address), ClientOptions.AutoReconnect)
|
||||
{
|
||||
DataInterpreterBytes = dataInterpreterBytes,
|
||||
DataInterpreterString = dataInterpreterString,
|
||||
KeepAliveInterval = KeepAliveInterval,
|
||||
ReconnectInterval = ClientOptions.ReconnectInterval,
|
||||
RatelimitPerSecond = RateLimitPerSocketPerSecond,
|
||||
Proxy = ClientOptions.Proxy,
|
||||
Timeout = ClientOptions.SocketNoDataTimeout
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Create a socket for an address
|
||||
/// </summary>
|
||||
@ -592,24 +602,8 @@ namespace CryptoExchange.Net
|
||||
/// <returns></returns>
|
||||
protected virtual IWebsocket CreateSocket(string address)
|
||||
{
|
||||
var socket = SocketFactory.CreateWebsocket(log, address);
|
||||
var socket = SocketFactory.CreateWebsocket(log, GetWebSocketParameters(address));
|
||||
log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address);
|
||||
|
||||
if (ClientOptions.Proxy != null)
|
||||
socket.SetProxy(ClientOptions.Proxy);
|
||||
|
||||
socket.KeepAliveInterval = KeepAliveInterval;
|
||||
socket.Timeout = ClientOptions.SocketNoDataTimeout;
|
||||
socket.DataInterpreterBytes = dataInterpreterBytes;
|
||||
socket.DataInterpreterString = dataInterpreterString;
|
||||
socket.RatelimitPerSecond = RateLimitPerSocketPerSecond;
|
||||
socket.OnError += e =>
|
||||
{
|
||||
if(e is WebSocketException wse)
|
||||
log.Write(LogLevel.Warning, $"Socket {socket.Id} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
|
||||
else
|
||||
log.Write(LogLevel.Warning, $"Socket {socket.Id} error: " + e.ToLogString());
|
||||
};
|
||||
return socket;
|
||||
}
|
||||
|
||||
@ -667,7 +661,6 @@ namespace CryptoExchange.Net
|
||||
/// <returns></returns>
|
||||
public virtual async Task UnsubscribeAsync(int subscriptionId)
|
||||
{
|
||||
|
||||
SocketSubscription? subscription = null;
|
||||
SocketConnection? connection = null;
|
||||
foreach(var socket in socketConnections.Values.ToList())
|
||||
@ -683,7 +676,7 @@ namespace CryptoExchange.Net
|
||||
if (subscription == null || connection == null)
|
||||
return;
|
||||
|
||||
log.Write(LogLevel.Information, "Closing subscription " + subscriptionId);
|
||||
log.Write(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId);
|
||||
await connection.CloseAsync(subscription).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@ -697,7 +690,7 @@ namespace CryptoExchange.Net
|
||||
if (subscription == null)
|
||||
throw new ArgumentNullException(nameof(subscription));
|
||||
|
||||
log.Write(LogLevel.Information, "Closing subscription " + subscription.Id);
|
||||
log.Write(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id);
|
||||
await subscription.CloseAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
|
@ -41,30 +41,10 @@ namespace CryptoExchange.Net.Interfaces
|
||||
/// </summary>
|
||||
int Id { get; }
|
||||
/// <summary>
|
||||
/// Origin header
|
||||
/// </summary>
|
||||
string? Origin { get; set; }
|
||||
/// <summary>
|
||||
/// Encoding to use for sending/receiving string data
|
||||
/// </summary>
|
||||
Encoding? Encoding { get; set; }
|
||||
/// <summary>
|
||||
/// The max amount of outgoing messages per second
|
||||
/// </summary>
|
||||
int? RatelimitPerSecond { get; set; }
|
||||
/// <summary>
|
||||
/// The current kilobytes per second of data being received, averaged over the last 3 seconds
|
||||
/// </summary>
|
||||
double IncomingKbps { get; }
|
||||
/// <summary>
|
||||
/// Handler for byte data
|
||||
/// </summary>
|
||||
Func<byte[], string>? DataInterpreterBytes { get; set; }
|
||||
/// <summary>
|
||||
/// Handler for string data
|
||||
/// </summary>
|
||||
Func<string, string>? DataInterpreterString { get; set; }
|
||||
/// <summary>
|
||||
/// The uri the socket connects to
|
||||
/// </summary>
|
||||
Uri Uri { get; }
|
||||
@ -77,23 +57,6 @@ namespace CryptoExchange.Net.Interfaces
|
||||
/// </summary>
|
||||
bool IsOpen { get; }
|
||||
/// <summary>
|
||||
/// Supported ssl protocols
|
||||
/// </summary>
|
||||
SslProtocols SSLProtocols { get; set; }
|
||||
/// <summary>
|
||||
/// The max time for no data being received before the connection is considered lost
|
||||
/// </summary>
|
||||
TimeSpan Timeout { get; set; }
|
||||
/// <summary>
|
||||
/// The interval at which to send a ping frame to the server
|
||||
/// </summary>
|
||||
TimeSpan KeepAliveInterval { get; set; }
|
||||
/// <summary>
|
||||
/// Set a proxy to use when connecting
|
||||
/// </summary>
|
||||
/// <param name="proxy"></param>
|
||||
void SetProxy(ApiProxy proxy);
|
||||
/// <summary>
|
||||
/// Connect the socket
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
|
@ -1,5 +1,5 @@
|
||||
using System.Collections.Generic;
|
||||
using CryptoExchange.Net.Logging;
|
||||
using CryptoExchange.Net.Logging;
|
||||
using CryptoExchange.Net.Sockets;
|
||||
|
||||
namespace CryptoExchange.Net.Interfaces
|
||||
{
|
||||
@ -12,17 +12,8 @@ namespace CryptoExchange.Net.Interfaces
|
||||
/// Create a websocket for an url
|
||||
/// </summary>
|
||||
/// <param name="log">The logger</param>
|
||||
/// <param name="url">The url the socket is fo</param>
|
||||
/// <param name="parameters">The parameters to use for the connection</param>
|
||||
/// <returns></returns>
|
||||
IWebsocket CreateWebsocket(Log log, string url);
|
||||
/// <summary>
|
||||
/// Create a websocket for an url
|
||||
/// </summary>
|
||||
/// <param name="log">The logger</param>
|
||||
/// <param name="url">The url the socket is fo</param>
|
||||
/// <param name="cookies">Cookies to be send in the initial request</param>
|
||||
/// <param name="headers">Headers to be send in the initial request</param>
|
||||
/// <returns></returns>
|
||||
IWebsocket CreateWebsocket(Log log, string url, IDictionary<string, string> cookies, IDictionary<string, string> headers);
|
||||
IWebsocket CreateWebsocket(Log log, WebSocketParameters parameters);
|
||||
}
|
||||
}
|
||||
|
@ -176,16 +176,6 @@ namespace CryptoExchange.Net.Objects
|
||||
/// </summary>
|
||||
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
|
||||
|
||||
/// <summary>
|
||||
/// The maximum number of times to try to reconnect, default null will retry indefinitely
|
||||
/// </summary>
|
||||
public int? MaxReconnectTries { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The maximum number of times to try to resubscribe after reconnecting
|
||||
/// </summary>
|
||||
public int? MaxResubscribeTries { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Max number of concurrent resubscription tasks per socket after reconnecting a socket
|
||||
/// </summary>
|
||||
@ -232,8 +222,6 @@ namespace CryptoExchange.Net.Objects
|
||||
|
||||
AutoReconnect = baseOptions.AutoReconnect;
|
||||
ReconnectInterval = baseOptions.ReconnectInterval;
|
||||
MaxReconnectTries = baseOptions.MaxReconnectTries;
|
||||
MaxResubscribeTries = baseOptions.MaxResubscribeTries;
|
||||
MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket;
|
||||
SocketResponseTimeout = baseOptions.SocketResponseTimeout;
|
||||
SocketNoDataTimeout = baseOptions.SocketNoDataTimeout;
|
||||
@ -244,7 +232,7 @@ namespace CryptoExchange.Net.Objects
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxReconnectTries: {MaxReconnectTries}, MaxResubscribeTries: {MaxResubscribeTries}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}";
|
||||
return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,13 +5,10 @@ using Microsoft.Extensions.Logging;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.WebSockets;
|
||||
using System.Security.Authentication;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
@ -22,7 +19,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// </summary>
|
||||
public class CryptoExchangeWebSocketClient : IWebsocket
|
||||
{
|
||||
// TODO keep the same ID's for subscriptions/sockets when reconnecting
|
||||
enum ProcessState
|
||||
{
|
||||
Idle,
|
||||
@ -41,23 +37,21 @@ namespace CryptoExchange.Net.Sockets
|
||||
internal static int lastStreamId;
|
||||
private static readonly object streamIdLock = new();
|
||||
|
||||
private ClientWebSocket _socket;
|
||||
private readonly AsyncResetEvent _sendEvent;
|
||||
private readonly ConcurrentQueue<byte[]> _sendBuffer;
|
||||
private readonly IDictionary<string, string> cookies;
|
||||
private readonly IDictionary<string, string> headers;
|
||||
private CancellationTokenSource _ctsSource;
|
||||
private ApiProxy? _proxy;
|
||||
|
||||
private readonly SemaphoreSlim _closeSem;
|
||||
private readonly WebSocketParameters _parameters;
|
||||
private readonly List<DateTime> _outgoingMessages;
|
||||
|
||||
private ClientWebSocket _socket;
|
||||
private CancellationTokenSource _ctsSource;
|
||||
private DateTime _lastReceivedMessagesUpdate;
|
||||
private Task? _processTask;
|
||||
private Task? _closeTask;
|
||||
private bool _stopRequested;
|
||||
private bool _disposed;
|
||||
private ProcessState _processState;
|
||||
//private CloseState _closeState;
|
||||
private SemaphoreSlim _closeSem;
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Received messages, the size and the timstamp
|
||||
@ -72,31 +66,18 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <summary>
|
||||
/// Log
|
||||
/// </summary>
|
||||
protected Log log;
|
||||
protected Log _log;
|
||||
|
||||
/// <inheritdoc />
|
||||
public int Id { get; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public string? Origin { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The timestamp this socket has been active for the last time
|
||||
/// </summary>
|
||||
public DateTime LastActionTime { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Delegate used for processing byte data received from socket connections before it is processed by handlers
|
||||
/// </summary>
|
||||
public Func<byte[], string>? DataInterpreterBytes { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Delegate used for processing string data received from socket connections before it is processed by handlers
|
||||
/// </summary>
|
||||
public Func<string, string>? DataInterpreterString { get; set; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public Uri Uri { get; }
|
||||
public Uri Uri => _parameters.Uri;
|
||||
|
||||
/// <inheritdoc />
|
||||
public bool IsClosed => _socket.State == WebSocketState.Closed;
|
||||
@ -104,34 +85,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <inheritdoc />
|
||||
public bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested;
|
||||
|
||||
/// <summary>
|
||||
/// Ssl protocols supported. NOT USED BY THIS IMPLEMENTATION
|
||||
/// </summary>
|
||||
public SslProtocols SSLProtocols { get; set; }
|
||||
|
||||
private Encoding _encoding = Encoding.UTF8;
|
||||
/// <inheritdoc />
|
||||
public Encoding? Encoding
|
||||
{
|
||||
get => _encoding;
|
||||
set
|
||||
{
|
||||
if(value != null)
|
||||
_encoding = value;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The max amount of outgoing messages per second
|
||||
/// </summary>
|
||||
public int? RatelimitPerSecond { get; set; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public TimeSpan Timeout { get; set; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public TimeSpan KeepAliveInterval { get; set; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public double IncomingKbps
|
||||
{
|
||||
@ -166,26 +119,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="log">The log object to use</param>
|
||||
/// <param name="uri">The uri the socket should connect to</param>
|
||||
public CryptoExchangeWebSocketClient(Log log, Uri uri) : this(log, uri, new Dictionary<string, string>(), new Dictionary<string, string>())
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="log">The log object to use</param>
|
||||
/// <param name="uri">The uri the socket should connect to</param>
|
||||
/// <param name="cookies">Cookies to sent in the socket connection request</param>
|
||||
/// <param name="headers">Headers to sent in the socket connection request</param>
|
||||
public CryptoExchangeWebSocketClient(Log log, Uri uri, IDictionary<string, string> cookies, IDictionary<string, string> headers)
|
||||
/// <param name="websocketParameters">The parameters for this socket</param>
|
||||
public CryptoExchangeWebSocketClient(Log log, WebSocketParameters websocketParameters)
|
||||
{
|
||||
Id = NextStreamId();
|
||||
this.log = log;
|
||||
Uri = uri;
|
||||
this.cookies = cookies;
|
||||
this.headers = headers;
|
||||
_log = log;
|
||||
|
||||
_parameters = websocketParameters;
|
||||
_outgoingMessages = new List<DateTime>();
|
||||
_receivedMessages = new List<ReceiveItem>();
|
||||
_sendEvent = new AsyncResetEvent();
|
||||
@ -197,25 +137,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
_socket = CreateSocket();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public virtual void SetProxy(ApiProxy proxy)
|
||||
{
|
||||
_proxy = proxy;
|
||||
|
||||
if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri))
|
||||
throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy));
|
||||
|
||||
_socket.Options.Proxy = uri?.Scheme == null
|
||||
? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port)
|
||||
: _socket.Options.Proxy = new WebProxy
|
||||
{
|
||||
Address = uri
|
||||
};
|
||||
|
||||
if (proxy.Login != null)
|
||||
_socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public virtual async Task<bool> ConnectAsync()
|
||||
{
|
||||
@ -227,9 +148,29 @@ namespace CryptoExchange.Net.Sockets
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create the socket object
|
||||
/// </summary>
|
||||
private ClientWebSocket CreateSocket()
|
||||
{
|
||||
var cookieContainer = new CookieContainer();
|
||||
foreach (var cookie in _parameters.Cookies)
|
||||
cookieContainer.Add(new Cookie(cookie.Key, cookie.Value));
|
||||
|
||||
var socket = new ClientWebSocket();
|
||||
socket.Options.Cookies = cookieContainer;
|
||||
foreach (var header in _parameters.Headers)
|
||||
socket.Options.SetRequestHeader(header.Key, header.Value);
|
||||
socket.Options.KeepAliveInterval = _parameters.KeepAliveInterval ?? TimeSpan.Zero;
|
||||
socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
|
||||
if (_parameters.Proxy != null)
|
||||
SetProxy(_parameters.Proxy);
|
||||
return socket;
|
||||
}
|
||||
|
||||
private async Task<bool> ConnectInternalAsync()
|
||||
{
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} connecting");
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} connecting");
|
||||
try
|
||||
{
|
||||
using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
|
||||
@ -237,11 +178,11 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString());
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString());
|
||||
return false;
|
||||
}
|
||||
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}");
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}");
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -250,13 +191,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
while (!_stopRequested)
|
||||
{
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started");
|
||||
_processState = ProcessState.Processing;
|
||||
var sendTask = SendLoopAsync();
|
||||
var receiveTask = ReceiveLoopAsync();
|
||||
var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask;
|
||||
var timeoutTask = _parameters.Timeout != null && _parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
|
||||
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished");
|
||||
|
||||
_processState = ProcessState.WaitingForClose;
|
||||
while (_closeTask == null)
|
||||
@ -264,7 +205,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
await _closeTask.ConfigureAwait(false);
|
||||
_closeTask = null;
|
||||
//_closeState = CloseState.Idle;
|
||||
|
||||
if (!_parameters.AutoReconnect)
|
||||
{
|
||||
_processState = ProcessState.Idle;
|
||||
OnClose?.Invoke();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!_stopRequested)
|
||||
{
|
||||
@ -274,7 +221,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
while (!_stopRequested)
|
||||
{
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} attempting to reconnect");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} attempting to reconnect");
|
||||
_socket = CreateSocket();
|
||||
_ctsSource.Dispose();
|
||||
_ctsSource = new CancellationTokenSource();
|
||||
@ -283,7 +230,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
var connected = await ConnectInternalAsync().ConfigureAwait(false);
|
||||
if (!connected)
|
||||
{
|
||||
await Task.Delay(5000).ConfigureAwait(false);
|
||||
await Task.Delay(_parameters.ReconnectInterval).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -299,10 +246,10 @@ namespace CryptoExchange.Net.Sockets
|
||||
public virtual void Send(string data)
|
||||
{
|
||||
if (_ctsSource.IsCancellationRequested)
|
||||
throw new InvalidOperationException($"Socket {Id} Can't send data when socket is not connected");
|
||||
return;
|
||||
|
||||
var bytes = _encoding.GetBytes(data);
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer");
|
||||
var bytes = _parameters.Encoding.GetBytes(data);
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer");
|
||||
_sendBuffer.Enqueue(bytes);
|
||||
_sendEvent.Set();
|
||||
}
|
||||
@ -313,7 +260,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (_processState != ProcessState.Processing)
|
||||
return;
|
||||
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} reconnecting");
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} reconnecting");
|
||||
_closeTask = CloseInternalAsync();
|
||||
await _closeTask.ConfigureAwait(false);
|
||||
}
|
||||
@ -326,20 +273,20 @@ namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
if (_closeTask != null && !_closeTask.IsCompleted)
|
||||
{
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task");
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task");
|
||||
await _closeTask.ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
_stopRequested = true;
|
||||
|
||||
if (!IsOpen)
|
||||
{
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open");
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open");
|
||||
return;
|
||||
}
|
||||
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} closing");
|
||||
_stopRequested = true;
|
||||
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} closing");
|
||||
_closeTask = CloseInternalAsync();
|
||||
}
|
||||
finally
|
||||
@ -348,9 +295,10 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
await _closeTask.ConfigureAwait(false);
|
||||
await _processTask!.ConfigureAwait(false);
|
||||
if(_processTask != null)
|
||||
await _processTask.ConfigureAwait(false);
|
||||
OnClose?.Invoke();
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} closed");
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} closed");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -370,19 +318,29 @@ namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
try
|
||||
{
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} normal closure 1");
|
||||
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
|
||||
}
|
||||
catch(Exception)
|
||||
{ } // Can sometimes throw an exception when socket is in aborted state due to timing
|
||||
catch (Exception)
|
||||
{
|
||||
// Can sometimes throw an exception when socket is in aborted state due to timing
|
||||
// Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
|
||||
// So socket might go to aborted state, might still be open
|
||||
}
|
||||
}
|
||||
else if(_socket.State == WebSocketState.CloseReceived)
|
||||
{
|
||||
try
|
||||
{
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} normal closure 2");
|
||||
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception)
|
||||
{ } // Can sometimes throw an exception when socket is in aborted state due to timing
|
||||
{
|
||||
// Can sometimes throw an exception when socket is in aborted state due to timing
|
||||
// Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
|
||||
// So socket might go to aborted state, might still be open
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -394,31 +352,11 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (_disposed)
|
||||
return;
|
||||
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} disposing");
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} disposing");
|
||||
_disposed = true;
|
||||
_socket.Dispose();
|
||||
_ctsSource.Dispose();
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} disposed");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create the socket object
|
||||
/// </summary>
|
||||
private ClientWebSocket CreateSocket()
|
||||
{
|
||||
var cookieContainer = new CookieContainer();
|
||||
foreach (var cookie in cookies)
|
||||
cookieContainer.Add(new Cookie(cookie.Key, cookie.Value));
|
||||
|
||||
var socket = new ClientWebSocket();
|
||||
socket.Options.Cookies = cookieContainer;
|
||||
foreach (var header in headers)
|
||||
socket.Options.SetRequestHeader(header.Key, header.Value);
|
||||
socket.Options.KeepAliveInterval = KeepAliveInterval;
|
||||
socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
|
||||
if (_proxy != null)
|
||||
SetProxy(_proxy);
|
||||
return socket;
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} disposed");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -441,25 +379,25 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
while (_sendBuffer.TryDequeue(out var data))
|
||||
{
|
||||
if (RatelimitPerSecond != null)
|
||||
if (_parameters.RatelimitPerSecond != null)
|
||||
{
|
||||
// Wait for rate limit
|
||||
DateTime? start = null;
|
||||
while (MessagesSentLastSecond() >= RatelimitPerSecond)
|
||||
while (MessagesSentLastSecond() >= _parameters.RatelimitPerSecond)
|
||||
{
|
||||
start ??= DateTime.UtcNow;
|
||||
await Task.Delay(50).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (start != null)
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await _socket.SendAsync(new ArraySegment<byte>(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
|
||||
_outgoingMessages.Add(DateTime.UtcNow);
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes");
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
@ -486,7 +424,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
finally
|
||||
{
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} Send loop finished");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} Send loop finished");
|
||||
}
|
||||
}
|
||||
|
||||
@ -533,7 +471,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (receiveResult.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
// Connection closed unexpectedly
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
|
||||
_closeTask = CloseInternalAsync();
|
||||
break;
|
||||
}
|
||||
@ -543,7 +481,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
// We received data, but it is not complete, write it to a memory stream for reassembling
|
||||
multiPartMessage = true;
|
||||
memoryStream ??= new MemoryStream();
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
|
||||
await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
@ -551,13 +489,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (!multiPartMessage)
|
||||
{
|
||||
// Received a complete message and it's not multi part
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
|
||||
HandleMessage(buffer.Array!, buffer.Offset, receiveResult.Count, receiveResult.MessageType);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Received the end of a multipart message, write to memory stream for reassembling
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
|
||||
await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
|
||||
}
|
||||
break;
|
||||
@ -585,12 +523,12 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (receiveResult?.EndOfMessage == true)
|
||||
{
|
||||
// Reassemble complete message from memory stream
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
|
||||
HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType);
|
||||
memoryStream.Dispose();
|
||||
}
|
||||
else
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -604,7 +542,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
finally
|
||||
{
|
||||
log.Write(LogLevel.Trace, $"Socket {Id} Receive loop finished");
|
||||
_log.Write(LogLevel.Trace, $"Socket {Id} Receive loop finished");
|
||||
}
|
||||
}
|
||||
|
||||
@ -620,54 +558,92 @@ namespace CryptoExchange.Net.Sockets
|
||||
string strData;
|
||||
if (messageType == WebSocketMessageType.Binary)
|
||||
{
|
||||
if (DataInterpreterBytes == null)
|
||||
if (_parameters.DataInterpreterBytes == null)
|
||||
throw new Exception("Byte interpreter not set while receiving byte data");
|
||||
|
||||
try
|
||||
{
|
||||
var relevantData = new byte[count];
|
||||
Array.Copy(data, offset, relevantData, 0, count);
|
||||
strData = DataInterpreterBytes(relevantData);
|
||||
strData = _parameters.DataInterpreterBytes(relevantData);
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString());
|
||||
_log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString());
|
||||
return;
|
||||
}
|
||||
}
|
||||
else
|
||||
strData = _encoding.GetString(data, offset, count);
|
||||
strData = _parameters.Encoding.GetString(data, offset, count);
|
||||
|
||||
if (DataInterpreterString != null)
|
||||
if (_parameters.DataInterpreterString != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
strData = DataInterpreterString(strData);
|
||||
strData = _parameters.DataInterpreterString(strData);
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString());
|
||||
_log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
LastActionTime = DateTime.UtcNow;
|
||||
OnMessage?.Invoke(strData);
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString());
|
||||
_log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString());
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Trigger the OnMessage event
|
||||
/// </summary>
|
||||
/// <param name="data"></param>
|
||||
protected void TriggerOnMessage(string data)
|
||||
{
|
||||
LastActionTime = DateTime.UtcNow;
|
||||
OnMessage?.Invoke(data);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Trigger the OnError event
|
||||
/// </summary>
|
||||
/// <param name="ex"></param>
|
||||
protected void TriggerOnError(Exception ex) => OnError?.Invoke(ex);
|
||||
|
||||
/// <summary>
|
||||
/// Trigger the OnError event
|
||||
/// </summary>
|
||||
protected void TriggerOnOpen() => OnOpen?.Invoke();
|
||||
|
||||
/// <summary>
|
||||
/// Trigger the OnError event
|
||||
/// </summary>
|
||||
protected void TriggerOnClose() => OnClose?.Invoke();
|
||||
|
||||
/// <summary>
|
||||
/// Trigger the OnReconnecting event
|
||||
/// </summary>
|
||||
protected void TriggerOnReconnecting() => OnReconnecting?.Invoke();
|
||||
|
||||
/// <summary>
|
||||
/// Trigger the OnReconnected event
|
||||
/// </summary>
|
||||
protected void TriggerOnReconnected() => OnReconnected?.Invoke();
|
||||
|
||||
/// <summary>
|
||||
/// Checks if there is no data received for a period longer than the specified timeout
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
protected async Task CheckTimeoutAsync()
|
||||
{
|
||||
log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Timeout}");
|
||||
_log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {_parameters.Timeout}");
|
||||
LastActionTime = DateTime.UtcNow;
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
@ -675,9 +651,9 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (_ctsSource.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
if (DateTime.UtcNow - LastActionTime > Timeout)
|
||||
if (DateTime.UtcNow - LastActionTime > _parameters.Timeout)
|
||||
{
|
||||
log.Write(LogLevel.Warning, $"Socket {Id} No data received for {Timeout}, reconnecting socket");
|
||||
_log.Write(LogLevel.Warning, $"Socket {Id} No data received for {_parameters.Timeout}, reconnecting socket");
|
||||
_ = CloseAsync().ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
@ -737,6 +713,27 @@ namespace CryptoExchange.Net.Sockets
|
||||
_lastReceivedMessagesUpdate = checkTime;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Set proxy on socket
|
||||
/// </summary>
|
||||
/// <param name="proxy"></param>
|
||||
/// <exception cref="ArgumentException"></exception>
|
||||
protected virtual void SetProxy(ApiProxy proxy)
|
||||
{
|
||||
if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri))
|
||||
throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy));
|
||||
|
||||
_socket.Options.Proxy = uri?.Scheme == null
|
||||
? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port)
|
||||
: _socket.Options.Proxy = new WebProxy
|
||||
{
|
||||
Address = uri
|
||||
};
|
||||
|
||||
if (proxy.Login != null)
|
||||
_socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -3,13 +3,13 @@ using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using CryptoExchange.Net.Logging;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using CryptoExchange.Net.Objects;
|
||||
using System.Net.WebSockets;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
@ -87,21 +87,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// </summary>
|
||||
public SocketApiClient ApiClient { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// If the socket should be reconnected upon closing
|
||||
/// </summary>
|
||||
public bool ShouldReconnect { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Current reconnect try, reset when a successful connection is made
|
||||
/// </summary>
|
||||
public int ReconnectTry { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Current resubscribe try, reset when a successful connection is made
|
||||
/// </summary>
|
||||
public int ResubscribeTry { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Time of disconnecting
|
||||
/// </summary>
|
||||
@ -124,23 +109,12 @@ namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
pausedActivity = value;
|
||||
log.Write(LogLevel.Information, $"Socket {SocketId} Paused activity: " + value);
|
||||
if(pausedActivity) ActivityPaused?.Invoke();
|
||||
else ActivityUnpaused?.Invoke();
|
||||
if(pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke());
|
||||
else _ = Task.Run(() => ActivityUnpaused?.Invoke());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private bool pausedActivity;
|
||||
private readonly List<SocketSubscription> subscriptions;
|
||||
private readonly object subscriptionLock = new();
|
||||
|
||||
private readonly Log log;
|
||||
private readonly BaseSocketClient socketClient;
|
||||
|
||||
private readonly List<PendingRequest> pendingRequests;
|
||||
|
||||
private SocketStatus _status;
|
||||
|
||||
/// <summary>
|
||||
/// Status of the socket connection
|
||||
/// </summary>
|
||||
@ -158,6 +132,17 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
}
|
||||
|
||||
private bool pausedActivity;
|
||||
private readonly List<SocketSubscription> subscriptions;
|
||||
private readonly object subscriptionLock = new();
|
||||
|
||||
private readonly Log log;
|
||||
private readonly BaseSocketClient socketClient;
|
||||
|
||||
private readonly List<PendingRequest> pendingRequests;
|
||||
|
||||
private SocketStatus _status;
|
||||
|
||||
/// <summary>
|
||||
/// The underlying websocket
|
||||
/// </summary>
|
||||
@ -176,36 +161,51 @@ namespace CryptoExchange.Net.Sockets
|
||||
ApiClient = apiClient;
|
||||
|
||||
pendingRequests = new List<PendingRequest>();
|
||||
|
||||
subscriptions = new List<SocketSubscription>();
|
||||
|
||||
_socket = socket;
|
||||
|
||||
|
||||
_socket.Timeout = client.ClientOptions.SocketNoDataTimeout;
|
||||
_socket.OnMessage += ProcessMessage;
|
||||
_socket.OnOpen += SocketOnOpen;
|
||||
_socket.OnMessage += HandleMessage;
|
||||
_socket.OnOpen += HandleOpen;
|
||||
_socket.OnClose += HandleClose;
|
||||
_socket.OnReconnecting += HandleReconnecting;
|
||||
_socket.OnReconnected += HandleReconnected;
|
||||
_socket.OnError += HandleError;
|
||||
}
|
||||
|
||||
private void HandleClose()
|
||||
/// <summary>
|
||||
/// Handler for a socket opening
|
||||
/// </summary>
|
||||
protected virtual void HandleOpen()
|
||||
{
|
||||
Status = SocketStatus.Connected;
|
||||
PausedActivity = false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handler for a socket closing without reconnect
|
||||
/// </summary>
|
||||
protected virtual void HandleClose()
|
||||
{
|
||||
Status = SocketStatus.Closed;
|
||||
ConnectionClosed?.Invoke();
|
||||
Task.Run(() => ConnectionClosed?.Invoke());
|
||||
}
|
||||
|
||||
private void HandleReconnecting()
|
||||
/// <summary>
|
||||
/// Handler for a socket losing conenction and starting reconnect
|
||||
/// </summary>
|
||||
protected virtual void HandleReconnecting()
|
||||
{
|
||||
Status = SocketStatus.Reconnecting;
|
||||
DisconnectTime = DateTime.UtcNow;
|
||||
Task.Run(() => ConnectionLost?.Invoke());
|
||||
}
|
||||
|
||||
private async void HandleReconnected()
|
||||
/// <summary>
|
||||
/// Handler for a socket which has reconnected
|
||||
/// </summary>
|
||||
protected virtual async void HandleReconnected()
|
||||
{
|
||||
log.Write(LogLevel.Debug, "Socket reconnected, processing");
|
||||
|
||||
Status = SocketStatus.Resubscribing;
|
||||
lock (pendingRequests)
|
||||
{
|
||||
foreach (var pendingRequest in pendingRequests.ToList())
|
||||
@ -215,258 +215,37 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
}
|
||||
|
||||
// TODO Track amount of failed reconencts and failed resubscriptions
|
||||
|
||||
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
|
||||
if (!reconnectSuccessful)
|
||||
await _socket.ReconnectAsync().ConfigureAwait(false);
|
||||
else
|
||||
{
|
||||
Status = SocketStatus.Connected;
|
||||
ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect the websocket and start processing
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task<bool> ConnectAsync()
|
||||
{
|
||||
return await _socket.ConnectAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Retrieve the underlying socket
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public IWebsocket GetSocket()
|
||||
{
|
||||
return _socket;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Trigger a reconnect of the socket connection
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task TriggerReconnectAsync()
|
||||
{
|
||||
await _socket.ReconnectAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Close the connection
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task CloseAsync()
|
||||
{
|
||||
if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
||||
return;
|
||||
|
||||
ShouldReconnect = false;
|
||||
if (socketClient.socketConnections.ContainsKey(SocketId))
|
||||
socketClient.socketConnections.TryRemove(SocketId, out _);
|
||||
|
||||
lock (subscriptionLock)
|
||||
{
|
||||
foreach (var subscription in subscriptions)
|
||||
_ = Task.Run(() =>
|
||||
{
|
||||
if (subscription.CancellationTokenRegistration.HasValue)
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
}
|
||||
ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
|
||||
DisconnectTime = null;
|
||||
});
|
||||
}
|
||||
|
||||
while (Status == SocketStatus.Reconnecting)
|
||||
// Wait for reconnecting to finish
|
||||
await Task.Delay(100).ConfigureAwait(false);
|
||||
|
||||
await _socket.CloseAsync().ConfigureAwait(false);
|
||||
_socket.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
|
||||
/// Handler for an error on a websocket
|
||||
/// </summary>
|
||||
/// <param name="subscription">Subscription to close</param>
|
||||
/// <returns></returns>
|
||||
public async Task CloseAsync(SocketSubscription subscription)
|
||||
/// <param name="e">The exception</param>
|
||||
protected virtual void HandleError(Exception e)
|
||||
{
|
||||
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
||||
return;
|
||||
|
||||
log.Write(LogLevel.Trace, $"Socket {SocketId} closing subscription {subscription.Id}");
|
||||
if (subscription.CancellationTokenRegistration.HasValue)
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
|
||||
if (subscription.Confirmed && _socket.IsOpen)
|
||||
await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
|
||||
|
||||
bool shouldCloseConnection;
|
||||
lock (subscriptionLock)
|
||||
{
|
||||
if (Status == SocketStatus.Closing)
|
||||
{
|
||||
log.Write(LogLevel.Trace, $"Socket {SocketId} already closing");
|
||||
return;
|
||||
}
|
||||
|
||||
shouldCloseConnection = subscriptions.All(r => !r.UserSubscription || r == subscription);
|
||||
if (shouldCloseConnection)
|
||||
Status = SocketStatus.Closing;
|
||||
}
|
||||
|
||||
if (shouldCloseConnection)
|
||||
{
|
||||
log.Write(LogLevel.Trace, $"Socket {SocketId} closing as there are no more subscriptions");
|
||||
await CloseAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
lock (subscriptionLock)
|
||||
subscriptions.Remove(subscription);
|
||||
}
|
||||
|
||||
//private async Task ReconnectAsync()
|
||||
//{
|
||||
// // Fail all pending requests
|
||||
// lock (pendingRequests)
|
||||
// {
|
||||
// foreach (var pendingRequest in pendingRequests.ToList())
|
||||
// {
|
||||
// pendingRequest.Fail();
|
||||
// pendingRequests.Remove(pendingRequest);
|
||||
// }
|
||||
// }
|
||||
|
||||
// if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
|
||||
// {
|
||||
// // Should reconnect
|
||||
// DisconnectTime = DateTime.UtcNow;
|
||||
// log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect");
|
||||
// if (!lostTriggered)
|
||||
// {
|
||||
// lostTriggered = true;
|
||||
// _ = Task.Run(() => ConnectionLost?.Invoke());
|
||||
// }
|
||||
|
||||
// while (ShouldReconnect)
|
||||
// {
|
||||
// if (ReconnectTry > 0)
|
||||
// {
|
||||
// // Wait a bit before attempting reconnect
|
||||
// await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false);
|
||||
// }
|
||||
|
||||
// if (!ShouldReconnect)
|
||||
// {
|
||||
// // Should reconnect changed to false while waiting to reconnect
|
||||
// return;
|
||||
// }
|
||||
|
||||
// _socket.Reset();
|
||||
// if (!await _socket.ConnectAsync().ConfigureAwait(false))
|
||||
// {
|
||||
// // Reconnect failed
|
||||
// ReconnectTry++;
|
||||
// ResubscribeTry = 0;
|
||||
// if (socketClient.ClientOptions.MaxReconnectTries != null
|
||||
// && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries)
|
||||
// {
|
||||
// log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing");
|
||||
// ShouldReconnect = false;
|
||||
|
||||
// if (socketClient.socketConnections.ContainsKey(SocketId))
|
||||
// socketClient.socketConnections.TryRemove(SocketId, out _);
|
||||
|
||||
// _ = Task.Run(() => ConnectionClosed?.Invoke());
|
||||
// // Reached max tries, break loop and leave connection closed
|
||||
// break;
|
||||
// }
|
||||
|
||||
// // Continue to try again
|
||||
// log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}");
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// // Successfully reconnected, start processing
|
||||
// Status = SocketStatus.Connected;
|
||||
|
||||
// ReconnectTry = 0;
|
||||
// var time = DisconnectTime;
|
||||
// DisconnectTime = null;
|
||||
|
||||
// log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}");
|
||||
|
||||
// var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
|
||||
// if (!reconnectResult)
|
||||
// {
|
||||
// // Failed to resubscribe everything
|
||||
// ResubscribeTry++;
|
||||
// DisconnectTime = time;
|
||||
|
||||
// if (socketClient.ClientOptions.MaxResubscribeTries != null &&
|
||||
// ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries)
|
||||
// {
|
||||
// log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}");
|
||||
// ShouldReconnect = false;
|
||||
|
||||
// if (socketClient.socketConnections.ContainsKey(SocketId))
|
||||
// socketClient.socketConnections.TryRemove(SocketId, out _);
|
||||
|
||||
// _ = Task.Run(() => ConnectionClosed?.Invoke());
|
||||
// }
|
||||
// else
|
||||
// log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting.");
|
||||
|
||||
// // Failed resubscribe, close socket if it is still open
|
||||
// if (_socket.IsOpen)
|
||||
// await _socket.CloseAsync().ConfigureAwait(false);
|
||||
// else
|
||||
// DisconnectTime = DateTime.UtcNow;
|
||||
|
||||
// // Break out of the loop, the new processing task should reconnect again
|
||||
// break;
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// // Succesfully reconnected
|
||||
// log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored.");
|
||||
// ResubscribeTry = 0;
|
||||
// if (lostTriggered)
|
||||
// {
|
||||
// lostTriggered = false;
|
||||
// _ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0)));
|
||||
// }
|
||||
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
|
||||
// _ = Task.Run(() => ConnectionClosed?.Invoke());
|
||||
|
||||
// // No reconnecting needed
|
||||
// log.Write(LogLevel.Information, $"Socket {SocketId} closed");
|
||||
// if (socketClient.socketConnections.ContainsKey(SocketId))
|
||||
// socketClient.socketConnections.TryRemove(SocketId, out _);
|
||||
// }
|
||||
//}
|
||||
|
||||
/// <summary>
|
||||
/// Dispose the connection
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
Status = SocketStatus.Disposed;
|
||||
_socket.Dispose();
|
||||
if (e is WebSocketException wse)
|
||||
log.Write(LogLevel.Warning, $"Socket {SocketId} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
|
||||
else
|
||||
log.Write(LogLevel.Warning, $"Socket {SocketId} error: " + e.ToLogString());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Process a message received by the socket
|
||||
/// </summary>
|
||||
/// <param name="data">The received data</param>
|
||||
private void ProcessMessage(string data)
|
||||
protected virtual void HandleMessage(string data)
|
||||
{
|
||||
var timestamp = DateTime.UtcNow;
|
||||
log.Write(LogLevel.Trace, $"Socket {SocketId} received data: " + data);
|
||||
@ -482,15 +261,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
var handledResponse = false;
|
||||
PendingRequest[] requests;
|
||||
lock(pendingRequests)
|
||||
requests = pendingRequests.ToArray();
|
||||
|
||||
// Remove any timed out requests
|
||||
foreach (var request in requests.Where(r => r.Completed))
|
||||
PendingRequest[] requests;
|
||||
lock (pendingRequests)
|
||||
{
|
||||
lock (pendingRequests)
|
||||
pendingRequests.Remove(request);
|
||||
pendingRequests.RemoveAll(r => r.Completed);
|
||||
requests = pendingRequests.ToArray();
|
||||
}
|
||||
|
||||
// Check if this message is an answer on any pending requests
|
||||
@ -510,8 +287,8 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
// Message was not a request response, check data handlers
|
||||
var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data: null, timestamp);
|
||||
var (handled, userProcessTime) = HandleData(messageEvent);
|
||||
var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data : null, timestamp);
|
||||
var (handled, userProcessTime, subscription) = HandleData(messageEvent);
|
||||
if (!handled && !handledResponse)
|
||||
{
|
||||
if (!socketClient.UnhandledMessageExpected)
|
||||
@ -521,10 +298,108 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
var total = DateTime.UtcNow - timestamp;
|
||||
if (userProcessTime.TotalMilliseconds > 500)
|
||||
log.Write(LogLevel.Debug, $"Socket {SocketId} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " +
|
||||
log.Write(LogLevel.Debug, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " +
|
||||
"Data from this socket may arrive late or not at all if message processing is continuously slow.");
|
||||
|
||||
log.Write(LogLevel.Trace, $"Socket {SocketId} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)");
|
||||
log.Write(LogLevel.Trace, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect the websocket
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task<bool> ConnectAsync() => await _socket.ConnectAsync().ConfigureAwait(false);
|
||||
|
||||
/// <summary>
|
||||
/// Retrieve the underlying socket
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public IWebsocket GetSocket() => _socket;
|
||||
|
||||
/// <summary>
|
||||
/// Trigger a reconnect of the socket connection
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task TriggerReconnectAsync() => await _socket.ReconnectAsync().ConfigureAwait(false);
|
||||
|
||||
/// <summary>
|
||||
/// Close the connection
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task CloseAsync()
|
||||
{
|
||||
if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
||||
return;
|
||||
|
||||
if (socketClient.socketConnections.ContainsKey(SocketId))
|
||||
socketClient.socketConnections.TryRemove(SocketId, out _);
|
||||
|
||||
lock (subscriptionLock)
|
||||
{
|
||||
foreach (var subscription in subscriptions)
|
||||
{
|
||||
if (subscription.CancellationTokenRegistration.HasValue)
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
await _socket.CloseAsync().ConfigureAwait(false);
|
||||
_socket.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
|
||||
/// </summary>
|
||||
/// <param name="subscription">Subscription to close</param>
|
||||
/// <returns></returns>
|
||||
public async Task CloseAsync(SocketSubscription subscription)
|
||||
{
|
||||
lock (subscriptionLock)
|
||||
{
|
||||
if (!subscriptions.Contains(subscription))
|
||||
return;
|
||||
|
||||
subscriptions.Remove(subscription);
|
||||
}
|
||||
|
||||
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
||||
return;
|
||||
|
||||
log.Write(LogLevel.Trace, $"Socket {SocketId} closing subscription {subscription.Id}");
|
||||
if (subscription.CancellationTokenRegistration.HasValue)
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
|
||||
if (subscription.Confirmed && _socket.IsOpen)
|
||||
await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
|
||||
|
||||
bool shouldCloseConnection;
|
||||
lock (subscriptionLock)
|
||||
{
|
||||
if (Status == SocketStatus.Closing)
|
||||
{
|
||||
log.Write(LogLevel.Trace, $"Socket {SocketId} already closing");
|
||||
return;
|
||||
}
|
||||
|
||||
shouldCloseConnection = subscriptions.All(r => !r.UserSubscription);
|
||||
if (shouldCloseConnection)
|
||||
Status = SocketStatus.Closing;
|
||||
}
|
||||
|
||||
if (shouldCloseConnection)
|
||||
{
|
||||
log.Write(LogLevel.Trace, $"Socket {SocketId} closing as there are no more subscriptions");
|
||||
await CloseAsync().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Dispose the connection
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
Status = SocketStatus.Disposed;
|
||||
_socket.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -570,7 +445,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// </summary>
|
||||
/// <param name="messageEvent"></param>
|
||||
/// <returns>True if the data was successfully handled</returns>
|
||||
private (bool, TimeSpan) HandleData(MessageEvent messageEvent)
|
||||
private (bool, TimeSpan, SocketSubscription?) HandleData(MessageEvent messageEvent)
|
||||
{
|
||||
SocketSubscription? currentSubscription = null;
|
||||
try
|
||||
@ -611,13 +486,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
}
|
||||
|
||||
return (handled, userCodeDuration);
|
||||
return (handled, userCodeDuration, currentSubscription);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Write(LogLevel.Error, $"Socket {SocketId} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}");
|
||||
currentSubscription?.InvokeExceptionHandler(ex);
|
||||
return (false, TimeSpan.Zero);
|
||||
return (false, TimeSpan.Zero, null);
|
||||
}
|
||||
}
|
||||
|
||||
@ -675,37 +550,23 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handler for a socket opening
|
||||
/// </summary>
|
||||
protected virtual void SocketOnOpen()
|
||||
{
|
||||
Status = SocketStatus.Connected;
|
||||
ReconnectTry = 0;
|
||||
PausedActivity = false;
|
||||
}
|
||||
|
||||
//private async Task ReconnectWatcherAsync()
|
||||
//{
|
||||
// while (true)
|
||||
// {
|
||||
// await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false);
|
||||
// if (!ShouldReconnect)
|
||||
// return;
|
||||
|
||||
// Status = SocketStatus.Reconnecting;
|
||||
// await ReconnectAsync().ConfigureAwait(false);
|
||||
|
||||
// if (!ShouldReconnect)
|
||||
// return;
|
||||
// }
|
||||
//}
|
||||
|
||||
private async Task<CallResult<bool>> ProcessReconnectAsync()
|
||||
{
|
||||
if (!_socket.IsOpen)
|
||||
return new CallResult<bool>(new WebError("Socket not connected"));
|
||||
|
||||
bool anySubscriptions = false;
|
||||
lock (subscriptionLock)
|
||||
anySubscriptions = subscriptions.Any(s => s.UserSubscription);
|
||||
|
||||
if (!anySubscriptions)
|
||||
{
|
||||
// No need to resubscribe anything
|
||||
log.Write(LogLevel.Debug, $"Socket {SocketId} Nothing to resubscribe, closing connection");
|
||||
_ = _socket.CloseAsync();
|
||||
return new CallResult<bool>(true);
|
||||
}
|
||||
|
||||
if (Authenticated)
|
||||
{
|
||||
// If we reconnected a authenticated connection we need to re-authenticate
|
||||
@ -777,6 +638,10 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// </summary>
|
||||
Reconnecting,
|
||||
/// <summary>
|
||||
/// Resubscribing on reconnected socket
|
||||
/// </summary>
|
||||
Resubscribing,
|
||||
/// <summary>
|
||||
/// Closing
|
||||
/// </summary>
|
||||
Closing,
|
||||
|
79
CryptoExchange.Net/Sockets/WebSocketParameters.cs
Normal file
79
CryptoExchange.Net/Sockets/WebSocketParameters.cs
Normal file
@ -0,0 +1,79 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
/// <summary>
|
||||
/// Parameters for a websocket
|
||||
/// </summary>
|
||||
public class WebSocketParameters
|
||||
{
|
||||
/// <summary>
|
||||
/// The uri to connect to
|
||||
/// </summary>
|
||||
public Uri Uri { get; set; }
|
||||
/// <summary>
|
||||
/// Headers to send in the connection handshake
|
||||
/// </summary>
|
||||
public IDictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();
|
||||
/// <summary>
|
||||
/// Cookies to send in the connection handshake
|
||||
/// </summary>
|
||||
public IDictionary<string, string> Cookies { get; set; } = new Dictionary<string, string>();
|
||||
/// <summary>
|
||||
/// The time to wait between reconnect attempts
|
||||
/// </summary>
|
||||
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
|
||||
/// <summary>
|
||||
/// Proxy for the connection
|
||||
/// </summary>
|
||||
public ApiProxy? Proxy { get; set; }
|
||||
/// <summary>
|
||||
/// Whether the socket should automatically reconnect when connection is lost
|
||||
/// </summary>
|
||||
public bool AutoReconnect { get; set; }
|
||||
/// <summary>
|
||||
/// The maximum time of no data received before considering the connection lost and closting/reconnecting the socket
|
||||
/// </summary>
|
||||
public TimeSpan? Timeout { get; set; }
|
||||
/// <summary>
|
||||
/// Interval at which to send ping frames
|
||||
/// </summary>
|
||||
public TimeSpan? KeepAliveInterval { get; set; }
|
||||
/// <summary>
|
||||
/// The max amount of messages to send per second
|
||||
/// </summary>
|
||||
public int? RatelimitPerSecond { get; set; }
|
||||
/// <summary>
|
||||
/// Origin header value to send in the connection handshake
|
||||
/// </summary>
|
||||
public string? Origin { get; set; }
|
||||
/// <summary>
|
||||
/// Delegate used for processing byte data received from socket connections before it is processed by handlers
|
||||
/// </summary>
|
||||
public Func<byte[], string>? DataInterpreterBytes { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Delegate used for processing string data received from socket connections before it is processed by handlers
|
||||
/// </summary>
|
||||
public Func<string, string>? DataInterpreterString { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Encoding for sending/receiving data
|
||||
/// </summary>
|
||||
public Encoding Encoding { get; set; } = Encoding.UTF8;
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="uri">Uri</param>
|
||||
/// <param name="autoReconnect">Auto reconnect</param>
|
||||
public WebSocketParameters(Uri uri, bool autoReconnect)
|
||||
{
|
||||
Uri = uri;
|
||||
AutoReconnect = autoReconnect;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,6 +1,4 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Logging;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
@ -11,15 +9,9 @@ namespace CryptoExchange.Net.Sockets
|
||||
public class WebsocketFactory : IWebsocketFactory
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public IWebsocket CreateWebsocket(Log log, string url)
|
||||
public IWebsocket CreateWebsocket(Log log, WebSocketParameters parameters)
|
||||
{
|
||||
return new CryptoExchangeWebSocketClient(log, new Uri(url));
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public IWebsocket CreateWebsocket(Log log, string url, IDictionary<string, string> cookies, IDictionary<string, string> headers)
|
||||
{
|
||||
return new CryptoExchangeWebSocketClient(log, new Uri(url), cookies, headers);
|
||||
return new CryptoExchangeWebSocketClient(log, parameters);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user