1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 07:56:12 +00:00

Squashed commit of the following:

commit 0571ed17a0e502f689af6e8a5dbd0f05fd229496
Author: JKorf <jankorf91@gmail.com>
Date:   Sun Jul 10 19:56:27 2022 +0200

    Fixed tests

commit 99c331b389b58f09db3960adc7293d9b45d05caa
Author: JKorf <jankorf91@gmail.com>
Date:   Sun Jul 10 16:41:14 2022 +0200

    Updated version

commit 70f8bd203a00fbdef2b13526133a3b556cfc897f
Author: JKorf <jankorf91@gmail.com>
Date:   Sun Jul 10 16:36:00 2022 +0200

    Finished up websocket refactoring

commit 89b517c93684dc9c1e8a99bc600caaf6f9a4459e
Author: JKorf <jankorf91@gmail.com>
Date:   Fri Jul 8 20:24:58 2022 +0200

    wip

commit 91e33cc42c5725aece765b6c8f6a7f35ab87a80e
Author: JKorf <jankorf91@gmail.com>
Date:   Thu Jul 7 22:17:55 2022 +0200

    wip
This commit is contained in:
JKorf 2022-07-10 19:57:10 +02:00
parent ea9375d582
commit 50715ff2f7
15 changed files with 760 additions and 747 deletions

View File

@ -42,7 +42,7 @@ namespace CryptoExchange.Net.UnitTests
socket.CanConnect = canConnect;
//act
var connectResult = client.ConnectSocketSub(new SocketConnection(client, null, socket));
var connectResult = client.ConnectSocketSub(new SocketConnection(client, null, socket, null));
//assert
Assert.IsTrue(connectResult.Success == canConnect);
@ -57,10 +57,10 @@ namespace CryptoExchange.Net.UnitTests
socket.ShouldReconnect = true;
socket.CanConnect = true;
socket.DisconnectTime = DateTime.UtcNow;
var sub = new SocketConnection(client, null, socket);
var sub = new SocketConnection(client, null, socket, null);
var rstEvent = new ManualResetEvent(false);
JToken result = null;
sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, (messageEvent) =>
sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, false, (messageEvent) =>
{
result = messageEvent.JsonData;
rstEvent.Set();
@ -85,10 +85,10 @@ namespace CryptoExchange.Net.UnitTests
socket.ShouldReconnect = true;
socket.CanConnect = true;
socket.DisconnectTime = DateTime.UtcNow;
var sub = new SocketConnection(client, null, socket);
var sub = new SocketConnection(client, null, socket, null);
var rstEvent = new ManualResetEvent(false);
string original = null;
sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, (messageEvent) =>
sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, false, (messageEvent) =>
{
original = messageEvent.OriginalData;
rstEvent.Set();
@ -103,34 +103,6 @@ namespace CryptoExchange.Net.UnitTests
Assert.IsTrue(original == (enabled ? "{\"property\": 123}" : null));
}
[TestCase]
public void DisconnectedSocket_Should_Reconnect()
{
// arrange
bool reconnected = false;
var client = new TestSocketClient(new TestOptions() { ReconnectInterval = TimeSpan.Zero, LogLevel = LogLevel.Debug });
var socket = client.CreateSocket();
socket.ShouldReconnect = true;
socket.CanConnect = true;
socket.DisconnectTime = DateTime.UtcNow;
var sub = new SocketConnection(client, null, socket);
sub.ShouldReconnect = true;
client.ConnectSocketSub(sub);
var rstEvent = new ManualResetEvent(false);
sub.ConnectionRestored += (a) =>
{
reconnected = true;
rstEvent.Set();
};
// act
socket.InvokeClose();
rstEvent.WaitOne(1000);
// assert
Assert.IsTrue(reconnected);
}
[TestCase()]
public void UnsubscribingStream_Should_CloseTheSocket()
{
@ -138,9 +110,11 @@ namespace CryptoExchange.Net.UnitTests
var client = new TestSocketClient(new TestOptions() { ReconnectInterval = TimeSpan.Zero, LogLevel = LogLevel.Debug });
var socket = client.CreateSocket();
socket.CanConnect = true;
var sub = new SocketConnection(client, null, socket);
var sub = new SocketConnection(client, null, socket, null);
client.ConnectSocketSub(sub);
var ups = new UpdateSubscription(sub, SocketSubscription.CreateForIdentifier(10, "Test", true, (e) => {}));
var us = SocketSubscription.CreateForIdentifier(10, "Test", true, false, (e) => { });
var ups = new UpdateSubscription(sub, us);
sub.AddSubscription(us);
// act
client.UnsubscribeAsync(ups).Wait();
@ -158,8 +132,8 @@ namespace CryptoExchange.Net.UnitTests
var socket2 = client.CreateSocket();
socket1.CanConnect = true;
socket2.CanConnect = true;
var sub1 = new SocketConnection(client, null, socket1);
var sub2 = new SocketConnection(client, null, socket2);
var sub1 = new SocketConnection(client, null, socket1, null);
var sub2 = new SocketConnection(client, null, socket2, null);
client.ConnectSocketSub(sub1);
client.ConnectSocketSub(sub2);
@ -178,7 +152,7 @@ namespace CryptoExchange.Net.UnitTests
var client = new TestSocketClient(new TestOptions() { ReconnectInterval = TimeSpan.Zero, LogLevel = LogLevel.Debug });
var socket = client.CreateSocket();
socket.CanConnect = false;
var sub = new SocketConnection(client, null, socket);
var sub = new SocketConnection(client, null, socket, null);
// act
var connectResult = client.ConnectSocketSub(sub);

View File

@ -13,6 +13,8 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
public bool Connected { get; set; }
public event Action OnClose;
public event Action OnReconnected;
public event Action OnReconnecting;
public event Action<string> OnMessage;
public event Action<Exception> OnError;
public event Action OnOpen;
@ -93,6 +95,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
{
Connected = false;
DisconnectTime = DateTime.UtcNow;
Reconnecting = true;
OnClose?.Invoke();
}
@ -115,11 +118,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
{
OnError?.Invoke(error);
}
public async Task ProcessAsync()
{
while (Connected)
await Task.Delay(50);
}
public Task ReconnectAsync() => Task.CompletedTask;
}
}

View File

@ -22,13 +22,13 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
{
SubClient = new TestSubSocketClient(exchangeOptions, exchangeOptions.SubOptions);
SocketFactory = new Mock<IWebsocketFactory>().Object;
Mock.Get(SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<Log>(), It.IsAny<string>())).Returns(new TestSocket());
Mock.Get(SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<Log>(), It.IsAny<WebSocketParameters>())).Returns(new TestSocket());
}
public TestSocket CreateSocket()
{
Mock.Get(SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<Log>(), It.IsAny<string>())).Returns(new TestSocket());
return (TestSocket)CreateSocket("123");
Mock.Get(SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<Log>(), It.IsAny<WebSocketParameters>())).Returns(new TestSocket());
return (TestSocket)CreateSocket("https://localhost:123/");
}
public CallResult<bool> ConnectSocketSub(SocketConnection sub)

View File

@ -3,7 +3,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Authentication;
@ -11,7 +11,6 @@ using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace CryptoExchange.Net
@ -120,10 +119,7 @@ namespace CryptoExchange.Net
/// <param name="options">The options for this client</param>
protected BaseSocketClient(string name, BaseSocketClientOptions options) : base(name, options)
{
if (options == null)
throw new ArgumentNullException(nameof(options));
ClientOptions = options;
ClientOptions = options ?? throw new ArgumentNullException(nameof(options));
}
/// <inheritdoc />
@ -196,10 +192,14 @@ namespace CryptoExchange.Net
while (true)
{
// Get a new or existing socket connection
socketConnection = GetSocketConnection(apiClient, url, authenticated);
var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false);
if(!socketResult)
return socketResult.As<UpdateSubscription>(null);
socketConnection = socketResult.Data;
// Add a subscription on the socket connection
subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler);
subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler, authenticated);
if (subscription == null)
{
log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
@ -240,6 +240,7 @@ namespace CryptoExchange.Net
var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false);
if (!subResult)
{
log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
return new CallResult<UpdateSubscription>(subResult.Error!);
}
@ -250,7 +251,6 @@ namespace CryptoExchange.Net
subscription.Confirmed = true;
}
socketConnection.ShouldReconnect = true;
if (ct != default)
{
subscription.CancellationTokenRegistration = ct.Register(async () =>
@ -260,7 +260,7 @@ namespace CryptoExchange.Net
}, false);
}
log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription completed");
log.Write(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully");
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
}
@ -320,7 +320,12 @@ namespace CryptoExchange.Net
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
socketConnection = GetSocketConnection(apiClient, url, authenticated);
var socketResult = await GetSocketConnection(apiClient, url, authenticated).ConfigureAwait(false);
if (!socketResult)
return socketResult.As<T>(default);
socketConnection = socketResult.Data;
if (ClientOptions.SocketSubscriptionsCombineTarget == 1)
{
// Can release early when only a single sub per connection
@ -334,8 +339,6 @@ namespace CryptoExchange.Net
}
finally
{
//When the task is ready, release the semaphore. It is vital to ALWAYS release the semaphore when we are ready, or else we will end up with a Semaphore that is forever locked.
//This is why it is important to do the Release within a try...finally clause; program execution may crash or take a different path, this way you are guaranteed execution
if (!released)
semaphoreSlim.Release();
}
@ -481,8 +484,9 @@ namespace CryptoExchange.Net
/// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param>
/// <param name="connection">The socket connection the handler is on</param>
/// <param name="dataHandler">The handler of the data received</param>
/// <param name="authenticated">Whether the subscription needs authentication</param>
/// <returns></returns>
protected virtual SocketSubscription? AddSubscription<T>(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action<DataEvent<T>> dataHandler)
protected virtual SocketSubscription? AddSubscription<T>(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action<DataEvent<T>> dataHandler, bool authenticated)
{
void InternalHandler(MessageEvent messageEvent)
{
@ -504,8 +508,8 @@ namespace CryptoExchange.Net
}
var subscription = request == null
? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, InternalHandler)
: SocketSubscription.CreateForRequest(NextId(), request, userSubscription, InternalHandler);
? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, authenticated, InternalHandler)
: SocketSubscription.CreateForRequest(NextId(), request, userSubscription, authenticated, InternalHandler);
if (!connection.AddSubscription(subscription))
return null;
return subscription;
@ -519,11 +523,23 @@ namespace CryptoExchange.Net
protected void AddGenericHandler(string identifier, Action<MessageEvent> action)
{
genericHandlers.Add(identifier, action);
var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, action);
var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, false, action);
foreach (var connection in socketConnections.Values)
connection.AddSubscription(subscription);
}
/// <summary>
/// Get the url to connect to (defaults to BaseAddress form the client options)
/// </summary>
/// <param name="apiClient"></param>
/// <param name="address"></param>
/// <param name="authentication"></param>
/// <returns></returns>
protected virtual Task<CallResult<string?>> GetConnectionUrlAsync(SocketApiClient apiClient, string address, bool authentication)
{
return Task.FromResult(new CallResult<string?>(address));
}
/// <summary>
/// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one.
/// </summary>
@ -531,10 +547,10 @@ namespace CryptoExchange.Net
/// <param name="address">The address the socket is for</param>
/// <param name="authenticated">Whether the socket should be authenticated</param>
/// <returns></returns>
protected virtual SocketConnection GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated)
protected virtual async Task<CallResult<SocketConnection>> GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated)
{
var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected)
&& s.Value.Uri.ToString().TrimEnd('/') == address.TrimEnd('/')
&& s.Value.Tag.TrimEnd('/') == address.TrimEnd('/')
&& (s.Value.ApiClient.GetType() == apiClient.GetType())
&& (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault();
var result = socketResult.Equals(default(KeyValuePair<int, SocketConnection>)) ? null : socketResult.Value;
@ -543,21 +559,31 @@ namespace CryptoExchange.Net
if (result.SubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= ClientOptions.MaxSocketConnections && socketConnections.All(s => s.Value.SubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget)))
{
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
return result;
return new CallResult<SocketConnection>(result);
}
}
var connectionAddress = await GetConnectionUrlAsync(apiClient, address, authenticated).ConfigureAwait(false);
if (!connectionAddress)
{
log.Write(LogLevel.Warning, $"Failed to determine connection url: " + connectionAddress.Error);
return connectionAddress.As<SocketConnection>(null);
}
if (connectionAddress.Data != address)
log.Write(LogLevel.Debug, $"Connection address set to " + connectionAddress.Data);
// Create new socket
var socket = CreateSocket(address);
var socketConnection = new SocketConnection(this, apiClient, socket);
var socket = CreateSocket(connectionAddress.Data!);
var socketConnection = new SocketConnection(this, apiClient, socket, address);
socketConnection.UnhandledMessage += HandleUnhandledMessage;
foreach (var kvp in genericHandlers)
{
var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, kvp.Value);
var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, false, kvp.Value);
socketConnection.AddSubscription(handler);
}
return socketConnection;
return new CallResult<SocketConnection>(socketConnection);
}
/// <summary>
@ -585,6 +611,23 @@ namespace CryptoExchange.Net
return new CallResult<bool>(new CantConnectError());
}
/// <summary>
/// Get parameters for the websocket connection
/// </summary>
/// <param name="address">The address to connect to</param>
/// <returns></returns>
protected virtual WebSocketParameters GetWebSocketParameters(string address)
=> new (new Uri(address), ClientOptions.AutoReconnect)
{
DataInterpreterBytes = dataInterpreterBytes,
DataInterpreterString = dataInterpreterString,
KeepAliveInterval = KeepAliveInterval,
ReconnectInterval = ClientOptions.ReconnectInterval,
RatelimitPerSecond = RateLimitPerSocketPerSecond,
Proxy = ClientOptions.Proxy,
Timeout = ClientOptions.SocketNoDataTimeout
};
/// <summary>
/// Create a socket for an address
/// </summary>
@ -592,24 +635,8 @@ namespace CryptoExchange.Net
/// <returns></returns>
protected virtual IWebsocket CreateSocket(string address)
{
var socket = SocketFactory.CreateWebsocket(log, address);
var socket = SocketFactory.CreateWebsocket(log, GetWebSocketParameters(address));
log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address);
if (ClientOptions.Proxy != null)
socket.SetProxy(ClientOptions.Proxy);
socket.KeepAliveInterval = KeepAliveInterval;
socket.Timeout = ClientOptions.SocketNoDataTimeout;
socket.DataInterpreterBytes = dataInterpreterBytes;
socket.DataInterpreterString = dataInterpreterString;
socket.RatelimitPerSecond = RateLimitPerSocketPerSecond;
socket.OnError += e =>
{
if(e is WebSocketException wse)
log.Write(LogLevel.Warning, $"Socket {socket.Id} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
else
log.Write(LogLevel.Warning, $"Socket {socket.Id} error: " + e.ToLogString());
};
return socket;
}
@ -667,7 +694,6 @@ namespace CryptoExchange.Net
/// <returns></returns>
public virtual async Task UnsubscribeAsync(int subscriptionId)
{
SocketSubscription? subscription = null;
SocketConnection? connection = null;
foreach(var socket in socketConnections.Values.ToList())
@ -683,7 +709,7 @@ namespace CryptoExchange.Net
if (subscription == null || connection == null)
return;
log.Write(LogLevel.Information, "Closing subscription " + subscriptionId);
log.Write(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId);
await connection.CloseAsync(subscription).ConfigureAwait(false);
}
@ -697,7 +723,7 @@ namespace CryptoExchange.Net
if (subscription == null)
throw new ArgumentNullException(nameof(subscription));
log.Write(LogLevel.Information, "Closing subscription " + subscription.Id);
log.Write(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id);
await subscription.CloseAsync().ConfigureAwait(false);
}
@ -707,7 +733,7 @@ namespace CryptoExchange.Net
/// <returns></returns>
public virtual async Task UnsubscribeAllAsync()
{
log.Write(LogLevel.Information, $"Closing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions");
log.Write(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions");
var tasks = new List<Task>();
{
var socketList = socketConnections.Values;
@ -718,6 +744,39 @@ namespace CryptoExchange.Net
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
}
/// <summary>
/// Reconnect all connections
/// </summary>
/// <returns></returns>
public virtual async Task ReconnectAsync()
{
log.Write(LogLevel.Information, $"Reconnecting all {socketConnections.Count} connections");
var tasks = new List<Task>();
{
var socketList = socketConnections.Values;
foreach (var sub in socketList)
tasks.Add(sub.TriggerReconnectAsync());
}
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
}
/// <summary>
/// Log the current state of connections and subscriptions
/// </summary>
public string GetSubscriptionsState()
{
var sb = new StringBuilder();
sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}");
foreach(var connection in socketConnections)
{
sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}");
foreach (var subscription in connection.Value.Subscriptions)
sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}");
}
return sb.ToString();
}
/// <summary>
/// Dispose the client
/// </summary>
@ -726,8 +785,11 @@ namespace CryptoExchange.Net
disposing = true;
periodicEvent?.Set();
periodicEvent?.Dispose();
log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions");
_ = UnsubscribeAllAsync();
if (socketConnections.Sum(s => s.Value.SubscriptionCount) > 0)
{
log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions");
_ = UnsubscribeAllAsync();
}
semaphoreSlim?.Dispose();
base.Dispose();
}

View File

@ -6,16 +6,16 @@
<PackageId>CryptoExchange.Net</PackageId>
<Authors>JKorf</Authors>
<Description>A base package for implementing cryptocurrency API's</Description>
<PackageVersion>5.1.12</PackageVersion>
<AssemblyVersion>5.1.12</AssemblyVersion>
<FileVersion>5.1.12</FileVersion>
<PackageVersion>5.2.0</PackageVersion>
<AssemblyVersion>5.2.0</AssemblyVersion>
<FileVersion>5.2.0</FileVersion>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/JKorf/CryptoExchange.Net.git</RepositoryUrl>
<PackageProjectUrl>https://github.com/JKorf/CryptoExchange.Net</PackageProjectUrl>
<NeutralLanguage>en</NeutralLanguage>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageReleaseNotes>5.1.12 - Changed time sync so requests no longer wait for it to complete unless it's the first time, Made log client options changable after client creation, Fixed proxy setting not used when reconnecting socket, Changed MaxSocketConnections to a client options, Updated socket reconnection logic</PackageReleaseNotes>
<PackageReleaseNotes>5.2.0 - Refactored websocket code, removed some clutter and simplified, Added ReconnectAsync and GetSubscriptionsState methods on socket clients</PackageReleaseNotes>
<Nullable>enable</Nullable>
<LangVersion>9.0</LangVersion>
<PackageLicenseExpression>MIT</PackageLicenseExpression>

View File

@ -27,36 +27,24 @@ namespace CryptoExchange.Net.Interfaces
/// Websocket opened event
/// </summary>
event Action OnOpen;
/// <summary>
/// Websocket has lost connection to the server and is attempting to reconnect
/// </summary>
event Action OnReconnecting;
/// <summary>
/// Websocket has reconnected to the server
/// </summary>
event Action OnReconnected;
/// <summary>
/// Unique id for this socket
/// </summary>
int Id { get; }
/// <summary>
/// Origin header
/// </summary>
string? Origin { get; set; }
/// <summary>
/// Encoding to use for sending/receiving string data
/// </summary>
Encoding? Encoding { get; set; }
/// <summary>
/// The max amount of outgoing messages per second
/// </summary>
int? RatelimitPerSecond { get; set; }
/// <summary>
/// The current kilobytes per second of data being received, averaged over the last 3 seconds
/// </summary>
double IncomingKbps { get; }
/// <summary>
/// Handler for byte data
/// </summary>
Func<byte[], string>? DataInterpreterBytes { get; set; }
/// <summary>
/// Handler for string data
/// </summary>
Func<string, string>? DataInterpreterString { get; set; }
/// <summary>
/// The uri the socket connects to
/// </summary>
Uri Uri { get; }
@ -69,41 +57,20 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
bool IsOpen { get; }
/// <summary>
/// Supported ssl protocols
/// </summary>
SslProtocols SSLProtocols { get; set; }
/// <summary>
/// The max time for no data being received before the connection is considered lost
/// </summary>
TimeSpan Timeout { get; set; }
/// <summary>
/// The interval at which to send a ping frame to the server
/// </summary>
TimeSpan KeepAliveInterval { get; set; }
/// <summary>
/// Set a proxy to use when connecting
/// </summary>
/// <param name="proxy"></param>
void SetProxy(ApiProxy proxy);
/// <summary>
/// Connect the socket
/// </summary>
/// <returns></returns>
Task<bool> ConnectAsync();
/// <summary>
/// Receive and send messages over the connection. Resulting task should complete when closing the socket.
/// </summary>
/// <returns></returns>
Task ProcessAsync();
Task<bool> ConnectAsync();
/// <summary>
/// Send data
/// </summary>
/// <param name="data"></param>
void Send(string data);
/// <summary>
/// Reset socket when a connection is lost to prepare for a new connection
/// Reconnect the socket
/// </summary>
void Reset();
/// <returns></returns>
Task ReconnectAsync();
/// <summary>
/// Close the connection
/// </summary>

View File

@ -1,5 +1,5 @@
using System.Collections.Generic;
using CryptoExchange.Net.Logging;
using CryptoExchange.Net.Logging;
using CryptoExchange.Net.Sockets;
namespace CryptoExchange.Net.Interfaces
{
@ -12,17 +12,8 @@ namespace CryptoExchange.Net.Interfaces
/// Create a websocket for an url
/// </summary>
/// <param name="log">The logger</param>
/// <param name="url">The url the socket is fo</param>
/// <param name="parameters">The parameters to use for the connection</param>
/// <returns></returns>
IWebsocket CreateWebsocket(Log log, string url);
/// <summary>
/// Create a websocket for an url
/// </summary>
/// <param name="log">The logger</param>
/// <param name="url">The url the socket is fo</param>
/// <param name="cookies">Cookies to be send in the initial request</param>
/// <param name="headers">Headers to be send in the initial request</param>
/// <returns></returns>
IWebsocket CreateWebsocket(Log log, string url, IDictionary<string, string> cookies, IDictionary<string, string> headers);
IWebsocket CreateWebsocket(Log log, WebSocketParameters parameters);
}
}

View File

@ -176,16 +176,6 @@ namespace CryptoExchange.Net.Objects
/// </summary>
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// The maximum number of times to try to reconnect, default null will retry indefinitely
/// </summary>
public int? MaxReconnectTries { get; set; }
/// <summary>
/// The maximum number of times to try to resubscribe after reconnecting
/// </summary>
public int? MaxResubscribeTries { get; set; }
/// <summary>
/// Max number of concurrent resubscription tasks per socket after reconnecting a socket
/// </summary>
@ -232,8 +222,6 @@ namespace CryptoExchange.Net.Objects
AutoReconnect = baseOptions.AutoReconnect;
ReconnectInterval = baseOptions.ReconnectInterval;
MaxReconnectTries = baseOptions.MaxReconnectTries;
MaxResubscribeTries = baseOptions.MaxResubscribeTries;
MaxConcurrentResubscriptionsPerSocket = baseOptions.MaxConcurrentResubscriptionsPerSocket;
SocketResponseTimeout = baseOptions.SocketResponseTimeout;
SocketNoDataTimeout = baseOptions.SocketNoDataTimeout;
@ -244,7 +232,7 @@ namespace CryptoExchange.Net.Objects
/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxReconnectTries: {MaxReconnectTries}, MaxResubscribeTries: {MaxResubscribeTries}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}";
return $"{base.ToString()}, AutoReconnect: {AutoReconnect}, ReconnectInterval: {ReconnectInterval}, MaxConcurrentResubscriptionsPerSocket: {MaxConcurrentResubscriptionsPerSocket}, SocketResponseTimeout: {SocketResponseTimeout:c}, SocketNoDataTimeout: {SocketNoDataTimeout}, SocketSubscriptionsCombineTarget: {SocketSubscriptionsCombineTarget}, MaxSocketConnections: {MaxSocketConnections}";
}
}

View File

@ -5,13 +5,10 @@ using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.WebSockets;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -22,21 +19,32 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public class CryptoExchangeWebSocketClient : IWebsocket
{
enum ProcessState
{
Idle,
Processing,
WaitingForClose,
Reconnecting
}
internal static int lastStreamId;
private static readonly object streamIdLock = new();
private ClientWebSocket _socket;
private readonly AsyncResetEvent _sendEvent;
private readonly ConcurrentQueue<byte[]> _sendBuffer;
private readonly IDictionary<string, string> cookies;
private readonly IDictionary<string, string> headers;
private CancellationTokenSource _ctsSource;
private ApiProxy? _proxy;
private readonly SemaphoreSlim _closeSem;
private readonly WebSocketParameters _parameters;
private readonly List<DateTime> _outgoingMessages;
private ClientWebSocket _socket;
private CancellationTokenSource _ctsSource;
private DateTime _lastReceivedMessagesUpdate;
private bool _closed;
private Task? _processTask;
private Task? _closeTask;
private bool _stopRequested;
private bool _disposed;
private ProcessState _processState;
/// <summary>
/// Received messages, the size and the timstamp
@ -51,48 +59,18 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Log
/// </summary>
protected Log log;
/// <summary>
/// Handlers for when an error happens on the socket
/// </summary>
protected readonly List<Action<Exception>> errorHandlers = new();
/// <summary>
/// Handlers for when the socket connection is opened
/// </summary>
protected readonly List<Action> openHandlers = new();
/// <summary>
/// Handlers for when the connection is closed
/// </summary>
protected readonly List<Action> closeHandlers = new();
/// <summary>
/// Handlers for when a message is received
/// </summary>
protected readonly List<Action<string>> messageHandlers = new();
protected Log _log;
/// <inheritdoc />
public int Id { get; }
/// <inheritdoc />
public string? Origin { get; set; }
/// <summary>
/// The timestamp this socket has been active for the last time
/// </summary>
public DateTime LastActionTime { get; private set; }
/// <summary>
/// Delegate used for processing byte data received from socket connections before it is processed by handlers
/// </summary>
public Func<byte[], string>? DataInterpreterBytes { get; set; }
/// <summary>
/// Delegate used for processing string data received from socket connections before it is processed by handlers
/// </summary>
public Func<string, string>? DataInterpreterString { get; set; }
/// <inheritdoc />
public Uri Uri { get; }
public Uri Uri => _parameters.Uri;
/// <inheritdoc />
public bool IsClosed => _socket.State == WebSocketState.Closed;
@ -100,34 +78,6 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested;
/// <summary>
/// Ssl protocols supported. NOT USED BY THIS IMPLEMENTATION
/// </summary>
public SslProtocols SSLProtocols { get; set; }
private Encoding _encoding = Encoding.UTF8;
/// <inheritdoc />
public Encoding? Encoding
{
get => _encoding;
set
{
if(value != null)
_encoding = value;
}
}
/// <summary>
/// The max amount of outgoing messages per second
/// </summary>
public int? RatelimitPerSecond { get; set; }
/// <inheritdoc />
public TimeSpan Timeout { get; set; }
/// <inheritdoc />
public TimeSpan KeepAliveInterval { get; set; }
/// <inheritdoc />
public double IncomingKbps
{
@ -146,57 +96,29 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public event Action OnClose
{
add => closeHandlers.Add(value);
remove => closeHandlers.Remove(value);
}
public event Action? OnClose;
/// <inheritdoc />
public event Action<string> OnMessage
{
add => messageHandlers.Add(value);
remove => messageHandlers.Remove(value);
}
public event Action<string>? OnMessage;
/// <inheritdoc />
public event Action<Exception> OnError
{
add => errorHandlers.Add(value);
remove => errorHandlers.Remove(value);
}
public event Action<Exception>? OnError;
/// <inheritdoc />
public event Action OnOpen
{
add => openHandlers.Add(value);
remove => openHandlers.Remove(value);
}
public event Action? OnOpen;
/// <inheritdoc />
public event Action? OnReconnecting;
/// <inheritdoc />
public event Action? OnReconnected;
/// <summary>
/// ctor
/// </summary>
/// <param name="log">The log object to use</param>
/// <param name="uri">The uri the socket should connect to</param>
public CryptoExchangeWebSocketClient(Log log, Uri uri) : this(log, uri, new Dictionary<string, string>(), new Dictionary<string, string>())
{
}
/// <summary>
/// ctor
/// </summary>
/// <param name="log">The log object to use</param>
/// <param name="uri">The uri the socket should connect to</param>
/// <param name="cookies">Cookies to sent in the socket connection request</param>
/// <param name="headers">Headers to sent in the socket connection request</param>
public CryptoExchangeWebSocketClient(Log log, Uri uri, IDictionary<string, string> cookies, IDictionary<string, string> headers)
/// <param name="websocketParameters">The parameters for this socket</param>
public CryptoExchangeWebSocketClient(Log log, WebSocketParameters websocketParameters)
{
Id = NextStreamId();
this.log = log;
Uri = uri;
this.cookies = cookies;
this.headers = headers;
_log = log;
_parameters = websocketParameters;
_outgoingMessages = new List<DateTime>();
_receivedMessages = new List<ReceiveItem>();
_sendEvent = new AsyncResetEvent();
@ -204,90 +126,184 @@ namespace CryptoExchange.Net.Sockets
_ctsSource = new CancellationTokenSource();
_receivedMessagesLock = new object();
_closeSem = new SemaphoreSlim(1, 1);
_socket = CreateSocket();
}
/// <inheritdoc />
public virtual void SetProxy(ApiProxy proxy)
{
_proxy = proxy;
if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri))
throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy));
_socket.Options.Proxy = uri?.Scheme == null
? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port)
: _socket.Options.Proxy = new WebProxy
{
Address = uri
};
if (proxy.Login != null)
_socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
}
/// <inheritdoc />
public virtual async Task<bool> ConnectAsync()
{
log.Write(LogLevel.Debug, $"Socket {Id} connecting");
if (!await ConnectInternalAsync().ConfigureAwait(false))
return false;
OnOpen?.Invoke();
_processTask = ProcessAsync();
return true;
}
/// <summary>
/// Create the socket object
/// </summary>
private ClientWebSocket CreateSocket()
{
var cookieContainer = new CookieContainer();
foreach (var cookie in _parameters.Cookies)
cookieContainer.Add(new Cookie(cookie.Key, cookie.Value));
var socket = new ClientWebSocket();
socket.Options.Cookies = cookieContainer;
foreach (var header in _parameters.Headers)
socket.Options.SetRequestHeader(header.Key, header.Value);
socket.Options.KeepAliveInterval = _parameters.KeepAliveInterval ?? TimeSpan.Zero;
socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
if (_parameters.Proxy != null)
SetProxy(_parameters.Proxy);
return socket;
}
private async Task<bool> ConnectInternalAsync()
{
_log.Write(LogLevel.Debug, $"Socket {Id} connecting");
try
{
using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false);
Handle(openHandlers);
}
catch (Exception e)
{
log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString());
_log.Write(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString());
return false;
}
log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}");
_log.Write(LogLevel.Debug, $"Socket {Id} connected to {Uri}");
return true;
}
/// <inheritdoc />
public virtual async Task ProcessAsync()
private async Task ProcessAsync()
{
log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync started");
var sendTask = SendLoopAsync();
var receiveTask = ReceiveLoopAsync();
var timeoutTask = Timeout != default ? CheckTimeoutAsync() : Task.CompletedTask;
log.Write(LogLevel.Trace, $"Socket {Id} processing startup completed");
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
log.Write(LogLevel.Trace, $"Socket {Id} ProcessAsync finished");
while (!_stopRequested)
{
_log.Write(LogLevel.Debug, $"Socket {Id} starting processing tasks");
_processState = ProcessState.Processing;
var sendTask = SendLoopAsync();
var receiveTask = ReceiveLoopAsync();
var timeoutTask = _parameters.Timeout != null && _parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
_log.Write(LogLevel.Debug, $"Socket {Id} processing tasks finished");
_processState = ProcessState.WaitingForClose;
while (_closeTask == null)
await Task.Delay(50).ConfigureAwait(false);
await _closeTask.ConfigureAwait(false);
_closeTask = null;
if (!_parameters.AutoReconnect)
{
_processState = ProcessState.Idle;
OnClose?.Invoke();
return;
}
if (!_stopRequested)
{
_processState = ProcessState.Reconnecting;
OnReconnecting?.Invoke();
}
while (!_stopRequested)
{
_log.Write(LogLevel.Debug, $"Socket {Id} attempting to reconnect");
_socket = CreateSocket();
_ctsSource.Dispose();
_ctsSource = new CancellationTokenSource();
while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer
var connected = await ConnectInternalAsync().ConfigureAwait(false);
if (!connected)
{
await Task.Delay(_parameters.ReconnectInterval).ConfigureAwait(false);
continue;
}
OnReconnected?.Invoke();
break;
}
}
_processState = ProcessState.Idle;
}
/// <inheritdoc />
public virtual void Send(string data)
{
if (_ctsSource.IsCancellationRequested)
throw new InvalidOperationException($"Socket {Id} Can't send data when socket is not connected");
return;
var bytes = _encoding.GetBytes(data);
log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer");
var bytes = _parameters.Encoding.GetBytes(data);
_log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer");
_sendBuffer.Enqueue(bytes);
_sendEvent.Set();
}
/// <inheritdoc />
public virtual async Task ReconnectAsync()
{
if (_processState != ProcessState.Processing)
return;
_log.Write(LogLevel.Debug, $"Socket {Id} reconnect requested");
_closeTask = CloseInternalAsync();
await _closeTask.ConfigureAwait(false);
}
/// <inheritdoc />
public virtual async Task CloseAsync()
{
log.Write(LogLevel.Debug, $"Socket {Id} closing");
await CloseInternalAsync().ConfigureAwait(false);
await _closeSem.WaitAsync().ConfigureAwait(false);
try
{
if (_closeTask != null && !_closeTask.IsCompleted)
{
_log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task");
await _closeTask.ConfigureAwait(false);
return;
}
_stopRequested = true;
if (!IsOpen)
{
_log.Write(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open");
return;
}
_log.Write(LogLevel.Debug, $"Socket {Id} closing");
_closeTask = CloseInternalAsync();
}
finally
{
_closeSem.Release();
}
await _closeTask.ConfigureAwait(false);
if(_processTask != null)
await _processTask.ConfigureAwait(false);
OnClose?.Invoke();
_log.Write(LogLevel.Debug, $"Socket {Id} closed");
}
/// <summary>
/// Internal close method
/// </summary>
/// <returns></returns>
private async Task CloseInternalAsync()
{
if (_closed || _disposed)
if (_disposed)
return;
_closed = true;
//_closeState = CloseState.Closing;
_ctsSource.Cancel();
_sendEvent.Set();
@ -297,8 +313,12 @@ namespace CryptoExchange.Net.Sockets
{
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
catch(Exception)
{ } // Can sometimes throw an exception when socket is in aborted state due to timing
catch (Exception)
{
// Can sometimes throw an exception when socket is in aborted state due to timing
// Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
// So socket might go to aborted state, might still be open
}
}
else if(_socket.State == WebSocketState.CloseReceived)
{
@ -307,10 +327,12 @@ namespace CryptoExchange.Net.Sockets
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
catch (Exception)
{ } // Can sometimes throw an exception when socket is in aborted state due to timing
{
// Can sometimes throw an exception when socket is in aborted state due to timing
// Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
// So socket might go to aborted state, might still be open
}
}
log.Write(LogLevel.Debug, $"Socket {Id} closed");
Handle(closeHandlers);
}
/// <summary>
@ -321,48 +343,11 @@ namespace CryptoExchange.Net.Sockets
if (_disposed)
return;
log.Write(LogLevel.Debug, $"Socket {Id} disposing");
_log.Write(LogLevel.Debug, $"Socket {Id} disposing");
_disposed = true;
_socket.Dispose();
_ctsSource.Dispose();
errorHandlers.Clear();
openHandlers.Clear();
closeHandlers.Clear();
messageHandlers.Clear();
log.Write(LogLevel.Trace, $"Socket {Id} disposed");
}
/// <inheritdoc />
public void Reset()
{
log.Write(LogLevel.Debug, $"Socket {Id} resetting");
_ctsSource = new CancellationTokenSource();
while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer
_socket = CreateSocket();
if (_proxy != null)
SetProxy(_proxy);
_closed = false;
}
/// <summary>
/// Create the socket object
/// </summary>
private ClientWebSocket CreateSocket()
{
var cookieContainer = new CookieContainer();
foreach (var cookie in cookies)
cookieContainer.Add(new Cookie(cookie.Key, cookie.Value));
var socket = new ClientWebSocket();
socket.Options.Cookies = cookieContainer;
foreach (var header in headers)
socket.Options.SetRequestHeader(header.Key, header.Value);
socket.Options.KeepAliveInterval = KeepAliveInterval;
socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
return socket;
_log.Write(LogLevel.Trace, $"Socket {Id} disposed");
}
/// <summary>
@ -385,25 +370,25 @@ namespace CryptoExchange.Net.Sockets
while (_sendBuffer.TryDequeue(out var data))
{
if (RatelimitPerSecond != null)
if (_parameters.RatelimitPerSecond != null)
{
// Wait for rate limit
DateTime? start = null;
while (MessagesSentLastSecond() >= RatelimitPerSecond)
while (MessagesSentLastSecond() >= _parameters.RatelimitPerSecond)
{
start ??= DateTime.UtcNow;
await Task.Delay(50).ConfigureAwait(false);
}
if (start != null)
log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit");
_log.Write(LogLevel.Debug, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit");
}
try
{
await _socket.SendAsync(new ArraySegment<byte>(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
_outgoingMessages.Add(DateTime.UtcNow);
log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes");
_log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes");
}
catch (OperationCanceledException)
{
@ -412,9 +397,9 @@ namespace CryptoExchange.Net.Sockets
}
catch (Exception ioe)
{
// Connection closed unexpectedly, .NET framework
Handle(errorHandlers, ioe);
await CloseInternalAsync().ConfigureAwait(false);
// Connection closed unexpectedly, .NET framework
OnError?.Invoke(ioe);
_closeTask = CloseInternalAsync();
break;
}
}
@ -425,12 +410,12 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the send processing, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error
Handle(errorHandlers, e);
OnError?.Invoke(e);
throw;
}
finally
{
log.Write(LogLevel.Trace, $"Socket {Id} Send loop finished");
_log.Write(LogLevel.Debug, $"Socket {Id} Send loop finished");
}
}
@ -468,17 +453,17 @@ namespace CryptoExchange.Net.Sockets
}
catch (Exception wse)
{
// Connection closed unexpectedly
Handle(errorHandlers, wse);
await CloseInternalAsync().ConfigureAwait(false);
// Connection closed unexpectedly
OnError?.Invoke(wse);
_closeTask = CloseInternalAsync();
break;
}
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed unexpectedly
log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
await CloseInternalAsync().ConfigureAwait(false);
_log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
_closeTask = CloseInternalAsync();
break;
}
@ -487,7 +472,7 @@ namespace CryptoExchange.Net.Sockets
// We received data, but it is not complete, write it to a memory stream for reassembling
multiPartMessage = true;
memoryStream ??= new MemoryStream();
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
_log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
}
else
@ -495,13 +480,13 @@ namespace CryptoExchange.Net.Sockets
if (!multiPartMessage)
{
// Received a complete message and it's not multi part
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
_log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
HandleMessage(buffer.Array!, buffer.Offset, receiveResult.Count, receiveResult.MessageType);
}
else
{
// Received the end of a multipart message, write to memory stream for reassembling
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
_log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
}
break;
@ -529,12 +514,12 @@ namespace CryptoExchange.Net.Sockets
if (receiveResult?.EndOfMessage == true)
{
// Reassemble complete message from memory stream
log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
_log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType);
memoryStream.Dispose();
}
else
log.Write(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes");
_log.Write(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes");
}
}
}
@ -543,12 +528,12 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
// Make sure we at least let the owner know there was an error
Handle(errorHandlers, e);
OnError?.Invoke(e);
throw;
}
finally
{
log.Write(LogLevel.Trace, $"Socket {Id} Receive loop finished");
_log.Write(LogLevel.Debug, $"Socket {Id} Receive loop finished");
}
}
@ -564,54 +549,92 @@ namespace CryptoExchange.Net.Sockets
string strData;
if (messageType == WebSocketMessageType.Binary)
{
if (DataInterpreterBytes == null)
if (_parameters.DataInterpreterBytes == null)
throw new Exception("Byte interpreter not set while receiving byte data");
try
{
var relevantData = new byte[count];
Array.Copy(data, offset, relevantData, 0, count);
strData = DataInterpreterBytes(relevantData);
strData = _parameters.DataInterpreterBytes(relevantData);
}
catch(Exception e)
{
log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString());
_log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during byte data interpretation: " + e.ToLogString());
return;
}
}
else
strData = _encoding.GetString(data, offset, count);
strData = _parameters.Encoding.GetString(data, offset, count);
if (DataInterpreterString != null)
if (_parameters.DataInterpreterString != null)
{
try
{
strData = DataInterpreterString(strData);
strData = _parameters.DataInterpreterString(strData);
}
catch(Exception e)
{
log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString());
_log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during string data interpretation: " + e.ToLogString());
return;
}
}
try
{
Handle(messageHandlers, strData);
LastActionTime = DateTime.UtcNow;
OnMessage?.Invoke(strData);
}
catch(Exception e)
{
log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString());
_log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString());
}
}
/// <summary>
/// Trigger the OnMessage event
/// </summary>
/// <param name="data"></param>
protected void TriggerOnMessage(string data)
{
LastActionTime = DateTime.UtcNow;
OnMessage?.Invoke(data);
}
/// <summary>
/// Trigger the OnError event
/// </summary>
/// <param name="ex"></param>
protected void TriggerOnError(Exception ex) => OnError?.Invoke(ex);
/// <summary>
/// Trigger the OnError event
/// </summary>
protected void TriggerOnOpen() => OnOpen?.Invoke();
/// <summary>
/// Trigger the OnError event
/// </summary>
protected void TriggerOnClose() => OnClose?.Invoke();
/// <summary>
/// Trigger the OnReconnecting event
/// </summary>
protected void TriggerOnReconnecting() => OnReconnecting?.Invoke();
/// <summary>
/// Trigger the OnReconnected event
/// </summary>
protected void TriggerOnReconnected() => OnReconnected?.Invoke();
/// <summary>
/// Checks if there is no data received for a period longer than the specified timeout
/// </summary>
/// <returns></returns>
protected async Task CheckTimeoutAsync()
{
log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Timeout}");
_log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {_parameters.Timeout}");
LastActionTime = DateTime.UtcNow;
try
{
while (true)
@ -619,9 +642,9 @@ namespace CryptoExchange.Net.Sockets
if (_ctsSource.IsCancellationRequested)
return;
if (DateTime.UtcNow - LastActionTime > Timeout)
if (DateTime.UtcNow - LastActionTime > _parameters.Timeout)
{
log.Write(LogLevel.Warning, $"Socket {Id} No data received for {Timeout}, reconnecting socket");
_log.Write(LogLevel.Warning, $"Socket {Id} No data received for {_parameters.Timeout}, reconnecting socket");
_ = CloseAsync().ConfigureAwait(false);
return;
}
@ -641,35 +664,11 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will stop the timeout checking, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error
Handle(errorHandlers, e);
OnError?.Invoke(e);
throw;
}
}
/// <summary>
/// Helper to invoke handlers
/// </summary>
/// <param name="handlers"></param>
protected void Handle(List<Action> handlers)
{
LastActionTime = DateTime.UtcNow;
foreach (var handle in new List<Action>(handlers))
handle?.Invoke();
}
/// <summary>
/// Helper to invoke handlers
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="handlers"></param>
/// <param name="data"></param>
protected void Handle<T>(List<Action<T>> handlers, T data)
{
LastActionTime = DateTime.UtcNow;
foreach (var handle in new List<Action<T>>(handlers))
handle?.Invoke(data);
}
/// <summary>
/// Get the next identifier
/// </summary>
@ -705,6 +704,27 @@ namespace CryptoExchange.Net.Sockets
_lastReceivedMessagesUpdate = checkTime;
}
}
/// <summary>
/// Set proxy on socket
/// </summary>
/// <param name="proxy"></param>
/// <exception cref="ArgumentException"></exception>
protected virtual void SetProxy(ApiProxy proxy)
{
if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri))
throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy));
_socket.Options.Proxy = uri?.Scheme == null
? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port)
: _socket.Options.Proxy = new WebProxy
{
Address = uri
};
if (proxy.Login != null)
_socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
}
}
/// <summary>

View File

@ -3,13 +3,13 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects;
using System.Net.WebSockets;
namespace CryptoExchange.Net.Sockets
{
@ -57,10 +57,22 @@ namespace CryptoExchange.Net.Sockets
return subscriptions.Count(h => h.UserSubscription); }
}
/// <summary>
/// Get a copy of the current subscriptions
/// </summary>
public SocketSubscription[] Subscriptions
{
get
{
lock (subscriptionLock)
return subscriptions.Where(h => h.UserSubscription).ToArray();
}
}
/// <summary>
/// If the connection has been authenticated
/// </summary>
public bool Authenticated { get; set; }
public bool Authenticated { get; internal set; }
/// <summary>
/// If connection is made
@ -80,28 +92,13 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// The connection uri
/// </summary>
public Uri Uri => _socket.Uri;
public Uri ConnectionUri => _socket.Uri;
/// <summary>
/// The API client the connection is for
/// </summary>
public SocketApiClient ApiClient { get; set; }
/// <summary>
/// If the socket should be reconnected upon closing
/// </summary>
public bool ShouldReconnect { get; set; }
/// <summary>
/// Current reconnect try, reset when a successful connection is made
/// </summary>
public int ReconnectTry { get; set; }
/// <summary>
/// Current resubscribe try, reset when a successful connection is made
/// </summary>
public int ResubscribeTry { get; set; }
/// <summary>
/// Time of disconnecting
/// </summary>
@ -110,7 +107,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Tag for identificaion
/// </summary>
public string? Tag { get; set; }
public string Tag { get; set; }
/// <summary>
/// If activity is paused
@ -124,27 +121,12 @@ namespace CryptoExchange.Net.Sockets
{
pausedActivity = value;
log.Write(LogLevel.Information, $"Socket {SocketId} Paused activity: " + value);
if(pausedActivity) ActivityPaused?.Invoke();
else ActivityUnpaused?.Invoke();
if(pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke());
else _ = Task.Run(() => ActivityUnpaused?.Invoke());
}
}
}
private bool pausedActivity;
private readonly List<SocketSubscription> subscriptions;
private readonly object subscriptionLock = new();
private bool lostTriggered;
private readonly Log log;
private readonly BaseSocketClient socketClient;
private readonly List<PendingRequest> pendingRequests;
private Task? _socketProcessTask;
private Task? _socketReconnectTask;
private readonly AsyncResetEvent _reconnectWaitEvent;
private SocketStatus _status;
/// <summary>
/// Status of the socket connection
/// </summary>
@ -153,12 +135,26 @@ namespace CryptoExchange.Net.Sockets
get => _status;
private set
{
if (_status == value)
return;
var oldStatus = _status;
_status = value;
log.Write(LogLevel.Trace, $"Socket {SocketId} status changed from {oldStatus} to {_status}");
log.Write(LogLevel.Debug, $"Socket {SocketId} status changed from {oldStatus} to {_status}");
}
}
private bool pausedActivity;
private readonly List<SocketSubscription> subscriptions;
private readonly object subscriptionLock = new();
private readonly Log log;
private readonly BaseSocketClient socketClient;
private readonly List<PendingRequest> pendingRequests;
private SocketStatus _status;
/// <summary>
/// The underlying websocket
/// </summary>
@ -170,137 +166,72 @@ namespace CryptoExchange.Net.Sockets
/// <param name="client">The socket client</param>
/// <param name="apiClient">The api client</param>
/// <param name="socket">The socket</param>
public SocketConnection(BaseSocketClient client, SocketApiClient apiClient, IWebsocket socket)
/// <param name="tag"></param>
public SocketConnection(BaseSocketClient client, SocketApiClient apiClient, IWebsocket socket, string tag)
{
log = client.log;
socketClient = client;
ApiClient = apiClient;
Tag = tag;
pendingRequests = new List<PendingRequest>();
subscriptions = new List<SocketSubscription>();
_socket = socket;
_reconnectWaitEvent = new AsyncResetEvent(false, true);
_socket.Timeout = client.ClientOptions.SocketNoDataTimeout;
_socket.OnMessage += ProcessMessage;
_socket.OnOpen += SocketOnOpen;
_socket.OnClose += () => _reconnectWaitEvent.Set();
_socket.OnMessage += HandleMessage;
_socket.OnOpen += HandleOpen;
_socket.OnClose += HandleClose;
_socket.OnReconnecting += HandleReconnecting;
_socket.OnReconnected += HandleReconnected;
_socket.OnError += HandleError;
}
/// <summary>
/// Connect the websocket and start processing
/// Handler for a socket opening
/// </summary>
/// <returns></returns>
public async Task<bool> ConnectAsync()
protected virtual void HandleOpen()
{
var connected = await _socket.ConnectAsync().ConfigureAwait(false);
if (connected)
Status = SocketStatus.Connected;
PausedActivity = false;
}
/// <summary>
/// Handler for a socket closing without reconnect
/// </summary>
protected virtual void HandleClose()
{
Status = SocketStatus.Closed;
Authenticated = false;
lock(subscriptionLock)
{
Status = SocketStatus.Connected;
_socketReconnectTask = ReconnectWatcherAsync();
_socketProcessTask = _socket.ProcessAsync();
}
return connected;
foreach (var sub in subscriptions)
sub.Confirmed = false;
}
Task.Run(() => ConnectionClosed?.Invoke());
}
/// <summary>
/// Retrieve the underlying socket
/// Handler for a socket losing conenction and starting reconnect
/// </summary>
/// <returns></returns>
public IWebsocket GetSocket()
protected virtual void HandleReconnecting()
{
return _socket;
}
/// <summary>
/// Trigger a reconnect of the socket connection
/// </summary>
/// <returns></returns>
public async Task TriggerReconnectAsync()
{
await _socket.CloseAsync().ConfigureAwait(false);
}
/// <summary>
/// Close the connection
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(SocketId, out _);
Status = SocketStatus.Reconnecting;
DisconnectTime = DateTime.UtcNow;
Authenticated = false;
lock (subscriptionLock)
{
foreach (var subscription in subscriptions)
{
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
}
foreach (var sub in subscriptions)
sub.Confirmed = false;
}
while (Status == SocketStatus.Reconnecting)
// Wait for reconnecting to finish
await Task.Delay(100).ConfigureAwait(false);
await _socket.CloseAsync().ConfigureAwait(false);
if(_socketProcessTask != null)
await _socketProcessTask.ConfigureAwait(false);
_socket.Dispose();
Task.Run(() => ConnectionLost?.Invoke());
}
/// <summary>
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
/// Handler for a socket which has reconnected
/// </summary>
/// <param name="subscription">Subscription to close</param>
/// <returns></returns>
public async Task CloseAsync(SocketSubscription subscription)
protected virtual async void HandleReconnected()
{
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
log.Write(LogLevel.Trace, $"Socket {SocketId} closing subscription {subscription.Id}");
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
if (subscription.Confirmed && _socket.IsOpen)
await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
bool shouldCloseConnection;
lock (subscriptionLock)
{
if (Status == SocketStatus.Closing)
{
log.Write(LogLevel.Trace, $"Socket {SocketId} already closing");
return;
}
shouldCloseConnection = subscriptions.All(r => !r.UserSubscription || r == subscription);
if (shouldCloseConnection)
Status = SocketStatus.Closing;
}
if (shouldCloseConnection)
{
log.Write(LogLevel.Trace, $"Socket {SocketId} closing as there are no more subscriptions");
await CloseAsync().ConfigureAwait(false);
}
lock (subscriptionLock)
subscriptions.Remove(subscription);
}
private async Task ReconnectAsync()
{
// Fail all pending requests
Status = SocketStatus.Resubscribing;
lock (pendingRequests)
{
foreach (var pendingRequest in pendingRequests.ToList())
@ -310,137 +241,37 @@ namespace CryptoExchange.Net.Sockets
}
}
if (socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
{
// Should reconnect
DisconnectTime = DateTime.UtcNow;
log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect");
if (!lostTriggered)
{
lostTriggered = true;
_ = Task.Run(() => ConnectionLost?.Invoke());
}
while (ShouldReconnect)
{
if (ReconnectTry > 0)
{
// Wait a bit before attempting reconnect
await Task.Delay(socketClient.ClientOptions.ReconnectInterval).ConfigureAwait(false);
}
if (!ShouldReconnect)
{
// Should reconnect changed to false while waiting to reconnect
return;
}
_socket.Reset();
if (!await _socket.ConnectAsync().ConfigureAwait(false))
{
// Reconnect failed
ReconnectTry++;
ResubscribeTry = 0;
if (socketClient.ClientOptions.MaxReconnectTries != null
&& ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries)
{
log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing");
ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(SocketId, out _);
_ = Task.Run(() => ConnectionClosed?.Invoke());
// Reached max tries, break loop and leave connection closed
break;
}
// Continue to try again
log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}");
continue;
}
// Successfully reconnected, start processing
Status = SocketStatus.Connected;
_socketProcessTask = _socket.ProcessAsync();
ReconnectTry = 0;
var time = DisconnectTime;
DisconnectTime = null;
log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}");
var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectResult)
{
// Failed to resubscribe everything
ResubscribeTry++;
DisconnectTime = time;
if (socketClient.ClientOptions.MaxResubscribeTries != null &&
ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries)
{
log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}");
ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(SocketId, out _);
_ = Task.Run(() => ConnectionClosed?.Invoke());
}
else
log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting.");
// Failed resubscribe, close socket if it is still open
if (_socket.IsOpen)
await _socket.CloseAsync().ConfigureAwait(false);
else
DisconnectTime = DateTime.UtcNow;
// Break out of the loop, the new processing task should reconnect again
break;
}
else
{
// Succesfully reconnected
log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored.");
ResubscribeTry = 0;
if (lostTriggered)
{
lostTriggered = false;
_ = Task.Run(() => ConnectionRestored?.Invoke(time.HasValue ? DateTime.UtcNow - time.Value : TimeSpan.FromSeconds(0)));
}
break;
}
}
}
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectSuccessful)
await _socket.ReconnectAsync().ConfigureAwait(false);
else
{
if (!socketClient.ClientOptions.AutoReconnect && ShouldReconnect)
_ = Task.Run(() => ConnectionClosed?.Invoke());
// No reconnecting needed
log.Write(LogLevel.Information, $"Socket {SocketId} closed");
if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(SocketId, out _);
Status = SocketStatus.Connected;
_ = Task.Run(() =>
{
ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
DisconnectTime = null;
});
}
}
/// <summary>
/// Dispose the connection
/// Handler for an error on a websocket
/// </summary>
public void Dispose()
/// <param name="e">The exception</param>
protected virtual void HandleError(Exception e)
{
Status = SocketStatus.Disposed;
_socket.Dispose();
if (e is WebSocketException wse)
log.Write(LogLevel.Warning, $"Socket {SocketId} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
else
log.Write(LogLevel.Warning, $"Socket {SocketId} error: " + e.ToLogString());
}
/// <summary>
/// Process a message received by the socket
/// </summary>
/// <param name="data">The received data</param>
private void ProcessMessage(string data)
protected virtual void HandleMessage(string data)
{
var timestamp = DateTime.UtcNow;
log.Write(LogLevel.Trace, $"Socket {SocketId} received data: " + data);
@ -456,15 +287,13 @@ namespace CryptoExchange.Net.Sockets
}
var handledResponse = false;
PendingRequest[] requests;
lock(pendingRequests)
requests = pendingRequests.ToArray();
// Remove any timed out requests
foreach (var request in requests.Where(r => r.Completed))
PendingRequest[] requests;
lock (pendingRequests)
{
lock (pendingRequests)
pendingRequests.Remove(request);
pendingRequests.RemoveAll(r => r.Completed);
requests = pendingRequests.ToArray();
}
// Check if this message is an answer on any pending requests
@ -472,7 +301,7 @@ namespace CryptoExchange.Net.Sockets
{
if (pendingRequest.CheckData(tokenData))
{
lock (pendingRequests)
lock (pendingRequests)
pendingRequests.Remove(pendingRequest);
if (!socketClient.ContinueOnQueryResponse)
@ -484,8 +313,8 @@ namespace CryptoExchange.Net.Sockets
}
// Message was not a request response, check data handlers
var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data: null, timestamp);
var (handled, userProcessTime) = HandleData(messageEvent);
var messageEvent = new MessageEvent(this, tokenData, socketClient.ClientOptions.OutputOriginalData ? data : null, timestamp);
var (handled, userProcessTime, subscription) = HandleData(messageEvent);
if (!handled && !handledResponse)
{
if (!socketClient.UnhandledMessageExpected)
@ -495,10 +324,108 @@ namespace CryptoExchange.Net.Sockets
var total = DateTime.UtcNow - timestamp;
if (userProcessTime.TotalMilliseconds > 500)
log.Write(LogLevel.Debug, $"Socket {SocketId} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " +
log.Write(LogLevel.Debug, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processing slow ({(int)total.TotalMilliseconds}ms, {(int)userProcessTime.TotalMilliseconds}ms user code), consider offloading data handling to another thread. " +
"Data from this socket may arrive late or not at all if message processing is continuously slow.");
log.Write(LogLevel.Trace, $"Socket {SocketId} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)");
log.Write(LogLevel.Trace, $"Socket {SocketId}{(subscription == null ? "" : " subscription " + subscription!.Id)} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)");
}
/// <summary>
/// Connect the websocket
/// </summary>
/// <returns></returns>
public async Task<bool> ConnectAsync() => await _socket.ConnectAsync().ConfigureAwait(false);
/// <summary>
/// Retrieve the underlying socket
/// </summary>
/// <returns></returns>
public IWebsocket GetSocket() => _socket;
/// <summary>
/// Trigger a reconnect of the socket connection
/// </summary>
/// <returns></returns>
public async Task TriggerReconnectAsync() => await _socket.ReconnectAsync().ConfigureAwait(false);
/// <summary>
/// Close the connection
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(SocketId, out _);
lock (subscriptionLock)
{
foreach (var subscription in subscriptions)
{
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
}
}
await _socket.CloseAsync().ConfigureAwait(false);
_socket.Dispose();
}
/// <summary>
/// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well
/// </summary>
/// <param name="subscription">Subscription to close</param>
/// <returns></returns>
public async Task CloseAsync(SocketSubscription subscription)
{
lock (subscriptionLock)
{
if (!subscriptions.Contains(subscription))
return;
subscriptions.Remove(subscription);
}
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
log.Write(LogLevel.Debug, $"Socket {SocketId} closing subscription {subscription.Id}");
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
if (subscription.Confirmed && _socket.IsOpen)
await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
bool shouldCloseConnection;
lock (subscriptionLock)
{
if (Status == SocketStatus.Closing)
{
log.Write(LogLevel.Debug, $"Socket {SocketId} already closing");
return;
}
shouldCloseConnection = subscriptions.All(r => !r.UserSubscription);
if (shouldCloseConnection)
Status = SocketStatus.Closing;
}
if (shouldCloseConnection)
{
log.Write(LogLevel.Debug, $"Socket {SocketId} closing as there are no more subscriptions");
await CloseAsync().ConfigureAwait(false);
}
}
/// <summary>
/// Dispose the connection
/// </summary>
public void Dispose()
{
Status = SocketStatus.Disposed;
_socket.Dispose();
}
/// <summary>
@ -513,7 +440,8 @@ namespace CryptoExchange.Net.Sockets
return false;
subscriptions.Add(subscription);
log.Write(LogLevel.Trace, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {subscriptions.Count}");
if(subscription.UserSubscription)
log.Write(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {subscriptions.Count(s => s.UserSubscription)}");
return true;
}
}
@ -544,7 +472,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="messageEvent"></param>
/// <returns>True if the data was successfully handled</returns>
private (bool, TimeSpan) HandleData(MessageEvent messageEvent)
private (bool, TimeSpan, SocketSubscription?) HandleData(MessageEvent messageEvent)
{
SocketSubscription? currentSubscription = null;
try
@ -585,13 +513,13 @@ namespace CryptoExchange.Net.Sockets
}
}
return (handled, userCodeDuration);
return (handled, userCodeDuration, currentSubscription);
}
catch (Exception ex)
{
log.Write(LogLevel.Error, $"Socket {SocketId} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}");
currentSubscription?.InvokeExceptionHandler(ex);
return (false, TimeSpan.Zero);
return (false, TimeSpan.Zero, null);
}
}
@ -649,37 +577,24 @@ namespace CryptoExchange.Net.Sockets
}
}
/// <summary>
/// Handler for a socket opening
/// </summary>
protected virtual void SocketOnOpen()
{
ReconnectTry = 0;
PausedActivity = false;
}
private async Task ReconnectWatcherAsync()
{
while (true)
{
await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false);
if (!ShouldReconnect)
return;
Status = SocketStatus.Reconnecting;
await ReconnectAsync().ConfigureAwait(false);
if (!ShouldReconnect)
return;
}
}
private async Task<CallResult<bool>> ProcessReconnectAsync()
{
if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected"));
if (Authenticated)
bool anySubscriptions = false;
lock (subscriptionLock)
anySubscriptions = subscriptions.Any(s => s.UserSubscription);
if (!anySubscriptions)
{
// No need to resubscribe anything
log.Write(LogLevel.Debug, $"Socket {SocketId} Nothing to resubscribe, closing connection");
_ = _socket.CloseAsync();
return new CallResult<bool>(true);
}
if (subscriptions.Any(s => s.Authenticated))
{
// If we reconnected a authenticated connection we need to re-authenticate
var authResult = await socketClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
@ -689,13 +604,22 @@ namespace CryptoExchange.Net.Sockets
return authResult;
}
Authenticated = true;
log.Write(LogLevel.Debug, $"Socket {SocketId} authentication succeeded on reconnected socket.");
}
// Get a list of all subscriptions on the socket
List<SocketSubscription> subscriptionList;
List<SocketSubscription> subscriptionList = new List<SocketSubscription>();
lock (subscriptionLock)
subscriptionList = subscriptions.Where(h => h.Request != null).ToList();
{
foreach (var subscription in subscriptions)
{
if (subscription.Request != null)
subscriptionList.Add(subscription);
else
subscription.Confirmed = true;
}
}
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
for (var i = 0; i < subscriptionList.Count; i += socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)
@ -705,13 +629,16 @@ namespace CryptoExchange.Net.Sockets
var taskList = new List<Task<CallResult<bool>>>();
foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription));
taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription));
await Task.WhenAll(taskList).ConfigureAwait(false);
if (taskList.Any(t => !t.Result.Success))
return taskList.First(t => !t.Result.Success).Result;
}
foreach (var subscription in subscriptionList)
subscription.Confirmed = true;
if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected"));
@ -731,7 +658,7 @@ namespace CryptoExchange.Net.Sockets
return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
}
/// <summary>
/// Status of the socket connection
/// </summary>
@ -750,6 +677,10 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
Reconnecting,
/// <summary>
/// Resubscribing on reconnected socket
/// </summary>
Resubscribing,
/// <summary>
/// Closing
/// </summary>
Closing,

View File

@ -43,19 +43,25 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public bool Confirmed { get; set; }
/// <summary>
/// Whether authentication is needed for this subscription
/// </summary>
public bool Authenticated { get; set; }
/// <summary>
/// Cancellation token registration, should be disposed when subscription is closed. Used for closing the subscription with
/// a provided cancelation token
/// </summary>
public CancellationTokenRegistration? CancellationTokenRegistration { get; set; }
private SocketSubscription(int id, object? request, string? identifier, bool userSubscription, Action<MessageEvent> dataHandler)
private SocketSubscription(int id, object? request, string? identifier, bool userSubscription, bool authenticated, Action<MessageEvent> dataHandler)
{
Id = id;
UserSubscription = userSubscription;
MessageHandler = dataHandler;
Request = request;
Identifier = identifier;
Authenticated = authenticated;
}
/// <summary>
@ -64,12 +70,13 @@ namespace CryptoExchange.Net.Sockets
/// <param name="id"></param>
/// <param name="request"></param>
/// <param name="userSubscription"></param>
/// <param name="authenticated"></param>
/// <param name="dataHandler"></param>
/// <returns></returns>
public static SocketSubscription CreateForRequest(int id, object request, bool userSubscription,
Action<MessageEvent> dataHandler)
bool authenticated, Action<MessageEvent> dataHandler)
{
return new SocketSubscription(id, request, null, userSubscription, dataHandler);
return new SocketSubscription(id, request, null, userSubscription, authenticated, dataHandler);
}
/// <summary>
@ -78,12 +85,13 @@ namespace CryptoExchange.Net.Sockets
/// <param name="id"></param>
/// <param name="identifier"></param>
/// <param name="userSubscription"></param>
/// <param name="authenticated"></param>
/// <param name="dataHandler"></param>
/// <returns></returns>
public static SocketSubscription CreateForIdentifier(int id, string identifier, bool userSubscription,
Action<MessageEvent> dataHandler)
bool authenticated, Action<MessageEvent> dataHandler)
{
return new SocketSubscription(id, null, identifier, userSubscription, dataHandler);
return new SocketSubscription(id, null, identifier, userSubscription, authenticated, dataHandler);
}
/// <summary>

View File

@ -22,8 +22,7 @@ namespace CryptoExchange.Net.Sockets
}
/// <summary>
/// Event when the connection is closed. This event happens when reconnecting/resubscribing has failed too often based on the <see cref="BaseSocketClientOptions.MaxReconnectTries"/> and <see cref="BaseSocketClientOptions.MaxResubscribeTries"/> options,
/// or <see cref="BaseSocketClientOptions.AutoReconnect"/> is false. The socket will not be reconnected
/// Event when the connection is closed and will not be reconnected
/// </summary>
public event Action ConnectionClosed
{

View File

@ -0,0 +1,79 @@
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Parameters for a websocket
/// </summary>
public class WebSocketParameters
{
/// <summary>
/// The uri to connect to
/// </summary>
public Uri Uri { get; set; }
/// <summary>
/// Headers to send in the connection handshake
/// </summary>
public IDictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();
/// <summary>
/// Cookies to send in the connection handshake
/// </summary>
public IDictionary<string, string> Cookies { get; set; } = new Dictionary<string, string>();
/// <summary>
/// The time to wait between reconnect attempts
/// </summary>
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Proxy for the connection
/// </summary>
public ApiProxy? Proxy { get; set; }
/// <summary>
/// Whether the socket should automatically reconnect when connection is lost
/// </summary>
public bool AutoReconnect { get; set; }
/// <summary>
/// The maximum time of no data received before considering the connection lost and closting/reconnecting the socket
/// </summary>
public TimeSpan? Timeout { get; set; }
/// <summary>
/// Interval at which to send ping frames
/// </summary>
public TimeSpan? KeepAliveInterval { get; set; }
/// <summary>
/// The max amount of messages to send per second
/// </summary>
public int? RatelimitPerSecond { get; set; }
/// <summary>
/// Origin header value to send in the connection handshake
/// </summary>
public string? Origin { get; set; }
/// <summary>
/// Delegate used for processing byte data received from socket connections before it is processed by handlers
/// </summary>
public Func<byte[], string>? DataInterpreterBytes { get; set; }
/// <summary>
/// Delegate used for processing string data received from socket connections before it is processed by handlers
/// </summary>
public Func<string, string>? DataInterpreterString { get; set; }
/// <summary>
/// Encoding for sending/receiving data
/// </summary>
public Encoding Encoding { get; set; } = Encoding.UTF8;
/// <summary>
/// ctor
/// </summary>
/// <param name="uri">Uri</param>
/// <param name="autoReconnect">Auto reconnect</param>
public WebSocketParameters(Uri uri, bool autoReconnect)
{
Uri = uri;
AutoReconnect = autoReconnect;
}
}
}

View File

@ -1,6 +1,4 @@
using System;
using System.Collections.Generic;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging;
namespace CryptoExchange.Net.Sockets
@ -11,15 +9,9 @@ namespace CryptoExchange.Net.Sockets
public class WebsocketFactory : IWebsocketFactory
{
/// <inheritdoc />
public IWebsocket CreateWebsocket(Log log, string url)
public IWebsocket CreateWebsocket(Log log, WebSocketParameters parameters)
{
return new CryptoExchangeWebSocketClient(log, new Uri(url));
}
/// <inheritdoc />
public IWebsocket CreateWebsocket(Log log, string url, IDictionary<string, string> cookies, IDictionary<string, string> headers)
{
return new CryptoExchangeWebSocketClient(log, new Uri(url), cookies, headers);
return new CryptoExchangeWebSocketClient(log, parameters);
}
}
}

View File

@ -18,6 +18,10 @@ I develop and maintain this package on my own for free in my spare time. Donatio
Alternatively, sponsor me on Github using [Github Sponsors](https://github.com/sponsors/JKorf)
## Release notes
* Version 5.2.0 - 10 Jul 2022
* Refactored websocket code, removed some clutter and simplified
* Added ReconnectAsync and GetSubscriptionsState methods on socket clients
* Version 5.1.12 - 12 Jun 2022
* Changed time sync so requests no longer wait for it to complete unless it's the first time
* Made log client options changable after client creation