1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-10 17:36:19 +00:00

Small refactor reconnect handling

This commit is contained in:
Jkorf 2021-10-08 12:06:45 +02:00
parent dadf62c280
commit 0bad51d775
2 changed files with 64 additions and 35 deletions

View File

@ -3865,6 +3865,11 @@
</summary> </summary>
<param name="data">The data to send</param> <param name="data">The data to send</param>
</member> </member>
<member name="M:CryptoExchange.Net.Sockets.SocketConnection.SocketOnOpen">
<summary>
Handler for a socket opening
</summary>
</member>
<member name="M:CryptoExchange.Net.Sockets.SocketConnection.SocketOnClose"> <member name="M:CryptoExchange.Net.Sockets.SocketConnection.SocketOnClose">
<summary> <summary>
Handler for a socket closing. Reconnects the socket if needed, or removes it from the active socket list if not Handler for a socket closing. Reconnects the socket if needed, or removes it from the active socket list if not

View File

@ -132,12 +132,7 @@ namespace CryptoExchange.Net.Sockets
Socket.Timeout = client.SocketNoDataTimeout; Socket.Timeout = client.SocketNoDataTimeout;
Socket.OnMessage += ProcessMessage; Socket.OnMessage += ProcessMessage;
Socket.OnClose += SocketOnClose; Socket.OnClose += SocketOnClose;
Socket.OnOpen += () => Socket.OnOpen += SocketOnOpen;
{
ReconnectTry = 0;
PausedActivity = false;
Connected = true;
};
} }
/// <summary> /// <summary>
@ -159,34 +154,36 @@ namespace CryptoExchange.Net.Sockets
return; return;
} }
var messageEvent = new MessageEvent(this, tokenData, socketClient.OutputOriginalData ? data: null, timestamp);
var handledResponse = false; var handledResponse = false;
PendingRequest[] requests; PendingRequest[] requests;
lock(pendingRequests) lock(pendingRequests)
{
requests = pendingRequests.ToArray(); requests = pendingRequests.ToArray();
}
// Remove any timed out requests
foreach (var request in requests.Where(r => r.Completed))
{
lock (pendingRequests)
pendingRequests.Remove(request);
}
// Check if this message is an answer on any pending requests // Check if this message is an answer on any pending requests
foreach (var pendingRequest in requests) foreach (var pendingRequest in requests)
{ {
if (pendingRequest.Check(tokenData)) if (pendingRequest.CheckData(tokenData))
{ {
lock (pendingRequests) lock (pendingRequests)
{
pendingRequests.Remove(pendingRequest); pendingRequests.Remove(pendingRequest);
}
if (pendingRequest.Result == null)
{
continue; // A previous timeout.
}
if (!socketClient.ContinueOnQueryResponse) if (!socketClient.ContinueOnQueryResponse)
return; return;
handledResponse = true; handledResponse = true;
break; break;
} }
} }
// Message was not a request response, check data handlers // Message was not a request response, check data handlers
var messageEvent = new MessageEvent(this, tokenData, socketClient.OutputOriginalData ? data: null, timestamp);
if (!HandleData(messageEvent) && !handledResponse) if (!HandleData(messageEvent) && !handledResponse)
{ {
if (!socketClient.UnhandledMessageExpected) if (!socketClient.UnhandledMessageExpected)
@ -309,26 +306,45 @@ namespace CryptoExchange.Net.Sockets
Socket.Send(data); Socket.Send(data);
} }
/// <summary>
/// Handler for a socket opening
/// </summary>
protected virtual void SocketOnOpen()
{
ReconnectTry = 0;
PausedActivity = false;
Connected = true;
}
/// <summary> /// <summary>
/// Handler for a socket closing. Reconnects the socket if needed, or removes it from the active socket list if not /// Handler for a socket closing. Reconnects the socket if needed, or removes it from the active socket list if not
/// </summary> /// </summary>
protected virtual void SocketOnClose() protected virtual void SocketOnClose()
{ {
lock (pendingRequests)
{
foreach(var pendingRequest in pendingRequests.ToList())
{
pendingRequest.Fail();
pendingRequests.Remove(pendingRequest);
}
}
if (socketClient.AutoReconnect && ShouldReconnect) if (socketClient.AutoReconnect && ShouldReconnect)
{ {
if (Socket.Reconnecting) if (Socket.Reconnecting)
return; // Already reconnecting return; // Already reconnecting
Socket.Reconnecting = true;
DisconnectTime = DateTime.UtcNow; DisconnectTime = DateTime.UtcNow;
log.Write(LogLevel.Information, $"Socket {Socket.Id} Connection lost, will try to reconnect after {socketClient.ReconnectInterval}");
if (!lostTriggered) if (!lostTriggered)
{ {
lostTriggered = true; lostTriggered = true;
ConnectionLost?.Invoke(); ConnectionLost?.Invoke();
} }
Socket.Reconnecting = true;
log.Write(LogLevel.Information, $"Socket {Socket.Id} Connection lost, will try to reconnect after {socketClient.ReconnectInterval}");
Task.Run(async () => Task.Run(async () =>
{ {
while (ShouldReconnect) while (ShouldReconnect)
@ -346,7 +362,8 @@ namespace CryptoExchange.Net.Sockets
if (!await Socket.ConnectAsync().ConfigureAwait(false)) if (!await Socket.ConnectAsync().ConfigureAwait(false))
{ {
ReconnectTry++; ReconnectTry++;
if(socketClient.MaxReconnectTries != null ResubscribeTry = 0;
if (socketClient.MaxReconnectTries != null
&& ReconnectTry >= socketClient.MaxReconnectTries) && ReconnectTry >= socketClient.MaxReconnectTries)
{ {
log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing"); log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing");
@ -390,10 +407,14 @@ namespace CryptoExchange.Net.Sockets
else else
log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.MaxResubscribeTries}" : "")}. Disconnecting and reconnecting."); log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.MaxResubscribeTries}" : "")}. Disconnecting and reconnecting.");
await Socket.CloseAsync().ConfigureAwait(false); if (Socket.IsOpen)
await Socket.CloseAsync().ConfigureAwait(false);
else
DisconnectTime = DateTime.UtcNow;
} }
else else
{ {
log.Write(LogLevel.Debug, $"Socket {Socket.Id} data connection restored.");
ResubscribeTry = 0; ResubscribeTry = 0;
if (lostTriggered) if (lostTriggered)
{ {
@ -525,36 +546,39 @@ namespace CryptoExchange.Net.Sockets
{ {
public Func<JToken, bool> Handler { get; } public Func<JToken, bool> Handler { get; }
public JToken? Result { get; private set; } public JToken? Result { get; private set; }
public bool Completed { get; private set; }
public AsyncResetEvent Event { get; } public AsyncResetEvent Event { get; }
public TimeSpan Timeout { get; } public TimeSpan Timeout { get; }
private readonly DateTime startTime; private CancellationTokenSource cts;
public PendingRequest(Func<JToken, bool> handler, TimeSpan timeout) public PendingRequest(Func<JToken, bool> handler, TimeSpan timeout)
{ {
Handler = handler; Handler = handler;
Event = new AsyncResetEvent(false, false); Event = new AsyncResetEvent(false, false);
Timeout = timeout; Timeout = timeout;
startTime = DateTime.UtcNow;
cts = new CancellationTokenSource(timeout);
cts.Token.Register(Fail, false);
} }
public bool Check(JToken data) public bool CheckData(JToken data)
{ {
if (Handler(data)) if (Handler(data))
{ {
Result = data; Result = data;
Completed = true;
Event.Set(); Event.Set();
return true; return true;
} }
if (DateTime.UtcNow - startTime > Timeout)
{
// Timed out
Event.Set();
return true;
}
return false; return false;
} }
public void Fail()
{
Completed = true;
Event.Set();
}
} }
} }