1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 00:16:27 +00:00

Resubscribing after reconnecting now in parallel, fixed bug with ghost socket

This commit is contained in:
Jan Korf 2019-08-07 14:57:58 +02:00
parent fc41962524
commit cb68b4d6e3
2 changed files with 24 additions and 9 deletions

View File

@ -2371,7 +2371,7 @@
</member> </member>
<member name="M:CryptoExchange.Net.Sockets.SocketConnection.Close(CryptoExchange.Net.Sockets.SocketSubscription)"> <member name="M:CryptoExchange.Net.Sockets.SocketConnection.Close(CryptoExchange.Net.Sockets.SocketSubscription)">
<summary> <summary>
Close the subscriptions Close the subscription
</summary> </summary>
<param name="subscription">Subscription to close</param> <param name="subscription">Subscription to close</param>
<returns></returns> <returns></returns>

View File

@ -148,6 +148,7 @@ namespace CryptoExchange.Net.Sockets
if (tokenData == null) if (tokenData == null)
return; return;
var handledResponse = false;
foreach (var pendingRequest in pendingRequests.ToList()) foreach (var pendingRequest in pendingRequests.ToList())
{ {
if (pendingRequest.Check(tokenData)) if (pendingRequest.Check(tokenData))
@ -155,11 +156,12 @@ namespace CryptoExchange.Net.Sockets
pendingRequests.Remove(pendingRequest); pendingRequests.Remove(pendingRequest);
if (!socketClient.ContinueOnQueryResponse) if (!socketClient.ContinueOnQueryResponse)
return; return;
handledResponse = true;
break; break;
} }
} }
if (!HandleData(tokenData)) if (!HandleData(tokenData) && !handledResponse)
{ {
log.Write(LogVerbosity.Debug, "Message not handled: " + tokenData); log.Write(LogVerbosity.Debug, "Message not handled: " + tokenData);
} }
@ -306,6 +308,9 @@ namespace CryptoExchange.Net.Sockets
else else
{ {
log.Write(LogVerbosity.Info, $"Socket {Socket.Id} closed"); log.Write(LogVerbosity.Info, $"Socket {Socket.Id} closed");
if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _);
Socket.Dispose(); Socket.Dispose();
Closed?.Invoke(); Closed?.Invoke();
} }
@ -333,15 +338,25 @@ namespace CryptoExchange.Net.Sockets
List<SocketSubscription> handlerList; List<SocketSubscription> handlerList;
lock (handlersLock) lock (handlersLock)
handlerList = handlers.Where(h => h.Request != null).ToList(); handlerList = handlers.Where(h => h.Request != null).ToList();
var success = true;
var taskList = new List<Task>();
foreach (var handler in handlerList) foreach (var handler in handlerList)
{ {
var resubResult = await socketClient.SubscribeAndWait(this, handler.Request, handler).ConfigureAwait(false); var task = socketClient.SubscribeAndWait(this, handler.Request, handler).ContinueWith(t =>
if (!resubResult.Success) {
if (!t.Result.Success)
success = false;
});
taskList.Add(task);
}
Task.WaitAll(taskList.ToArray());
if (!success)
{ {
log.Write(LogVerbosity.Debug, "Resubscribing all subscriptions failed on reconnected socket. Disconnecting and reconnecting."); log.Write(LogVerbosity.Debug, "Resubscribing all subscriptions failed on reconnected socket. Disconnecting and reconnecting.");
return false; return false;
} }
}
log.Write(LogVerbosity.Debug, "All subscription successfully resubscribed on reconnected socket."); log.Write(LogVerbosity.Debug, "All subscription successfully resubscribed on reconnected socket.");
return true; return true;
@ -363,7 +378,7 @@ namespace CryptoExchange.Net.Sockets
} }
/// <summary> /// <summary>
/// Close the subscriptions /// Close the subscription
/// </summary> /// </summary>
/// <param name="subscription">Subscription to close</param> /// <param name="subscription">Subscription to close</param>
/// <returns></returns> /// <returns></returns>