1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-10 17:36:19 +00:00

Websocket connection performance improvements, Added multiple options, SymbolOrderBook improvements

This commit is contained in:
Jkorf 2021-08-24 11:59:11 +02:00
parent c29b2d0cf1
commit acfd6310d9
9 changed files with 231 additions and 37 deletions

View File

@ -25,7 +25,8 @@ namespace CryptoExchange.Net.Converters
return new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddSeconds(d); return new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddSeconds(d);
var t = double.Parse(reader.Value.ToString(), CultureInfo.InvariantCulture); var t = double.Parse(reader.Value.ToString(), CultureInfo.InvariantCulture);
return new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddSeconds(t); // Set ticks instead of seconds or milliseconds, because AddSeconds/AddMilliseconds rounds to nearest millisecond
return new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddTicks((long)(t * TimeSpan.TicksPerSecond));
} }
/// <inheritdoc /> /// <inheritdoc />

View File

@ -1320,6 +1320,12 @@
<member name="P:CryptoExchange.Net.Interfaces.ISocketClient.SocketCombineTarget"> <member name="P:CryptoExchange.Net.Interfaces.ISocketClient.SocketCombineTarget">
<inheritdoc cref="P:CryptoExchange.Net.Objects.SocketClientOptions.SocketSubscriptionsCombineTarget"/> <inheritdoc cref="P:CryptoExchange.Net.Objects.SocketClientOptions.SocketSubscriptionsCombineTarget"/>
</member> </member>
<member name="P:CryptoExchange.Net.Interfaces.ISocketClient.MaxReconnectTries">
<inheritdoc cref="P:CryptoExchange.Net.Objects.SocketClientOptions.MaxReconnectTries"/>
</member>
<member name="P:CryptoExchange.Net.Interfaces.ISocketClient.MaxResubscribeTries">
<inheritdoc cref="P:CryptoExchange.Net.Objects.SocketClientOptions.MaxResubscribeTries"/>
</member>
<member name="M:CryptoExchange.Net.Interfaces.ISocketClient.UnsubscribeAsync(CryptoExchange.Net.Sockets.UpdateSubscription)"> <member name="M:CryptoExchange.Net.Interfaces.ISocketClient.UnsubscribeAsync(CryptoExchange.Net.Sockets.UpdateSubscription)">
<summary> <summary>
Unsubscribe from a stream Unsubscribe from a stream
@ -2219,6 +2225,11 @@
The name of the order book implementation The name of the order book implementation
</summary> </summary>
</member> </member>
<member name="P:CryptoExchange.Net.Objects.OrderBookOptions.ChecksumValidationEnabled">
<summary>
Whether or not checksum validation is enabled. Default is true, disabling will ignore checksum messages.
</summary>
</member>
<member name="P:CryptoExchange.Net.Objects.OrderBookOptions.SequenceNumbersAreConsecutive"> <member name="P:CryptoExchange.Net.Objects.OrderBookOptions.SequenceNumbersAreConsecutive">
<summary> <summary>
Whether each update should have a consecutive id number. Used to identify and reconnect when numbers are skipped. Whether each update should have a consecutive id number. Used to identify and reconnect when numbers are skipped.
@ -2339,6 +2350,21 @@
Time to wait between reconnect attempts Time to wait between reconnect attempts
</summary> </summary>
</member> </member>
<member name="P:CryptoExchange.Net.Objects.SocketClientOptions.MaxReconnectTries">
<summary>
The maximum number of times to try to reconnect
</summary>
</member>
<member name="P:CryptoExchange.Net.Objects.SocketClientOptions.MaxResubscribeTries">
<summary>
The maximum number of times to try to resubscribe after reconnecting
</summary>
</member>
<member name="P:CryptoExchange.Net.Objects.SocketClientOptions.MaxConcurrentResubscriptionsPerSocket">
<summary>
Max number of concurrent resubscription tasks per socket after reconnecting a socket
</summary>
</member>
<member name="P:CryptoExchange.Net.Objects.SocketClientOptions.SocketResponseTimeout"> <member name="P:CryptoExchange.Net.Objects.SocketClientOptions.SocketResponseTimeout">
<summary> <summary>
The time to wait for a socket response before giving a timeout The time to wait for a socket response before giving a timeout
@ -3000,6 +3026,15 @@
<member name="P:CryptoExchange.Net.SocketClient.SocketCombineTarget"> <member name="P:CryptoExchange.Net.SocketClient.SocketCombineTarget">
<inheritdoc cref="P:CryptoExchange.Net.Objects.SocketClientOptions.SocketSubscriptionsCombineTarget"/> <inheritdoc cref="P:CryptoExchange.Net.Objects.SocketClientOptions.SocketSubscriptionsCombineTarget"/>
</member> </member>
<member name="P:CryptoExchange.Net.SocketClient.MaxReconnectTries">
<inheritdoc cref="P:CryptoExchange.Net.Objects.SocketClientOptions.MaxReconnectTries"/>
</member>
<member name="P:CryptoExchange.Net.SocketClient.MaxResubscribeTries">
<inheritdoc cref="P:CryptoExchange.Net.Objects.SocketClientOptions.MaxResubscribeTries"/>
</member>
<member name="P:CryptoExchange.Net.SocketClient.MaxConcurrentResubscriptionsPerSocket">
<inheritdoc cref="!:SocketClientOptions.MaxConcurrentResubscriptions"/>
</member>
<member name="F:CryptoExchange.Net.SocketClient.dataInterpreterBytes"> <member name="F:CryptoExchange.Net.SocketClient.dataInterpreterBytes">
<summary> <summary>
Delegate used for processing byte data received from socket connections before it is processed by handlers Delegate used for processing byte data received from socket connections before it is processed by handlers
@ -3621,6 +3656,16 @@
If the socket should be reconnected upon closing If the socket should be reconnected upon closing
</summary> </summary>
</member> </member>
<member name="P:CryptoExchange.Net.Sockets.SocketConnection.ReconnectTry">
<summary>
Current reconnect try
</summary>
</member>
<member name="P:CryptoExchange.Net.Sockets.SocketConnection.ResubscribeTry">
<summary>
Current resubscribe try
</summary>
</member>
<member name="P:CryptoExchange.Net.Sockets.SocketConnection.DisconnectTime"> <member name="P:CryptoExchange.Net.Sockets.SocketConnection.DisconnectTime">
<summary> <summary>
Time of disconnecting Time of disconnecting
@ -3807,6 +3852,18 @@
</summary> </summary>
<returns></returns> <returns></returns>
</member> </member>
<member name="M:CryptoExchange.Net.Sockets.UpdateSubscription.UnsubscribeAsync">
<summary>
Unsubscribe a subscription
</summary>
<returns></returns>
</member>
<member name="M:CryptoExchange.Net.Sockets.UpdateSubscription.ResubscribeAsync">
<summary>
Resubscribe this subscription
</summary>
<returns></returns>
</member>
<member name="T:CryptoExchange.Net.Sockets.WebsocketFactory"> <member name="T:CryptoExchange.Net.Sockets.WebsocketFactory">
<summary> <summary>
Default weboscket factory implementation Default weboscket factory implementation

View File

@ -43,6 +43,12 @@ namespace CryptoExchange.Net.Interfaces
/// <inheritdoc cref="SocketClientOptions.SocketSubscriptionsCombineTarget"/> /// <inheritdoc cref="SocketClientOptions.SocketSubscriptionsCombineTarget"/>
int SocketCombineTarget { get; } int SocketCombineTarget { get; }
/// <inheritdoc cref="SocketClientOptions.MaxReconnectTries"/>
int? MaxReconnectTries { get; }
/// <inheritdoc cref="SocketClientOptions.MaxResubscribeTries"/>
int? MaxResubscribeTries { get; }
/// <inheritdoc cref="SocketClientOptions.MaxConcurrentResubscriptionsPerSocket"/>
int MaxConcurrentResubscriptionsPerSocket { get; }
/// <summary> /// <summary>
/// Unsubscribe from a stream /// Unsubscribe from a stream

View File

@ -45,6 +45,11 @@ namespace CryptoExchange.Net.Objects
/// </summary> /// </summary>
public string OrderBookName { get; } public string OrderBookName { get; }
/// <summary>
/// Whether or not checksum validation is enabled. Default is true, disabling will ignore checksum messages.
/// </summary>
public bool ChecksumValidationEnabled { get; set; } = true;
/// <summary> /// <summary>
/// Whether each update should have a consecutive id number. Used to identify and reconnect when numbers are skipped. /// Whether each update should have a consecutive id number. Used to identify and reconnect when numbers are skipped.
/// </summary> /// </summary>
@ -220,6 +225,21 @@ namespace CryptoExchange.Net.Objects
/// </summary> /// </summary>
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5); public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// The maximum number of times to try to reconnect
/// </summary>
public int? MaxReconnectTries { get; set; }
/// <summary>
/// The maximum number of times to try to resubscribe after reconnecting
/// </summary>
public int? MaxResubscribeTries { get; set; } = 5;
/// <summary>
/// Max number of concurrent resubscription tasks per socket after reconnecting a socket
/// </summary>
public int MaxConcurrentResubscriptionsPerSocket { get; set; } = 5;
/// <summary> /// <summary>
/// The time to wait for a socket response before giving a timeout /// The time to wait for a socket response before giving a timeout
/// </summary> /// </summary>

View File

@ -37,6 +37,7 @@ namespace CryptoExchange.Net.OrderBook
private UpdateSubscription? subscription; private UpdateSubscription? subscription;
private readonly bool sequencesAreConsecutive; private readonly bool sequencesAreConsecutive;
private readonly bool strictLevels; private readonly bool strictLevels;
private readonly bool validateChecksum;
private Task? _processTask; private Task? _processTask;
private readonly AutoResetEvent _queueEvent; private readonly AutoResetEvent _queueEvent;
@ -214,6 +215,7 @@ namespace CryptoExchange.Net.OrderBook
sequencesAreConsecutive = options.SequenceNumbersAreConsecutive; sequencesAreConsecutive = options.SequenceNumbersAreConsecutive;
strictLevels = options.StrictLevels; strictLevels = options.StrictLevels;
validateChecksum = options.ChecksumValidationEnabled;
Symbol = symbol; Symbol = symbol;
Status = OrderBookStatus.Disconnected; Status = OrderBookStatus.Disconnected;
@ -233,7 +235,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} starting"); log.Write(LogLevel.Debug, $"{Id} order book {Symbol} starting");
Status = OrderBookStatus.Connecting; Status = OrderBookStatus.Connecting;
_processTask = Task.Run(ProcessQueue); _processTask = Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
var startResult = await DoStartAsync().ConfigureAwait(false); var startResult = await DoStartAsync().ConfigureAwait(false);
if (!startResult) if (!startResult)
@ -243,7 +245,14 @@ namespace CryptoExchange.Net.OrderBook
} }
subscription = startResult.Data; subscription = startResult.Data;
subscription.ConnectionLost += Reset; subscription.ConnectionLost += () =>
{
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} connection lost");
Status = OrderBookStatus.Reconnecting;
Reset();
};
subscription.ConnectionRestored += async time => await ResyncAsync().ConfigureAwait(false); subscription.ConnectionRestored += async time => await ResyncAsync().ConfigureAwait(false);
Status = OrderBookStatus.Synced; Status = OrderBookStatus.Synced;
return new CallResult<bool>(true, null); return new CallResult<bool>(true, null);
@ -288,8 +297,6 @@ namespace CryptoExchange.Net.OrderBook
private void Reset() private void Reset()
{ {
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} connection lost");
Status = OrderBookStatus.Reconnecting;
_queueEvent.Set(); _queueEvent.Set();
// Clear queue // Clear queue
while (_processQueue.TryDequeue(out _)) { } while (_processQueue.TryDequeue(out _)) { }
@ -380,9 +387,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
lock (bookLock) lock (bookLock)
{ {
if (Status == OrderBookStatus.Connecting || Status == OrderBookStatus.Disconnected) log.Write(LogLevel.Warning, $"{Symbol} bookSet");
return;
bookSet = true; bookSet = true;
asks.Clear(); asks.Clear();
foreach (var ask in item.Asks) foreach (var ask in item.Asks)
@ -408,9 +413,6 @@ namespace CryptoExchange.Net.OrderBook
{ {
lock (bookLock) lock (bookLock)
{ {
if (Status == OrderBookStatus.Connecting || Status == OrderBookStatus.Disconnected)
return;
if (!bookSet) if (!bookSet)
{ {
processBuffer.Add(new ProcessBufferRangeSequenceEntry() processBuffer.Add(new ProcessBufferRangeSequenceEntry()
@ -434,7 +436,7 @@ namespace CryptoExchange.Net.OrderBook
if (asks.First().Key < bids.First().Key) if (asks.First().Key < bids.First().Key)
{ {
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} detected out of sync order book. Resyncing"); log.Write(LogLevel.Warning, $"{Id} order book {Symbol} detected out of sync order book. Resyncing");
_ = subscription?.ReconnectAsync(); Resubscribe();
return; return;
} }
@ -448,6 +450,9 @@ namespace CryptoExchange.Net.OrderBook
{ {
lock (bookLock) lock (bookLock)
{ {
if (!validateChecksum)
return;
bool checksumResult = false; bool checksumResult = false;
try try
{ {
@ -467,12 +472,31 @@ namespace CryptoExchange.Net.OrderBook
// Should maybe only reconnect the specific subscription? // Should maybe only reconnect the specific subscription?
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync. Resyncing"); log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync. Resyncing");
_ = subscription?.ReconnectAsync(); Resubscribe();
return; return;
} }
} }
} }
private void Resubscribe()
{
Status = OrderBookStatus.Syncing;
_ = Task.Run(async () =>
{
await subscription!.UnsubscribeAsync().ConfigureAwait(false);
Reset();
if (!await subscription!.ResubscribeAsync().ConfigureAwait(false))
{
// Resubscribing failed, reconnect the socket
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} resync failed, reconnecting socket");
Status = OrderBookStatus.Reconnecting;
_ = subscription!.ReconnectAsync();
}
else
await ResyncAsync().ConfigureAwait(false);
});
}
/// <summary> /// <summary>
/// Set the initial data for the order book /// Set the initial data for the order book
/// </summary> /// </summary>
@ -591,9 +615,6 @@ namespace CryptoExchange.Net.OrderBook
/// <param name="entry">The entry</param> /// <param name="entry">The entry</param>
protected virtual bool ProcessUpdate(long sequence, OrderBookEntryType type, ISymbolOrderBookEntry entry) protected virtual bool ProcessUpdate(long sequence, OrderBookEntryType type, ISymbolOrderBookEntry entry)
{ {
if (Status != OrderBookStatus.Syncing && Status != OrderBookStatus.Synced)
return false;
if (sequence <= LastSequenceNumber) if (sequence <= LastSequenceNumber)
{ {
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} update skipped #{sequence}"); log.Write(LogLevel.Debug, $"{Id} order book {Symbol} update skipped #{sequence}");
@ -604,7 +625,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
// Out of sync // Out of sync
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync (expected { LastSequenceNumber + 1}, was {sequence}), reconnecting"); log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync (expected { LastSequenceNumber + 1}, was {sequence}), reconnecting");
subscription?.ReconnectAsync(); Resubscribe();
return false; return false;
} }

View File

@ -49,6 +49,12 @@ namespace CryptoExchange.Net
public int MaxSocketConnections { get; protected set; } = 9999; public int MaxSocketConnections { get; protected set; } = 9999;
/// <inheritdoc cref="SocketClientOptions.SocketSubscriptionsCombineTarget"/> /// <inheritdoc cref="SocketClientOptions.SocketSubscriptionsCombineTarget"/>
public int SocketCombineTarget { get; protected set; } public int SocketCombineTarget { get; protected set; }
/// <inheritdoc cref="SocketClientOptions.MaxReconnectTries"/>
public int? MaxReconnectTries { get; protected set; }
/// <inheritdoc cref="SocketClientOptions.MaxResubscribeTries"/>
public int? MaxResubscribeTries { get; protected set; }
/// <inheritdoc cref="SocketClientOptions.MaxConcurrentResubscriptions"/>
public int MaxConcurrentResubscriptionsPerSocket { get; protected set; }
/// <summary> /// <summary>
/// Delegate used for processing byte data received from socket connections before it is processed by handlers /// Delegate used for processing byte data received from socket connections before it is processed by handlers
/// </summary> /// </summary>
@ -102,6 +108,9 @@ namespace CryptoExchange.Net
ResponseTimeout = exchangeOptions.SocketResponseTimeout; ResponseTimeout = exchangeOptions.SocketResponseTimeout;
SocketNoDataTimeout = exchangeOptions.SocketNoDataTimeout; SocketNoDataTimeout = exchangeOptions.SocketNoDataTimeout;
SocketCombineTarget = exchangeOptions.SocketSubscriptionsCombineTarget ?? 1; SocketCombineTarget = exchangeOptions.SocketSubscriptionsCombineTarget ?? 1;
MaxReconnectTries = exchangeOptions.MaxReconnectTries;
MaxResubscribeTries = exchangeOptions.MaxResubscribeTries;
MaxConcurrentResubscriptionsPerSocket = exchangeOptions.MaxConcurrentResubscriptionsPerSocket;
} }
/// <summary> /// <summary>

View File

@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.IO; using System.IO;
using System.Net; using System.Net;
using System.Net.WebSockets; using System.Net.WebSockets;
@ -215,14 +216,19 @@ namespace CryptoExchange.Net.Sockets
return false; return false;
} }
_sendTask = Task.Run(SendLoopAsync); log.Write(LogLevel.Trace, $"Socket {Id} connection succeeded, starting communication");
_receiveTask = Task.Run(ReceiveLoopAsync); _sendTask = Task.Factory.StartNew(SendLoopAsync, TaskCreationOptions.LongRunning);
_receiveTask = Task.Factory.StartNew(ReceiveLoopAsync, TaskCreationOptions.LongRunning);
if (Timeout != default) if (Timeout != default)
_timeoutTask = Task.Run(CheckTimeoutAsync); _timeoutTask = Task.Run(CheckTimeoutAsync);
var sw = Stopwatch.StartNew();
while (!_startedSent || !_startedReceive) while (!_startedSent || !_startedReceive)
// Wait for the tasks to have actually started // Wait for the tasks to have actually started
await Task.Delay(10).ConfigureAwait(false); await Task.Delay(10).ConfigureAwait(false);
log.Write(LogLevel.Warning, $"Socket {Id} waited for {sw.ElapsedMilliseconds}ms for tasks to start");
log.Write(LogLevel.Debug, $"Socket {Id} connected"); log.Write(LogLevel.Debug, $"Socket {Id} connected");
return true; return true;
} }
@ -237,6 +243,7 @@ namespace CryptoExchange.Net.Sockets
throw new InvalidOperationException("Can't send data when socket is not connected"); throw new InvalidOperationException("Can't send data when socket is not connected");
var bytes = _encoding.GetBytes(data); var bytes = _encoding.GetBytes(data);
log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer");
_sendBuffer.Enqueue(bytes); _sendBuffer.Enqueue(bytes);
_sendEvent.Set(); _sendEvent.Set();
} }
@ -278,6 +285,7 @@ namespace CryptoExchange.Net.Sockets
if (_timeoutTask != null) if (_timeoutTask != null)
tasksToAwait.Add(_timeoutTask); tasksToAwait.Add(_timeoutTask);
log.Write(LogLevel.Trace, $"Socket {Id} waiting for communication loops to finish");
await Task.WhenAll(tasksToAwait).ConfigureAwait(false); await Task.WhenAll(tasksToAwait).ConfigureAwait(false);
log.Write(LogLevel.Debug, $"Socket {Id} closed"); log.Write(LogLevel.Debug, $"Socket {Id} closed");
Handle(closeHandlers); Handle(closeHandlers);
@ -296,6 +304,7 @@ namespace CryptoExchange.Net.Sockets
openHandlers.Clear(); openHandlers.Clear();
closeHandlers.Clear(); closeHandlers.Clear();
messageHandlers.Clear(); messageHandlers.Clear();
log.Write(LogLevel.Trace, $"Socket {Id} disposed");
} }
/// <summary> /// <summary>

View File

@ -9,6 +9,7 @@ using CryptoExchange.Net.Logging;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
{ {
@ -68,7 +69,14 @@ namespace CryptoExchange.Net.Sockets
/// If the socket should be reconnected upon closing /// If the socket should be reconnected upon closing
/// </summary> /// </summary>
public bool ShouldReconnect { get; set; } public bool ShouldReconnect { get; set; }
/// <summary>
/// Current reconnect try
/// </summary>
public int ReconnectTry { get; set; }
/// <summary>
/// Current resubscribe try
/// </summary>
public int ResubscribeTry { get; set; }
/// <summary> /// <summary>
/// Time of disconnecting /// Time of disconnecting
/// </summary> /// </summary>
@ -133,6 +141,7 @@ namespace CryptoExchange.Net.Sockets
Socket.OnClose += SocketOnClose; Socket.OnClose += SocketOnClose;
Socket.OnOpen += () => Socket.OnOpen += () =>
{ {
ReconnectTry = 0;
PausedActivity = false; PausedActivity = false;
Connected = true; Connected = true;
}; };
@ -325,7 +334,21 @@ namespace CryptoExchange.Net.Sockets
Socket.Reset(); Socket.Reset();
if (!await Socket.ConnectAsync().ConfigureAwait(false)) if (!await Socket.ConnectAsync().ConfigureAwait(false))
{ {
log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect"); ReconnectTry++;
if(socketClient.MaxReconnectTries != null
&& ReconnectTry >= socketClient.MaxReconnectTries)
{
log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing");
ShouldReconnect = false;
if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _);
Closed?.Invoke();
break;
}
log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect{(socketClient.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.MaxReconnectTries}": "")}");
continue; continue;
} }
@ -337,9 +360,28 @@ namespace CryptoExchange.Net.Sockets
var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false); var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectResult) if (!reconnectResult)
{
ResubscribeTry++;
if (socketClient.MaxResubscribeTries != null &&
ResubscribeTry >= socketClient.MaxResubscribeTries)
{
log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to resubscribe after {ResubscribeTry} tries, closing");
ShouldReconnect = false;
if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _);
Closed?.Invoke();
}
else
log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.MaxResubscribeTries}" : "")}. Disconnecting and reconnecting.");
await Socket.CloseAsync().ConfigureAwait(false); await Socket.CloseAsync().ConfigureAwait(false);
}
else else
{ {
ResubscribeTry = 0;
if (lostTriggered) if (lostTriggered)
{ {
lostTriggered = false; lostTriggered = false;
@ -389,29 +431,39 @@ namespace CryptoExchange.Net.Sockets
lock (subscriptionLock) lock (subscriptionLock)
subscriptionList = subscriptions.Where(h => h.Request != null).ToList(); subscriptionList = subscriptions.Where(h => h.Request != null).ToList();
var success = true;
var taskList = new List<Task>();
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
foreach (var subscription in subscriptionList) for (var i = 0; i < subscriptionList.Count; i += socketClient.MaxConcurrentResubscriptionsPerSocket)
{ {
var task = socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription).ContinueWith(t => var success = true;
var taskList = new List<Task>();
foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.MaxConcurrentResubscriptionsPerSocket))
{ {
if (!t.Result) var task = socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription).ContinueWith(t =>
success = false; {
}); if (!t.Result)
taskList.Add(task); success = false;
} });
taskList.Add(task);
}
await Task.WhenAll(taskList).ConfigureAwait(false); await Task.WhenAll(taskList).ConfigureAwait(false);
if (!success) if (!success)
{ return false;
log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket. Disconnecting and reconnecting."); }
return false;
}
log.Write(LogLevel.Debug, $"Socket {Socket.Id} all subscription successfully resubscribed on reconnected socket."); log.Write(LogLevel.Debug, $"Socket {Socket.Id} all subscription successfully resubscribed on reconnected socket.");
return true; return true;
} }
internal async Task UnsubscribeAsync(SocketSubscription socketSubscription)
{
await socketClient.UnsubscribeAsync(this, socketSubscription).ConfigureAwait(false);
}
internal async Task<CallResult<bool>> ResubscribeAsync(SocketSubscription socketSubscription)
{
return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
}
/// <summary> /// <summary>
/// Close the connection /// Close the connection

View File

@ -1,4 +1,5 @@
using System; using CryptoExchange.Net.Objects;
using System;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
@ -91,5 +92,23 @@ namespace CryptoExchange.Net.Sockets
{ {
return connection.Socket.CloseAsync(); return connection.Socket.CloseAsync();
} }
/// <summary>
/// Unsubscribe a subscription
/// </summary>
/// <returns></returns>
internal async Task UnsubscribeAsync()
{
await connection.UnsubscribeAsync(subscription).ConfigureAwait(false);
}
/// <summary>
/// Resubscribe this subscription
/// </summary>
/// <returns></returns>
internal async Task<CallResult<bool>> ResubscribeAsync()
{
return await connection.ResubscribeAsync(subscription).ConfigureAwait(false);
}
} }
} }