1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Removed ResubscribeMaxRetries default value of 5, Updated logging and log levels

This commit is contained in:
Jkorf 2022-03-09 12:59:09 +01:00
parent 949b205d4f
commit 839f509fef
7 changed files with 55 additions and 72 deletions

View File

@ -196,10 +196,10 @@ namespace CryptoExchange.Net
// If we have to output the original json data or output the data into the logging we'll have to read to full response // If we have to output the original json data or output the data into the logging we'll have to read to full response
// in order to log/return the json data // in order to log/return the json data
if (ClientOptions.OutputOriginalData || log.Level <= LogLevel.Debug) if (ClientOptions.OutputOriginalData || log.Level == LogLevel.Trace)
{ {
data = await reader.ReadToEndAsync().ConfigureAwait(false); data = await reader.ReadToEndAsync().ConfigureAwait(false);
log.Write(LogLevel.Debug, $"{(requestId != null ? $"[{requestId}] ": "")}Response received{(elapsedMilliseconds != null ? $" in {elapsedMilliseconds}" : " ")}ms: {data}"); log.Write(LogLevel.Debug, $"{(requestId != null ? $"[{requestId}] ": "")}Response received{(elapsedMilliseconds != null ? $" in {elapsedMilliseconds}" : " ")}ms{(log.Level == LogLevel.Trace ? (": " + data) : "")}");
var result = Deserialize<T>(data, serializer, requestId); var result = Deserialize<T>(data, serializer, requestId);
if(ClientOptions.OutputOriginalData) if(ClientOptions.OutputOriginalData)
result.OriginalData = data; result.OriginalData = data;
@ -209,6 +209,7 @@ namespace CryptoExchange.Net
// If we don't have to keep track of the original json data we can use the JsonTextReader to deserialize the stream directly // If we don't have to keep track of the original json data we can use the JsonTextReader to deserialize the stream directly
// into the desired object, which has increased performance over first reading the string value into memory and deserializing from that // into the desired object, which has increased performance over first reading the string value into memory and deserializing from that
using var jsonReader = new JsonTextReader(reader); using var jsonReader = new JsonTextReader(reader);
log.Write(LogLevel.Debug, $"{(requestId != null ? $"[{requestId}] ": "")}Response received{(elapsedMilliseconds != null ? $" in {elapsedMilliseconds}" : " ")}ms");
return new CallResult<T>(serializer.Deserialize<T>(jsonReader)!); return new CallResult<T>(serializer.Deserialize<T>(jsonReader)!);
} }
catch (JsonReaderException jre) catch (JsonReaderException jre)
@ -222,7 +223,7 @@ namespace CryptoExchange.Net
data = await ReadStreamAsync(stream).ConfigureAwait(false); data = await ReadStreamAsync(stream).ConfigureAwait(false);
} }
else else
data = "[Data only available in Debug LogLevel]"; data = "[Data only available in Trace LogLevel]";
} }
log.Write(LogLevel.Error, $"{(requestId != null ? $"[{requestId}] " : "")}Deserialize JsonReaderException: {jre.Message}, Path: {jre.Path}, LineNumber: {jre.LineNumber}, LinePosition: {jre.LinePosition}, data: {data}"); log.Write(LogLevel.Error, $"{(requestId != null ? $"[{requestId}] " : "")}Deserialize JsonReaderException: {jre.Message}, Path: {jre.Path}, LineNumber: {jre.LineNumber}, LinePosition: {jre.LinePosition}, data: {data}");
return new CallResult<T>(new DeserializeError($"Deserialize JsonReaderException: {jre.Message}, Path: {jre.Path}, LineNumber: {jre.LineNumber}, LinePosition: {jre.LinePosition}", data)); return new CallResult<T>(new DeserializeError($"Deserialize JsonReaderException: {jre.Message}, Path: {jre.Path}, LineNumber: {jre.LineNumber}, LinePosition: {jre.LinePosition}", data));
@ -237,7 +238,7 @@ namespace CryptoExchange.Net
data = await ReadStreamAsync(stream).ConfigureAwait(false); data = await ReadStreamAsync(stream).ConfigureAwait(false);
} }
else else
data = "[Data only available in Debug LogLevel]"; data = "[Data only available in Trace LogLevel]";
} }
log.Write(LogLevel.Error, $"{(requestId != null ? $"[{requestId}] " : "")}Deserialize JsonSerializationException: {jse.Message}, data: {data}"); log.Write(LogLevel.Error, $"{(requestId != null ? $"[{requestId}] " : "")}Deserialize JsonSerializationException: {jse.Message}, data: {data}");
@ -253,7 +254,7 @@ namespace CryptoExchange.Net
data = await ReadStreamAsync(stream).ConfigureAwait(false); data = await ReadStreamAsync(stream).ConfigureAwait(false);
} }
else else
data = "[Data only available in Debug LogLevel]"; data = "[Data only available in Trace LogLevel]";
} }
var exceptionInfo = ex.ToLogString(); var exceptionInfo = ex.ToLogString();

View File

@ -147,13 +147,13 @@ namespace CryptoExchange.Net
} }
} }
log.Write(LogLevel.Debug, $"[{requestId}] Creating request for " + uri);
if (signed && apiClient.AuthenticationProvider == null) if (signed && apiClient.AuthenticationProvider == null)
{ {
log.Write(LogLevel.Warning, $"[{requestId}] Request {uri.AbsolutePath} failed because no ApiCredentials were provided"); log.Write(LogLevel.Warning, $"[{requestId}] Request {uri.AbsolutePath} failed because no ApiCredentials were provided");
return new WebCallResult<T>(new NoApiCredentialsError()); return new WebCallResult<T>(new NoApiCredentialsError());
} }
log.Write(LogLevel.Information, $"[{requestId}] Creating request for " + uri);
var paramsPosition = parameterPosition ?? ParameterPositions[method]; var paramsPosition = parameterPosition ?? ParameterPositions[method];
var request = ConstructRequest(apiClient, uri, method, parameters, signed, paramsPosition, arraySerialization ?? this.arraySerialization, requestId, additionalHeaders); var request = ConstructRequest(apiClient, uri, method, parameters, signed, paramsPosition, arraySerialization ?? this.arraySerialization, requestId, additionalHeaders);
@ -161,15 +161,12 @@ namespace CryptoExchange.Net
if (paramsPosition == HttpMethodParameterPosition.InBody) if (paramsPosition == HttpMethodParameterPosition.InBody)
paramString = $" with request body '{request.Content}'"; paramString = $" with request body '{request.Content}'";
if (log.Level == LogLevel.Trace) var headers = request.GetHeaders();
{ if (headers.Any())
var headers = request.GetHeaders(); paramString += " with headers " + string.Join(", ", headers.Select(h => h.Key + $"=[{string.Join(",", h.Value)}]"));
if (headers.Any())
paramString += " with headers " + string.Join(", ", headers.Select(h => h.Key + $"=[{string.Join(",", h.Value)}]"));
}
apiClient.TotalRequestsMade++; apiClient.TotalRequestsMade++;
log.Write(LogLevel.Debug, $"[{requestId}] Sending {method}{(signed ? " signed" : "")} request to {request.Uri}{paramString ?? " "}{(ClientOptions.Proxy == null ? "" : $" via proxy {ClientOptions.Proxy.Host}")}"); log.Write(LogLevel.Trace, $"[{requestId}] Sending {method}{(signed ? " signed" : "")} request to {request.Uri}{paramString ?? " "}{(ClientOptions.Proxy == null ? "" : $" via proxy {ClientOptions.Proxy.Host}")}");
return await GetResponseAsync<T>(request, deserializer, cancellationToken).ConfigureAwait(false); return await GetResponseAsync<T>(request, deserializer, cancellationToken).ConfigureAwait(false);
} }
@ -200,7 +197,7 @@ namespace CryptoExchange.Net
var data = await reader.ReadToEndAsync().ConfigureAwait(false); var data = await reader.ReadToEndAsync().ConfigureAwait(false);
responseStream.Close(); responseStream.Close();
response.Close(); response.Close();
log.Write(LogLevel.Debug, $"[{request.RequestId}] Response received in {sw.ElapsedMilliseconds}ms: {data}"); log.Write(LogLevel.Debug, $"[{request.RequestId}] Response received in {sw.ElapsedMilliseconds}ms{(log.Level == LogLevel.Trace ? (": "+data): "")}");
// Validate if it is valid json. Sometimes other data will be returned, 502 error html pages for example // Validate if it is valid json. Sometimes other data will be returned, 502 error html pages for example
var parseResult = ValidateJson(data); var parseResult = ValidateJson(data);
@ -231,7 +228,7 @@ namespace CryptoExchange.Net
// Http status code indicates error // Http status code indicates error
using var reader = new StreamReader(responseStream); using var reader = new StreamReader(responseStream);
var data = await reader.ReadToEndAsync().ConfigureAwait(false); var data = await reader.ReadToEndAsync().ConfigureAwait(false);
log.Write(LogLevel.Debug, $"[{request.RequestId}] Error received: {data}"); log.Write(LogLevel.Warning, $"[{request.RequestId}] Error received in {sw.ElapsedMilliseconds}ms: {data}");
responseStream.Close(); responseStream.Close();
response.Close(); response.Close();
var parseResult = ValidateJson(data); var parseResult = ValidateJson(data);

View File

@ -188,9 +188,6 @@ namespace CryptoExchange.Net
var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false); var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
if (!connectResult) if (!connectResult)
return new CallResult<UpdateSubscription>(connectResult.Error!); return new CallResult<UpdateSubscription>(connectResult.Error!);
if (needsConnecting)
log.Write(LogLevel.Debug, $"Socket {socketConnection.Socket.Id} connected to {url} {(request == null ? "": "with request " + JsonConvert.SerializeObject(request))}");
} }
finally finally
{ {
@ -200,7 +197,7 @@ namespace CryptoExchange.Net
if (socketConnection.PausedActivity) if (socketConnection.PausedActivity)
{ {
log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} has been paused, can't subscribe at this moment"); log.Write(LogLevel.Warning, $"Socket {socketConnection.Socket.Id} has been paused, can't subscribe at this moment");
return new CallResult<UpdateSubscription>( new ServerError("Socket is paused")); return new CallResult<UpdateSubscription>( new ServerError("Socket is paused"));
} }
@ -225,10 +222,12 @@ namespace CryptoExchange.Net
{ {
subscription.CancellationTokenRegistration = ct.Register(async () => subscription.CancellationTokenRegistration = ct.Register(async () =>
{ {
log.Write(LogLevel.Debug, $"Socket {socketConnection.Socket.Id} Cancellation token set, closing subscription"); log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} Cancellation token set, closing subscription");
await socketConnection.CloseAsync(subscription).ConfigureAwait(false); await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
}, false); }, false);
} }
log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} subscription completed");
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription)); return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
} }
@ -310,7 +309,7 @@ namespace CryptoExchange.Net
if (socketConnection.PausedActivity) if (socketConnection.PausedActivity)
{ {
log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} has been paused, can't send query at this moment"); log.Write(LogLevel.Warning, $"Socket {socketConnection.Socket.Id} has been paused, can't send query at this moment");
return new CallResult<T>(new ServerError("Socket is paused")); return new CallResult<T>(new ServerError("Socket is paused"));
} }
@ -618,7 +617,7 @@ namespace CryptoExchange.Net
} }
catch (Exception ex) catch (Exception ex)
{ {
log.Write(LogLevel.Warning, $"Socket {socket.Socket.Id} Periodic send {identifier} failed: " + ex); log.Write(LogLevel.Warning, $"Socket {socket.Socket.Id} Periodic send {identifier} failed: " + ex.ToLogString());
} }
} }
} }
@ -672,7 +671,7 @@ namespace CryptoExchange.Net
/// <returns></returns> /// <returns></returns>
public virtual async Task UnsubscribeAllAsync() public virtual async Task UnsubscribeAllAsync()
{ {
log.Write(LogLevel.Debug, $"Closing all {sockets.Sum(s => s.Value.SubscriptionCount)} subscriptions"); log.Write(LogLevel.Information, $"Closing all {sockets.Sum(s => s.Value.SubscriptionCount)} subscriptions");
await Task.Run(async () => await Task.Run(async () =>
{ {

View File

@ -164,7 +164,7 @@ namespace CryptoExchange.Net.Objects
/// <summary> /// <summary>
/// The maximum number of times to try to resubscribe after reconnecting /// The maximum number of times to try to resubscribe after reconnecting
/// </summary> /// </summary>
public int? MaxResubscribeTries { get; set; } = 5; public int? MaxResubscribeTries { get; set; }
/// <summary> /// <summary>
/// Max number of concurrent resubscription tasks per socket after reconnecting a socket /// Max number of concurrent resubscription tasks per socket after reconnecting a socket

View File

@ -224,7 +224,7 @@ namespace CryptoExchange.Net.OrderBook
public async Task<CallResult<bool>> StartAsync(CancellationToken? ct = null) public async Task<CallResult<bool>> StartAsync(CancellationToken? ct = null)
{ {
if (Status != OrderBookStatus.Disconnected) if (Status != OrderBookStatus.Disconnected)
throw new InvalidOperationException($"Can't start book unless state is {OrderBookStatus.Connecting}. Was {Status}"); throw new InvalidOperationException($"Can't start book unless state is {OrderBookStatus.Disconnected}. Current state: {Status}");
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} starting"); log.Write(LogLevel.Debug, $"{Id} order book {Symbol} starting");
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
@ -288,7 +288,7 @@ namespace CryptoExchange.Net.OrderBook
if (_subscription != null) if (_subscription != null)
await _subscription.CloseAsync().ConfigureAwait(false); await _subscription.CloseAsync().ConfigureAwait(false);
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} stopped"); log.Write(LogLevel.Trace, $"{Id} order book {Symbol} stopped");
} }
/// <inheritdoc/> /// <inheritdoc/>
@ -417,7 +417,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
var pbList = processBuffer.ToList(); var pbList = processBuffer.ToList();
if (pbList.Count > 0) if (pbList.Count > 0)
log.Write(LogLevel.Debug, "Processing buffered updates"); log.Write(LogLevel.Debug, $"Processing {pbList.Count} buffered updates");
foreach (var bufferEntry in pbList) foreach (var bufferEntry in pbList)
{ {
@ -436,7 +436,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
if (sequence <= LastSequenceNumber) if (sequence <= LastSequenceNumber)
{ {
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} update skipped #{sequence}"); log.Write(LogLevel.Debug, $"{Id} order book {Symbol} update skipped #{sequence}, currently at #{LastSequenceNumber}");
return false; return false;
} }
@ -664,7 +664,7 @@ namespace CryptoExchange.Net.OrderBook
FirstUpdateId = item.StartUpdateId, FirstUpdateId = item.StartUpdateId,
LastUpdateId = item.EndUpdateId, LastUpdateId = item.EndUpdateId,
}); });
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} update buffered #{item.StartUpdateId}-#{item.EndUpdateId} [{item.Asks.Count()} asks, {item.Bids.Count()} bids]"); log.Write(LogLevel.Trace, $"{Id} order book {Symbol} update buffered #{item.StartUpdateId}-#{item.EndUpdateId} [{item.Asks.Count()} asks, {item.Bids.Count()} bids]");
} }
else else
{ {
@ -748,7 +748,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
if (lastUpdateId <= LastSequenceNumber) if (lastUpdateId <= LastSequenceNumber)
{ {
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} update skipped #{firstUpdateId}-{lastUpdateId}"); log.Write(LogLevel.Trace, $"{Id} order book {Symbol} update skipped #{firstUpdateId}-{lastUpdateId}");
return; return;
} }
@ -774,7 +774,7 @@ namespace CryptoExchange.Net.OrderBook
} }
LastSequenceNumber = lastUpdateId; LastSequenceNumber = lastUpdateId;
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} update processed #{firstUpdateId}-{lastUpdateId}"); log.Write(LogLevel.Trace, $"{Id} order book {Symbol} update processed #{firstUpdateId}-{lastUpdateId}");
} }
} }

View File

@ -254,7 +254,7 @@ namespace CryptoExchange.Net.Sockets
} }
} }
log.Write(LogLevel.Debug, $"Socket {Id} connected"); log.Write(LogLevel.Debug, $"Socket {Id} connected to {Url}");
return true; return true;
} }

View File

@ -43,11 +43,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public event Action? ActivityUnpaused; public event Action? ActivityUnpaused;
/// <summary>
/// Connecting closed event
/// </summary>
public event Action? Closed;
/// <summary> /// <summary>
/// Unhandled message event /// Unhandled message event
/// </summary> /// </summary>
@ -118,7 +113,7 @@ namespace CryptoExchange.Net.Sockets
if (pausedActivity != value) if (pausedActivity != value)
{ {
pausedActivity = value; pausedActivity = value;
log.Write(LogLevel.Debug, $"Socket {Socket.Id} Paused activity: " + value); log.Write(LogLevel.Information, $"Socket {Socket.Id} Paused activity: " + value);
if(pausedActivity) ActivityPaused?.Invoke(); if(pausedActivity) ActivityPaused?.Invoke();
else ActivityUnpaused?.Invoke(); else ActivityUnpaused?.Invoke();
} }
@ -341,7 +336,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="data">The data to send</param> /// <param name="data">The data to send</param>
public virtual void Send(string data) public virtual void Send(string data)
{ {
log.Write(LogLevel.Debug, $"Socket {Socket.Id} sending data: {data}"); log.Write(LogLevel.Trace, $"Socket {Socket.Id} sending data: {data}");
Socket.Send(data); Socket.Send(data);
} }
@ -377,7 +372,7 @@ namespace CryptoExchange.Net.Sockets
Socket.Reconnecting = true; Socket.Reconnecting = true;
DisconnectTime = DateTime.UtcNow; DisconnectTime = DateTime.UtcNow;
log.Write(LogLevel.Information, $"Socket {Socket.Id} Connection lost, will try to reconnect{(ReconnectTry == 0 ? "": $" after {socketClient.ClientOptions.ReconnectInterval}")}"); log.Write(LogLevel.Warning, $"Socket {Socket.Id} Connection lost, will try to reconnect");
if (!lostTriggered) if (!lostTriggered)
{ {
lostTriggered = true; lostTriggered = true;
@ -409,13 +404,12 @@ namespace CryptoExchange.Net.Sockets
if (socketClient.ClientOptions.MaxReconnectTries != null if (socketClient.ClientOptions.MaxReconnectTries != null
&& ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries) && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries)
{ {
log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing"); log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing");
ShouldReconnect = false; ShouldReconnect = false;
if (socketClient.sockets.ContainsKey(Socket.Id)) if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _); socketClient.sockets.TryRemove(Socket.Id, out _);
Closed?.Invoke();
_ = Task.Run(() => ConnectionClosed?.Invoke()); _ = Task.Run(() => ConnectionClosed?.Invoke());
break; break;
} }
@ -439,17 +433,16 @@ namespace CryptoExchange.Net.Sockets
if (socketClient.ClientOptions.MaxResubscribeTries != null && if (socketClient.ClientOptions.MaxResubscribeTries != null &&
ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries) ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries)
{ {
log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to resubscribe after {ResubscribeTry} tries, closing"); log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}");
ShouldReconnect = false; ShouldReconnect = false;
if (socketClient.sockets.ContainsKey(Socket.Id)) if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _); socketClient.sockets.TryRemove(Socket.Id, out _);
Closed?.Invoke();
_ = Task.Run(() => ConnectionClosed?.Invoke()); _ = Task.Run(() => ConnectionClosed?.Invoke());
} }
else else
log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Disconnecting and reconnecting."); log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting.");
if (Socket.IsOpen) if (Socket.IsOpen)
await Socket.CloseAsync().ConfigureAwait(false); await Socket.CloseAsync().ConfigureAwait(false);
@ -458,7 +451,7 @@ namespace CryptoExchange.Net.Sockets
} }
else else
{ {
log.Write(LogLevel.Debug, $"Socket {Socket.Id} data connection restored."); log.Write(LogLevel.Information, $"Socket {Socket.Id} data connection restored.");
ResubscribeTry = 0; ResubscribeTry = 0;
if (lostTriggered) if (lostTriggered)
{ {
@ -482,24 +475,22 @@ namespace CryptoExchange.Net.Sockets
log.Write(LogLevel.Information, $"Socket {Socket.Id} closed"); log.Write(LogLevel.Information, $"Socket {Socket.Id} closed");
if (socketClient.sockets.ContainsKey(Socket.Id)) if (socketClient.sockets.ContainsKey(Socket.Id))
socketClient.sockets.TryRemove(Socket.Id, out _); socketClient.sockets.TryRemove(Socket.Id, out _);
Closed?.Invoke();
} }
} }
private async Task<bool> ProcessReconnectAsync() private async Task<CallResult<bool>> ProcessReconnectAsync()
{ {
if (!Socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected"));
if (Authenticated) if (Authenticated)
{ {
if (!Socket.IsOpen)
return false;
// If we reconnected a authenticated connection we need to re-authenticate // If we reconnected a authenticated connection we need to re-authenticate
var authResult = await socketClient.AuthenticateSocketAsync(this).ConfigureAwait(false); var authResult = await socketClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
if (!authResult) if (!authResult)
{ {
log.Write(LogLevel.Information, $"Socket {Socket.Id} authentication failed on reconnected socket. Disconnecting and reconnecting."); log.Write(LogLevel.Warning, $"Socket {Socket.Id} authentication failed on reconnected socket. Disconnecting and reconnecting.");
return false; return authResult;
} }
log.Write(LogLevel.Debug, $"Socket {Socket.Id} authentication succeeded on reconnected socket."); log.Write(LogLevel.Debug, $"Socket {Socket.Id} authentication succeeded on reconnected socket.");
@ -513,28 +504,23 @@ namespace CryptoExchange.Net.Sockets
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
for (var i = 0; i < subscriptionList.Count; i += socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket) for (var i = 0; i < subscriptionList.Count; i += socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)
{ {
var success = true; if (!Socket.IsOpen)
var taskList = new List<Task>(); return new CallResult<bool>(new WebError("Socket not connected"));
foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
{
if (!Socket.IsOpen)
continue;
var task = socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription).ContinueWith(t => var taskList = new List<Task<CallResult<bool>>>();
{ foreach (var subscription in subscriptionList.Skip(i).Take(socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
if (!t.Result) taskList.Add(socketClient.SubscribeAndWaitAsync(this, subscription.Request!, subscription));
success = false;
});
taskList.Add(task);
}
await Task.WhenAll(taskList).ConfigureAwait(false); await Task.WhenAll(taskList).ConfigureAwait(false);
if (!success || !Socket.IsOpen) if (taskList.Any(t => !t.Result.Success))
return false; return taskList.First(t => !t.Result.Success).Result;
} }
if (!Socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected"));
log.Write(LogLevel.Debug, $"Socket {Socket.Id} all subscription successfully resubscribed on reconnected socket."); log.Write(LogLevel.Debug, $"Socket {Socket.Id} all subscription successfully resubscribed on reconnected socket.");
return true; return new CallResult<bool>(true);
} }
internal async Task UnsubscribeAsync(SocketSubscription socketSubscription) internal async Task UnsubscribeAsync(SocketSubscription socketSubscription)