1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 07:56:12 +00:00

Implement high-performance logging (#193)

* Implement high-performance logging
This commit is contained in:
Jonnern 2024-03-22 16:39:32 +01:00 committed by GitHub
parent 108c8fc183
commit de72fe4fb9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1293 additions and 112 deletions

View File

@ -4,6 +4,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
@ -63,7 +64,7 @@ namespace CryptoExchange.Net.Clients
if (subscription == null)
throw new ArgumentNullException(nameof(subscription));
_logger.Log(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id);
_logger.UnsubscribingSubscription(subscription.SocketId, subscription.Id);
await subscription.CloseAsync().ConfigureAwait(false);
}
@ -86,7 +87,7 @@ namespace CryptoExchange.Net.Clients
/// <returns></returns>
public virtual async Task ReconnectAsync()
{
_logger.Log(LogLevel.Information, $"Reconnecting all {CurrentConnections} connections");
_logger.ReconnectingAllConnections(CurrentConnections);
var tasks = new List<Task>();
foreach (var client in ApiClients.OfType<SocketApiClient>())
{

View File

@ -10,6 +10,7 @@ using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Converters.JsonNet;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Requests;
@ -152,9 +153,9 @@ namespace CryptoExchange.Net.Clients
var result = await GetResponseAsync<object>(request.Data, cancellationToken).ConfigureAwait(false);
if (!result)
_logger.Log(LogLevel.Warning, $"[Req {result.RequestId}] {result.ResponseStatusCode} Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}");
_logger.RestApiErrorReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), result.Error?.ToString());
else
_logger.Log(LogLevel.Debug, $"[Req {result.RequestId}] {result.ResponseStatusCode} Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? ": " + result.OriginalData : "")}");
_logger.RestApiResponseReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), OutputOriginalData ? result.OriginalData : "[Data only available when OutputOriginal = true]");
if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false))
continue;
@ -204,9 +205,9 @@ namespace CryptoExchange.Net.Clients
var result = await GetResponseAsync<T>(request.Data, cancellationToken).ConfigureAwait(false);
if (!result)
_logger.Log(LogLevel.Warning, $"[Req {result.RequestId}] {result.ResponseStatusCode} Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}");
_logger.RestApiErrorReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), result.Error?.ToString());
else
_logger.Log(LogLevel.Debug, $"[Req {result.RequestId}] {result.ResponseStatusCode} Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? ": " + result.OriginalData : "")}");
_logger.RestApiResponseReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), OutputOriginalData ? result.OriginalData : "[Data only available when OutputOriginal = true]");
if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false))
continue;
@ -256,7 +257,7 @@ namespace CryptoExchange.Net.Clients
var syncTimeResult = await syncTask.ConfigureAwait(false);
if (!syncTimeResult)
{
_logger.Log(LogLevel.Debug, $"[Req {requestId}] Failed to sync time, aborting request: " + syncTimeResult.Error);
_logger.RestApiFailedToSyncTime(requestId, syncTimeResult.Error!.ToString());
return syncTimeResult.As<IRequest>(default);
}
}
@ -274,11 +275,11 @@ namespace CryptoExchange.Net.Clients
if (signed && AuthenticationProvider == null)
{
_logger.Log(LogLevel.Warning, $"[Req {requestId}] Request {uri.AbsolutePath} failed because no ApiCredentials were provided");
_logger.RestApiNoApiCredentials(requestId, uri.AbsolutePath);
return new CallResult<IRequest>(new NoApiCredentialsError());
}
_logger.Log(LogLevel.Information, $"[Req {requestId}] Creating request for " + uri);
_logger.RestApiCreatingRequest(requestId, uri);
var paramsPosition = parameterPosition ?? ParameterPositions[method];
var request = ConstructRequest(uri, method, parameters?.OrderBy(p => p.Key).ToDictionary(p => p.Key, p => p.Value), signed, paramsPosition, arraySerialization ?? ArraySerialization, requestBodyFormat ?? RequestBodyFormat, requestId, additionalHeaders);
@ -291,7 +292,7 @@ namespace CryptoExchange.Net.Clients
paramString += " with headers " + string.Join(", ", headers.Select(h => h.Key + $"=[{string.Join(",", h.Value)}]"));
TotalRequestsMade++;
_logger.Log(LogLevel.Trace, $"[Req {requestId}] Sending {method}{(signed ? " signed" : "")} request to {request.Uri}{paramString ?? " "}");
_logger.RestApiSendingRequest(requestId, method, signed ? "signed": "", request.Uri, paramString);
return new CallResult<IRequest>(request);
}

View File

@ -1,5 +1,6 @@
using CryptoExchange.Net.Converters.JsonNet;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
@ -8,10 +9,7 @@ using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
@ -214,7 +212,7 @@ namespace CryptoExchange.Net.Clients
var success = socketConnection.AddSubscription(subscription);
if (!success)
{
_logger.Log(LogLevel.Trace, $"[Sckt {socketConnection.SocketId}] failed to add subscription, retrying on different connection");
_logger.FailedToAddSubscriptionRetryOnDifferentConnection(socketConnection.SocketId);
continue;
}
@ -242,7 +240,7 @@ namespace CryptoExchange.Net.Clients
if (socketConnection.PausedActivity)
{
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] has been paused, can't subscribe at this moment");
_logger.HasBeenPausedCantSubscribeAtThisMoment(socketConnection.SocketId);
return new CallResult<UpdateSubscription>(new ServerError("Socket is paused"));
}
@ -255,7 +253,7 @@ namespace CryptoExchange.Net.Clients
if (!subResult)
{
waitEvent?.Set();
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] failed to subscribe: {subResult.Error}");
_logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString());
// If this was a timeout we still need to send an unsubscribe to prevent messages coming in later
var unsubscribe = subResult.Error is CancellationRequestedError;
await socketConnection.CloseAsync(subscription, unsubscribe).ConfigureAwait(false);
@ -270,13 +268,13 @@ namespace CryptoExchange.Net.Clients
{
subscription.CancellationTokenRegistration = ct.Register(async () =>
{
_logger.Log(LogLevel.Information, $"[Sckt {socketConnection.SocketId}] Cancellation token set, closing subscription {subscription.Id}");
_logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id);
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
}, false);
}
waitEvent?.Set();
_logger.Log(LogLevel.Information, $"[Sckt {socketConnection.SocketId}] subscription {subscription.Id} completed successfully");
_logger.SubscriptionCompletedSuccessfully(socketConnection.SocketId, subscription.Id);
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
}
@ -333,7 +331,7 @@ namespace CryptoExchange.Net.Clients
if (socketConnection.PausedActivity)
{
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] has been paused, can't send query at this moment");
_logger.HasBeenPausedCantSendQueryAtThisMoment(socketConnection.SocketId);
return new CallResult<T>(new ServerError("Socket is paused"));
}
@ -374,7 +372,7 @@ namespace CryptoExchange.Net.Clients
if (AuthenticationProvider == null)
return new CallResult<bool>(new NoApiCredentialsError());
_logger.Log(LogLevel.Debug, $"[Sckt {socket.SocketId}] Attempting to authenticate");
_logger.AttemptingToAuthenticate(socket.SocketId);
var authRequest = GetAuthenticationRequest();
if (authRequest != null)
{
@ -382,7 +380,7 @@ namespace CryptoExchange.Net.Clients
if (!result)
{
_logger.Log(LogLevel.Warning, $"[Sckt {socket.SocketId}] authentication failed");
_logger.AuthenticationFailed(socket.SocketId);
if (socket.Connected)
await socket.CloseAsync().ConfigureAwait(false);
@ -391,7 +389,7 @@ namespace CryptoExchange.Net.Clients
}
}
_logger.Log(LogLevel.Debug, $"[Sckt {socket.SocketId}] authenticated");
_logger.Authenticated(socket.SocketId);
socket.Authenticated = true;
return new CallResult<bool>(true);
}
@ -467,12 +465,12 @@ namespace CryptoExchange.Net.Clients
var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false);
if (!connectionAddress)
{
_logger.Log(LogLevel.Warning, $"Failed to determine connection url: " + connectionAddress.Error);
_logger.FailedToDetermineConnectionUrl(connectionAddress.Error?.ToString());
return connectionAddress.As<SocketConnection>(null);
}
if (connectionAddress.Data != address)
_logger.Log(LogLevel.Debug, $"Connection address set to " + connectionAddress.Data);
_logger.ConnectionAddressSetTo(connectionAddress.Data!);
// Create new socket
var socket = CreateSocket(connectionAddress.Data!);
@ -536,7 +534,7 @@ namespace CryptoExchange.Net.Clients
protected virtual IWebsocket CreateSocket(string address)
{
var socket = SocketFactory.CreateWebsocket(_logger, GetWebSocketParameters(address));
_logger.Log(LogLevel.Debug, $"[Sckt {socket.Id}] created for " + address);
_logger.SocketCreatedForAddress(socket.Id, address);
return socket;
}
@ -562,7 +560,7 @@ namespace CryptoExchange.Net.Clients
if (subscription == null || connection == null)
return false;
_logger.Log(LogLevel.Information, $"[Sckt {connection.SocketId}] unsubscribing subscription " + subscriptionId);
_logger.UnsubscribingSubscription(connection.SocketId, subscriptionId);
await connection.CloseAsync(subscription).ConfigureAwait(false);
return true;
}
@ -577,7 +575,7 @@ namespace CryptoExchange.Net.Clients
if (subscription == null)
throw new ArgumentNullException(nameof(subscription));
_logger.Log(LogLevel.Information, $"[Sckt {subscription.SocketId}] Unsubscribing subscription " + subscription.Id);
_logger.UnsubscribingSubscription(subscription.SocketId, subscription.Id);
await subscription.CloseAsync().ConfigureAwait(false);
}
@ -591,7 +589,7 @@ namespace CryptoExchange.Net.Clients
if (sum == 0)
return;
_logger.Log(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.UserSubscriptionCount)} subscriptions");
_logger.UnsubscribingAll(socketConnections.Sum(s => s.Value.UserSubscriptionCount));
var tasks = new List<Task>();
{
var socketList = socketConnections.Values;
@ -608,7 +606,7 @@ namespace CryptoExchange.Net.Clients
/// <returns></returns>
public virtual async Task ReconnectAsync()
{
_logger.Log(LogLevel.Information, $"Reconnecting all {socketConnections.Count} connections");
_logger.ReconnectingAllConnections(socketConnections.Count);
var tasks = new List<Task>();
{
var socketList = socketConnections.Values;
@ -660,7 +658,7 @@ namespace CryptoExchange.Net.Clients
_disposing = true;
if (socketConnections.Sum(s => s.Value.UserSubscriptionCount) > 0)
{
_logger.Log(LogLevel.Debug, "Disposing socket client, closing all subscriptions");
_logger.DisposingSocketClient();
_ = UnsubscribeAllAsync();
}
semaphoreSlim?.Dispose();

View File

@ -0,0 +1,348 @@
using Microsoft.Extensions.Logging;
using System;
namespace CryptoExchange.Net.Logging.Extensions
{
internal static class CryptoExchangeWebSocketClientLoggingExtension
{
private static readonly Action<ILogger, int, Exception?> _connecting;
private static readonly Action<ILogger, int, string, Exception?> _connectionFailed;
private static readonly Action<ILogger, int, Uri, Exception?> _connected;
private static readonly Action<ILogger, int, Exception?> _startingProcessing;
private static readonly Action<ILogger, int, Exception?> _finishedProcessing;
private static readonly Action<ILogger, int, Exception?> _attemptReconnect;
private static readonly Action<ILogger, int, Uri, Exception?> _setReconnectUri;
private static readonly Action<ILogger, int, int, int, Exception?> _addingBytesToSendBuffer;
private static readonly Action<ILogger, int, Exception?> _reconnectRequested;
private static readonly Action<ILogger, int, Exception?> _closeAsyncWaitingForExistingCloseTask;
private static readonly Action<ILogger, int, Exception?> _closeAsyncSocketNotOpen;
private static readonly Action<ILogger, int, Exception?> _closing;
private static readonly Action<ILogger, int, Exception?> _closed;
private static readonly Action<ILogger, int, Exception?> _disposing;
private static readonly Action<ILogger, int, Exception?> _disposed;
private static readonly Action<ILogger, int, int, int, Exception?> _sendDelayedBecauseOfRateLimit;
private static readonly Action<ILogger, int, int, int, Exception?> _sentBytes;
private static readonly Action<ILogger, int, string, Exception?> _sendLoopStoppedWithException;
private static readonly Action<ILogger, int, Exception?> _sendLoopFinished;
private static readonly Action<ILogger, int, string, string ,Exception?> _receivedCloseMessage;
private static readonly Action<ILogger, int, int, Exception?> _receivedPartialMessage;
private static readonly Action<ILogger, int, int, Exception?> _receivedSingleMessage;
private static readonly Action<ILogger, int, long, Exception?> _reassembledMessage;
private static readonly Action<ILogger, int, long, Exception?> _discardIncompleteMessage;
private static readonly Action<ILogger, int, Exception?> _receiveLoopStoppedWithException;
private static readonly Action<ILogger, int, Exception?> _receiveLoopFinished;
private static readonly Action<ILogger, int, TimeSpan?, Exception?> _startingTaskForNoDataReceivedCheck;
private static readonly Action<ILogger, int, TimeSpan?, Exception?> _noDataReceiveTimoutReconnect;
static CryptoExchangeWebSocketClientLoggingExtension()
{
_connecting = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1000, "Connecting"),
"[Sckt {SocketId}] connecting");
_connectionFailed = LoggerMessage.Define<int, string>(
LogLevel.Error,
new EventId(1001, "ConnectionFailed"),
"[Sckt {SocketId}] connection failed: {ErrorMessage}");
_connected = LoggerMessage.Define<int, Uri?>(
LogLevel.Debug,
new EventId(1002, "Connected"),
"[Sckt {SocketId}] connected to {Uri}");
_startingProcessing = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1003, "StartingProcessing"),
"[Sckt {SocketId}] starting processing tasks");
_finishedProcessing = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1004, "FinishedProcessing"),
"[Sckt {SocketId}] processing tasks finished");
_attemptReconnect = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1005, "AttemptReconnect"),
"[Sckt {SocketId}] attempting to reconnect");
_setReconnectUri = LoggerMessage.Define<int, Uri>(
LogLevel.Debug,
new EventId(1006, "SetReconnectUri"),
"[Sckt {SocketId}] reconnect URI set to {ReconnectUri}");
_addingBytesToSendBuffer = LoggerMessage.Define<int, int, int>(
LogLevel.Trace,
new EventId(1007, "AddingBytesToSendBuffer"),
"[Sckt {SocketId}] msg {RequestId} - Adding {NumBytes} bytes to send buffer");
_reconnectRequested = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1008, "ReconnectRequested"),
"[Sckt {SocketId}] reconnect requested");
_closeAsyncWaitingForExistingCloseTask = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1009, "CloseAsyncWaitForExistingCloseTask"),
"[Sckt {SocketId}] CloseAsync() waiting for existing close task");
_closeAsyncSocketNotOpen = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1010, "CloseAsyncSocketNotOpen"),
"[Sckt {SocketId}] CloseAsync() socket not open");
_closing = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1011, "Closing"),
"[Sckt {SocketId}] closing");
_closed = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1012, "Closed"),
"[Sckt {SocketId}] closed");
_disposing = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1013, "Disposing"),
"[Sckt {SocketId}] disposing");
_disposed = LoggerMessage.Define<int>(
LogLevel.Trace,
new EventId(1014, "Disposed"),
"[Sckt {SocketId}] disposed");
_sendDelayedBecauseOfRateLimit = LoggerMessage.Define<int, int, int>(
LogLevel.Debug,
new EventId(1015, "SendDelayedBecauseOfRateLimit"),
"[Sckt {SocketId}] msg {RequestId} - send delayed {DelayMS}ms because of rate limit");
_sentBytes = LoggerMessage.Define<int, int, int>(
LogLevel.Trace,
new EventId(1016, "SentBytes"),
"[Sckt {SocketId}] msg {RequestId} - sent {NumBytes} bytes");
_sendLoopStoppedWithException = LoggerMessage.Define<int, string>(
LogLevel.Warning,
new EventId(1017, "SendLoopStoppedWithException"),
"[Sckt {SocketId}] send loop stopped with exception: {ErrorMessage}");
_sendLoopFinished = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1018, "SendLoopFinished"),
"[Sckt {SocketId}] send loop finished");
_receivedCloseMessage = LoggerMessage.Define<int, string, string>(
LogLevel.Debug,
new EventId(1019, "ReceivedCloseMessage"),
"[Sckt {SocketId}] received `Close` message, CloseStatus: {CloseStatus}, CloseStatusDescription: {CloseStatusDescription}");
_receivedPartialMessage = LoggerMessage.Define<int, int>(
LogLevel.Trace,
new EventId(1020, "ReceivedPartialMessage"),
"[Sckt {SocketId}] received {NumBytes} bytes in partial message");
_receivedSingleMessage = LoggerMessage.Define<int, int>(
LogLevel.Trace,
new EventId(1021, "ReceivedSingleMessage"),
"[Sckt {SocketId}] received {NumBytes} bytes in single message");
_reassembledMessage = LoggerMessage.Define<int, long>(
LogLevel.Trace,
new EventId(1022, "ReassembledMessage"),
"[Sckt {SocketId}] reassembled message of {NumBytes} bytes");
_discardIncompleteMessage = LoggerMessage.Define<int, long>(
LogLevel.Trace,
new EventId(1023, "DiscardIncompleteMessage"),
"[Sckt {SocketId}] discarding incomplete message of {NumBytes} bytes");
_receiveLoopStoppedWithException = LoggerMessage.Define<int>(
LogLevel.Warning,
new EventId(1024, "ReceiveLoopStoppedWithException"),
"[Sckt {SocketId}] receive loop stopped with exception");
_receiveLoopFinished = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(1025, "ReceiveLoopFinished"),
"[Sckt {SocketId}] receive loop finished");
_startingTaskForNoDataReceivedCheck = LoggerMessage.Define<int, TimeSpan?>(
LogLevel.Debug,
new EventId(1026, "StartingTaskForNoDataReceivedCheck"),
"[Sckt {SocketId}] starting task checking for no data received for {Timeout}");
_noDataReceiveTimoutReconnect = LoggerMessage.Define<int, TimeSpan?>(
LogLevel.Debug,
new EventId(1027, "NoDataReceiveTimeoutReconnect"),
"[Sckt {SocketId}] no data received for {Timeout}, reconnecting socket");
}
public static void SocketConnecting(
this ILogger logger, int socketId)
{
_connecting(logger, socketId, null);
}
public static void SocketConnectionFailed(
this ILogger logger, int socketId, string message, Exception e)
{
_connectionFailed(logger, socketId, message, e);
}
public static void SocketConnected(
this ILogger logger, int socketId, Uri uri)
{
_connected(logger, socketId, uri, null);
}
public static void SocketStartingProcessing(
this ILogger logger, int socketId)
{
_startingProcessing(logger, socketId, null);
}
public static void SocketFinishedProcessing(
this ILogger logger, int socketId)
{
_finishedProcessing(logger, socketId, null);
}
public static void SocketAttemptReconnect(
this ILogger logger, int socketId)
{
_attemptReconnect(logger, socketId, null);
}
public static void SocketSetReconnectUri(
this ILogger logger, int socketId, Uri uri)
{
_setReconnectUri(logger, socketId, uri, null);
}
public static void SocketAddingBytesToSendBuffer(
this ILogger logger, int socketId, int requestId, byte[] bytes)
{
_addingBytesToSendBuffer(logger, socketId, requestId, bytes.Length, null);
}
public static void SocketReconnectRequested(
this ILogger logger, int socketId)
{
_reconnectRequested(logger, socketId, null);
}
public static void SocketCloseAsyncWaitingForExistingCloseTask(
this ILogger logger, int socketId)
{
_closeAsyncWaitingForExistingCloseTask(logger, socketId, null);
}
public static void SocketCloseAsyncSocketNotOpen(
this ILogger logger, int socketId)
{
_closeAsyncSocketNotOpen(logger, socketId, null);
}
public static void SocketClosing(
this ILogger logger, int socketId)
{
_closing(logger, socketId, null);
}
public static void SocketClosed(
this ILogger logger, int socketId)
{
_closed(logger, socketId, null);
}
public static void SocketDisposing(
this ILogger logger, int socketId)
{
_disposing(logger, socketId, null);
}
public static void SocketDisposed(
this ILogger logger, int socketId)
{
_disposed(logger, socketId, null);
}
public static void SocketSendDelayedBecauseOfRateLimit(
this ILogger logger, int socketId, int requestId, int delay)
{
_sendDelayedBecauseOfRateLimit(logger, socketId, requestId, delay, null);
}
public static void SocketSentBytes(
this ILogger logger, int socketId, int requestId, int numBytes)
{
_sentBytes(logger, socketId, requestId, numBytes, null);
}
public static void SocketSendLoopStoppedWithException(
this ILogger logger, int socketId, string message, Exception e)
{
_sendLoopStoppedWithException(logger, socketId, message, e);
}
public static void SocketSendLoopFinished(
this ILogger logger, int socketId)
{
_sendLoopFinished(logger, socketId, null);
}
public static void SocketReceivedCloseMessage(
this ILogger logger, int socketId, string webSocketCloseStatus, string closeStatusDescription)
{
_receivedCloseMessage(logger, socketId, webSocketCloseStatus, closeStatusDescription, null);
}
public static void SocketReceivedPartialMessage(
this ILogger logger, int socketId, int countBytes)
{
_receivedPartialMessage(logger, socketId, countBytes, null);
}
public static void SocketReceivedSingleMessage(
this ILogger logger, int socketId, int countBytes)
{
_receivedSingleMessage(logger, socketId, countBytes, null);
}
public static void SocketReassembledMessage(
this ILogger logger, int socketId, long countBytes)
{
_reassembledMessage(logger, socketId, countBytes, null);
}
public static void SocketDiscardIncompleteMessage(
this ILogger logger, int socketId, long countBytes)
{
_discardIncompleteMessage(logger, socketId, countBytes, null);
}
public static void SocketReceiveLoopStoppedWithException(
this ILogger logger, int socketId, Exception e)
{
_receiveLoopStoppedWithException(logger, socketId, e);
}
public static void SocketReceiveLoopFinished(
this ILogger logger, int socketId)
{
_receiveLoopFinished(logger, socketId, null);
}
public static void SocketStartingTaskForNoDataReceivedCheck(
this ILogger logger, int socketId, TimeSpan? timeSpan)
{
_startingTaskForNoDataReceivedCheck(logger, socketId, timeSpan, null);
}
public static void SocketNoDataReceiveTimoutReconnect(
this ILogger logger, int socketId, TimeSpan? timeSpan)
{
_noDataReceiveTimoutReconnect(logger, socketId, timeSpan, null);
}
}
}

View File

@ -0,0 +1,81 @@
using Microsoft.Extensions.Logging;
using System;
using System.Net;
using System.Net.Http;
namespace CryptoExchange.Net.Logging.Extensions
{
internal static class RestApiClientLoggingExtensions
{
private static readonly Action<ILogger, int?, HttpStatusCode?, long, string?, Exception?> _restApiErrorReceived;
private static readonly Action<ILogger, int?, HttpStatusCode?, long, string?, Exception?> _restApiResponseReceived;
private static readonly Action<ILogger, int, string, Exception?> _restApiFailedToSyncTime;
private static readonly Action<ILogger, int, string, Exception?> _restApiNoApiCredentials;
private static readonly Action<ILogger, int, Uri, Exception?> _restApiCreatingRequest;
private static readonly Action<ILogger, int, HttpMethod, string, Uri, string, Exception?> _restApiSendingRequest;
static RestApiClientLoggingExtensions()
{
_restApiErrorReceived = LoggerMessage.Define<int?, HttpStatusCode?, long, string?>(
LogLevel.Warning,
new EventId(4000, "RestApiErrorReceived"),
"[Req {RequestId}] {ResponseStatusCode} Error received in {ResponseTime}ms: {ErrorMessage}");
_restApiResponseReceived = LoggerMessage.Define<int?, HttpStatusCode?, long, string?>(
LogLevel.Debug,
new EventId(4001, "RestApiResponseReceived"),
"[Req {RequestId}] {ResponseStatusCode} Response received in {ResponseTime}ms: {OriginalData}");
_restApiFailedToSyncTime = LoggerMessage.Define<int, string>(
LogLevel.Debug,
new EventId(4002, "RestApifailedToSyncTime"),
"[Req {RequestId}] Failed to sync time, aborting request: {ErrorMessage}");
_restApiNoApiCredentials = LoggerMessage.Define<int, string>(
LogLevel.Warning,
new EventId(4003, "RestApiNoApiCredentials"),
"[Req {RequestId}] Request {RestApiUri} failed because no ApiCredentials were provided");
_restApiCreatingRequest = LoggerMessage.Define<int, Uri>(
LogLevel.Information,
new EventId(4004, "RestApiCreatingRequest"),
"[Req {RequestId}] Creating request for {RestApiUri}");
_restApiSendingRequest = LoggerMessage.Define<int, HttpMethod, string, Uri, string>(
LogLevel.Trace,
new EventId(4005, "RestApiSendingRequest"),
"[Req {RequestId}] Sending {Method}{Signed} request to {RestApiUri}{Query}");
}
public static void RestApiErrorReceived(this ILogger logger, int? requestId, HttpStatusCode? responseStatusCode, long responseTime, string? error)
{
_restApiErrorReceived(logger, requestId, responseStatusCode, responseTime, error, null);
}
public static void RestApiResponseReceived(this ILogger logger, int? requestId, HttpStatusCode? responseStatusCode, long responseTime, string? originalData)
{
_restApiResponseReceived(logger, requestId, responseStatusCode, responseTime, originalData, null);
}
public static void RestApiFailedToSyncTime(this ILogger logger, int requestId, string error)
{
_restApiFailedToSyncTime(logger, requestId, error, null);
}
public static void RestApiNoApiCredentials(this ILogger logger, int requestId, string uri)
{
_restApiNoApiCredentials(logger, requestId, uri, null);
}
public static void RestApiCreatingRequest(this ILogger logger, int requestId, Uri uri)
{
_restApiCreatingRequest(logger, requestId, uri, null);
}
public static void RestApiSendingRequest(this ILogger logger, int requestId, HttpMethod method, string signed, Uri uri, string paramString)
{
_restApiSendingRequest(logger, requestId, method, signed, uri, paramString, null);
}
}
}

View File

@ -0,0 +1,188 @@
using Microsoft.Extensions.Logging;
using System;
namespace CryptoExchange.Net.Logging.Extensions
{
internal static class SocketApiClientLoggingExtension
{
private static readonly Action<ILogger, int, Exception?> _failedToAddSubscriptionRetryOnDifferentConnection;
private static readonly Action<ILogger, int, Exception?> _hasBeenPausedCantSubscribeAtThisMoment;
private static readonly Action<ILogger, int, string?, Exception?> _failedToSubscribe;
private static readonly Action<ILogger, int, int, Exception?> _cancellationTokenSetClosingSubscription;
private static readonly Action<ILogger, int, int, Exception?> _subscriptionCompletedSuccessfully;
private static readonly Action<ILogger, int, Exception?> _hasBeenPausedCantSendQueryAtThisMoment;
private static readonly Action<ILogger, int, Exception?> _attemptingToAuthenticate;
private static readonly Action<ILogger, int, Exception?> _authenticationFailed;
private static readonly Action<ILogger, int, Exception?> _authenticated;
private static readonly Action<ILogger, string?, Exception?> _failedToDetermineConnectionUrl;
private static readonly Action<ILogger, string, Exception?> _connectionAddressSetTo;
private static readonly Action<ILogger, int, string, Exception?> _socketCreatedForAddress;
private static readonly Action<ILogger, int, Exception?> _unsubscribingAll;
private static readonly Action<ILogger, Exception?> _disposingSocketClient;
private static readonly Action<ILogger, int, int, Exception?> _unsubscribingSubscription;
private static readonly Action<ILogger, int, Exception?> _reconnectingAllConnections;
static SocketApiClientLoggingExtension()
{
_failedToAddSubscriptionRetryOnDifferentConnection = LoggerMessage.Define<int>(
LogLevel.Trace,
new EventId(3000, "FailedToAddSubscriptionRetryOnDifferentConnection"),
"[Sckt {SocketId}] failed to add subscription, retrying on different connection");
_hasBeenPausedCantSubscribeAtThisMoment = LoggerMessage.Define<int>(
LogLevel.Warning,
new EventId(3001, "HasBeenPausedCantSubscribeAtThisMoment"),
"[Sckt {SocketId}] has been paused, can't subscribe at this moment");
_failedToSubscribe = LoggerMessage.Define<int, string?>(
LogLevel.Warning,
new EventId(3002, "FailedToSubscribe"),
"[Sckt {SocketId}] failed to subscribe: {ErrorMessage}");
_cancellationTokenSetClosingSubscription = LoggerMessage.Define<int, int>(
LogLevel.Information,
new EventId(3003, "CancellationTokenSetClosingSubscription"),
"[Sckt {SocketId}] Cancellation token set, closing subscription {SubscriptionId}");
_subscriptionCompletedSuccessfully = LoggerMessage.Define<int, int>(
LogLevel.Information,
new EventId(3004, "SubscriptionCompletedSuccessfully"),
"[Sckt {SocketId}] subscription {SubscriptionId} completed successfully");
_hasBeenPausedCantSendQueryAtThisMoment = LoggerMessage.Define<int>(
LogLevel.Warning,
new EventId(3005, "HasBeenPausedCantSendQueryAtThisMoment"),
"[Sckt {SocketId}] has been paused, can't send query at this moment");
_attemptingToAuthenticate = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(3006, "AttemptingToAuthenticate"),
"[Sckt {SocketId}] Attempting to authenticate");
_authenticationFailed = LoggerMessage.Define<int>(
LogLevel.Warning,
new EventId(3007, "AuthenticationFailed"),
"[Sckt {SocketId}] authentication failed");
_authenticated = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(3008, "Authenticated"),
"[Sckt {SocketId}] authenticated");
_failedToDetermineConnectionUrl = LoggerMessage.Define<string?>(
LogLevel.Warning,
new EventId(3009, "FailedToDetermineConnectionUrl"),
"Failed to determine connection url: {ErrorMessage}");
_connectionAddressSetTo = LoggerMessage.Define<string>(
LogLevel.Debug,
new EventId(3010, "ConnectionAddressSetTo"),
"Connection address set to {ConnectionAddress}");
_socketCreatedForAddress = LoggerMessage.Define<int, string>(
LogLevel.Debug,
new EventId(3011, "SocketCreatedForAddress"),
"[Sckt {SocketId}] created for {Address}");
_unsubscribingAll = LoggerMessage.Define<int>(
LogLevel.Information,
new EventId(3013, "UnsubscribingAll"),
"Unsubscribing all {SubscriptionCount} subscriptions");
_disposingSocketClient = LoggerMessage.Define(
LogLevel.Debug,
new EventId(3015, "DisposingSocketClient"),
"Disposing socket client, closing all subscriptions");
_unsubscribingSubscription = LoggerMessage.Define<int, int>(
LogLevel.Information,
new EventId(3016, "UnsubscribingSubscription"),
"[Sckt {SocketId}] Unsubscribing subscription {SubscriptionId}");
_reconnectingAllConnections = LoggerMessage.Define<int>(
LogLevel.Information,
new EventId(3017, "ReconnectingAll"),
"Reconnecting all {ConnectionCount} connections");
}
public static void FailedToAddSubscriptionRetryOnDifferentConnection(this ILogger logger, int socketId)
{
_failedToAddSubscriptionRetryOnDifferentConnection(logger, socketId, null);
}
public static void HasBeenPausedCantSubscribeAtThisMoment(this ILogger logger, int socketId)
{
_hasBeenPausedCantSubscribeAtThisMoment(logger, socketId, null);
}
public static void FailedToSubscribe(this ILogger logger, int socketId, string? error)
{
_failedToSubscribe(logger, socketId, error, null);
}
public static void CancellationTokenSetClosingSubscription(this ILogger logger, int socketId, int subscriptionId)
{
_cancellationTokenSetClosingSubscription(logger, socketId, subscriptionId, null);
}
public static void SubscriptionCompletedSuccessfully(this ILogger logger, int socketId, int subscriptionId)
{
_subscriptionCompletedSuccessfully(logger, socketId, subscriptionId, null);
}
public static void HasBeenPausedCantSendQueryAtThisMoment(this ILogger logger, int socketId)
{
_hasBeenPausedCantSendQueryAtThisMoment(logger, socketId, null);
}
public static void AttemptingToAuthenticate(this ILogger logger, int socketId)
{
_attemptingToAuthenticate(logger, socketId, null);
}
public static void AuthenticationFailed(this ILogger logger, int socketId)
{
_authenticationFailed(logger, socketId, null);
}
public static void Authenticated(this ILogger logger, int socketId)
{
_authenticated(logger, socketId, null);
}
public static void FailedToDetermineConnectionUrl(this ILogger logger, string? error)
{
_failedToDetermineConnectionUrl(logger, error, null);
}
public static void ConnectionAddressSetTo(this ILogger logger, string connectionAddress)
{
_connectionAddressSetTo(logger, connectionAddress, null);
}
public static void SocketCreatedForAddress(this ILogger logger, int socketId, string address)
{
_socketCreatedForAddress(logger, socketId, address, null);
}
public static void UnsubscribingAll(this ILogger logger, int subscriptionCount)
{
_unsubscribingAll(logger, subscriptionCount, null);
}
public static void DisposingSocketClient(this ILogger logger)
{
_disposingSocketClient(logger, null);
}
public static void UnsubscribingSubscription(this ILogger logger, int socketId, int subscriptionId)
{
_unsubscribingSubscription(logger, socketId, subscriptionId, null);
}
public static void ReconnectingAllConnections(this ILogger logger, int connectionCount)
{
_reconnectingAllConnections(logger, connectionCount, null);
}
}
}

View File

@ -0,0 +1,325 @@
using System;
using System.Net.WebSockets;
using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging;
namespace CryptoExchange.Net.Logging.Extensions
{
internal static class SocketConnectionLoggingExtension
{
private static readonly Action<ILogger, int, bool, Exception?> _activityPaused;
private static readonly Action<ILogger, int, Sockets.SocketConnection.SocketStatus, Sockets.SocketConnection.SocketStatus, Exception?> _socketStatusChanged;
private static readonly Action<ILogger, int, string?, Exception?> _failedReconnectProcessing;
private static readonly Action<ILogger, int, Exception?> _unkownExceptionWhileProcessingReconnection;
private static readonly Action<ILogger, int, WebSocketError, string?, Exception?> _webSocketErrorCodeAndDetails;
private static readonly Action<ILogger, int, string?, Exception?> _webSocketError;
private static readonly Action<ILogger, int, int, Exception?> _messageSentNotPending;
private static readonly Action<ILogger, int, string, Exception?> _receivedData;
private static readonly Action<ILogger, int, string, Exception?> _failedToEvaluateMessage;
private static readonly Action<ILogger, int, Exception?> _errorProcessingMessage;
private static readonly Action<ILogger, int, int, string, Exception?> _processorMatched;
private static readonly Action<ILogger, int, int, Exception?> _receivedMessageNotRecognized;
private static readonly Action<ILogger, int, string?, Exception?> _failedToDeserializeMessage;
private static readonly Action<ILogger, int, string, Exception?> _userMessageProcessingFailed;
private static readonly Action<ILogger, int, long, long, Exception?> _messageProcessed;
private static readonly Action<ILogger, int, int, Exception?> _closingSubscription;
private static readonly Action<ILogger, int, Exception?> _notUnsubscribingSubscriptionBecauseDuplicateRunning;
private static readonly Action<ILogger, int, Exception?> _alreadyClosing;
private static readonly Action<ILogger, int, Exception?> _closingNoMoreSubscriptions;
private static readonly Action<ILogger, int, int, int, Exception?> _addingNewSubscription;
private static readonly Action<ILogger, int, Exception?> _nothingToResubscribeCloseConnection;
private static readonly Action<ILogger, int, Exception?> _failedAuthenticationDisconnectAndRecoonect;
private static readonly Action<ILogger, int, Exception?> _authenticationSucceeded;
private static readonly Action<ILogger, int, string?, Exception?> _failedRequestRevitalization;
private static readonly Action<ILogger, int, Exception?> _allSubscriptionResubscribed;
private static readonly Action<ILogger, int, int, Exception?> _subscriptionUnsubscribed;
private static readonly Action<ILogger, int, string, Exception?> _sendingPeriodic;
private static readonly Action<ILogger, int, string, string, Exception?> _periodicSendFailed;
private static readonly Action<ILogger, int, int, string, Exception?> _sendingData;
private static readonly Action<ILogger, int, string, string, Exception?> _receivedMessageNotMatchedToAnyListener;
static SocketConnectionLoggingExtension()
{
_activityPaused = LoggerMessage.Define<int, bool>(
LogLevel.Information,
new EventId(2000, "ActivityPaused"),
"[Sckt {SocketId}] paused activity: {Paused}");
_socketStatusChanged = LoggerMessage.Define<int, Sockets.SocketConnection.SocketStatus, Sockets.SocketConnection.SocketStatus>(
LogLevel.Debug,
new EventId(2001, "SocketStatusChanged"),
"[Sckt {SocketId}] status changed from {OldStatus} to {NewStatus}");
_failedReconnectProcessing = LoggerMessage.Define<int, string?>(
LogLevel.Warning,
new EventId(2002, "FailedReconnectProcessing"),
"[Sckt {SocketId}] failed reconnect processing: {ErrorMessage}, reconnecting again");
_unkownExceptionWhileProcessingReconnection = LoggerMessage.Define<int>(
LogLevel.Warning,
new EventId(2003, "UnkownExceptionWhileProcessingReconnection"),
"[Sckt {SocketId}] Unknown exception while processing reconnection, reconnecting again");
_webSocketErrorCodeAndDetails = LoggerMessage.Define<int, WebSocketError, string?>(
LogLevel.Warning,
new EventId(2004, "WebSocketErrorCode"),
"[Sckt {SocketId}] error: Websocket error code {WebSocketErrorCdoe}, details: {Details}");
_webSocketError = LoggerMessage.Define<int, string?>(
LogLevel.Warning,
new EventId(2005, "WebSocketError"),
"[Sckt {SocketId}] error: {ErrorMessage}");
_messageSentNotPending = LoggerMessage.Define<int, int>(
LogLevel.Debug,
new EventId(2006, "MessageSentNotPending"),
"[Sckt {SocketId}] msg {RequestId} - message sent, but not pending");
_receivedData = LoggerMessage.Define<int, string>(
LogLevel.Trace,
new EventId(2007, "ReceivedData"),
"[Sckt {SocketId}] received {OriginalData}");
_failedToEvaluateMessage = LoggerMessage.Define<int, string>(
LogLevel.Warning,
new EventId(2008, "FailedToEvaluateMessage"),
"[Sckt {SocketId}] failed to evaluate message. {OriginalData}");
_errorProcessingMessage = LoggerMessage.Define<int>(
LogLevel.Error,
new EventId(2009, "ErrorProcessingMessage"),
"[Sckt {SocketId}] error processing message");
_processorMatched = LoggerMessage.Define<int, int, string>(
LogLevel.Trace,
new EventId(2010, "ProcessorMatched"),
"[Sckt {SocketId}] {Count} processor(s) matched to message with listener identifier {ListenerId}");
_receivedMessageNotRecognized = LoggerMessage.Define<int, int>(
LogLevel.Warning,
new EventId(2011, "ReceivedMessageNotRecognized"),
"[Sckt {SocketId}] received message not recognized by handler {ProcessorId}");
_failedToDeserializeMessage = LoggerMessage.Define<int, string?>(
LogLevel.Warning,
new EventId(2012, "FailedToDeserializeMessage"),
"[Sckt {SocketId}] deserialization failed: {ErrorMessage}");
_userMessageProcessingFailed = LoggerMessage.Define<int, string>(
LogLevel.Warning,
new EventId(2013, "UserMessageProcessingFailed"),
"[Sckt {SocketId}] user message processing failed: {ErrorMessage}");
_messageProcessed = LoggerMessage.Define<int, long, long>(
LogLevel.Trace,
new EventId(2014, "MessageProcessed"),
"[Sckt {SocketId}] message processed in {ProcessingTime}ms, {ParsingTime}ms parsing");
_closingSubscription = LoggerMessage.Define<int, int>(
LogLevel.Debug,
new EventId(2015, "ClosingSubscription"),
"[Sckt {SocketId}] closing subscription {SubscriptionId}");
_notUnsubscribingSubscriptionBecauseDuplicateRunning = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(2016, "NotUnsubscribingSubscription"),
"[Sckt {SocketId}] not unsubscribing subscription as there is still a duplicate subscription running");
_alreadyClosing = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(2017, "AlreadyClosing"),
"[Sckt {SocketId}] already closing");
_closingNoMoreSubscriptions = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(2018, "ClosingNoMoreSubscriptions"),
"[Sckt {SocketId}] closing as there are no more subscriptions");
_addingNewSubscription = LoggerMessage.Define<int, int, int>(
LogLevel.Debug,
new EventId(2019, "AddingNewSubscription"),
"[Sckt {SocketId}] adding new subscription with id {SubscriptionId}, total subscriptions on connection: {UserSubscriptionCount}");
_nothingToResubscribeCloseConnection = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(2020, "NothingToResubscribe"),
"[Sckt {SocketId}] nothing to resubscribe, closing connection");
_failedAuthenticationDisconnectAndRecoonect = LoggerMessage.Define<int>(
LogLevel.Warning,
new EventId(2021, "FailedAuthentication"),
"[Sckt {SocketId}] authentication failed on reconnected socket. Disconnecting and reconnecting");
_authenticationSucceeded = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(2022, "AuthenticationSucceeded"),
"[Sckt {SocketId}] authentication succeeded on reconnected socket");
_failedRequestRevitalization = LoggerMessage.Define<int, string?>(
LogLevel.Warning,
new EventId(2023, "FailedRequestRevitalization"),
"[Sckt {SocketId}] failed request revitalization: {ErrorMessage}");
_allSubscriptionResubscribed = LoggerMessage.Define<int>(
LogLevel.Debug,
new EventId(2024, "AllSubscriptionResubscribed"),
"[Sckt {SocketId}] all subscription successfully resubscribed on reconnected socket");
_subscriptionUnsubscribed = LoggerMessage.Define<int, int>(
LogLevel.Information,
new EventId(2025, "SubscriptionUnsubscribed"),
"[Sckt {SocketId}] subscription {SubscriptionId} unsubscribed");
_sendingPeriodic = LoggerMessage.Define<int, string>(
LogLevel.Trace,
new EventId(2026, "SendingPeriodic"),
"[Sckt {SocketId}] sending periodic {Identifier}");
_periodicSendFailed = LoggerMessage.Define<int, string, string>(
LogLevel.Warning,
new EventId(2027, "PeriodicSendFailed"),
"[Sckt {SocketId}] Periodic send {Identifier} failed: {ErrorMessage}");
_sendingData = LoggerMessage.Define<int, int, string>(
LogLevel.Trace,
new EventId(2028, "SendingData"),
"[Sckt {SocketId}] msg {RequestId} - sending messsage: {Data}");
_receivedMessageNotMatchedToAnyListener = LoggerMessage.Define<int, string, string>(
LogLevel.Warning,
new EventId(2029, "ReceivedMessageNotMatchedToAnyListener"),
"[Sckt {SocketId}] received message not matched to any listener. ListenId: {ListenId}, current listeners: {ListenIds}");
}
public static void ActivityPaused(this ILogger logger, int socketId, bool paused)
{
_activityPaused(logger, socketId, paused, null);
}
public static void SocketStatusChanged(this ILogger logger, int socketId, Sockets.SocketConnection.SocketStatus oldStatus, Sockets.SocketConnection.SocketStatus newStatus)
{
_socketStatusChanged(logger, socketId, oldStatus, newStatus, null);
}
public static void FailedReconnectProcessing(this ILogger logger, int socketId, string? error)
{
_failedReconnectProcessing(logger, socketId, error, null);
}
public static void UnkownExceptionWhileProcessingReconnection(this ILogger logger, int socketId, Exception e)
{
_unkownExceptionWhileProcessingReconnection(logger, socketId, e);
}
public static void WebSocketErrorCodeAndDetails(this ILogger logger, int socketId, WebSocketError error, string? details, Exception e)
{
_webSocketErrorCodeAndDetails(logger, socketId, error, details, e);
}
public static void WebSocketError(this ILogger logger, int socketId, string? errorMessage, Exception e)
{
_webSocketError(logger, socketId, errorMessage, e);
}
public static void MessageSentNotPending(this ILogger logger, int socketId, int requestId)
{
_messageSentNotPending(logger, socketId, requestId, null);
}
public static void ReceivedData(this ILogger logger, int socketId, string originalData)
{
_receivedData(logger, socketId, originalData, null);
}
public static void FailedToEvaluateMessage(this ILogger logger, int socketId, string originalData)
{
_failedToEvaluateMessage(logger, socketId, originalData, null);
}
public static void ErrorProcessingMessage(this ILogger logger, int socketId, Exception e)
{
_errorProcessingMessage(logger, socketId, e);
}
public static void ProcessorMatched(this ILogger logger, int socketId, int count, string listenerId)
{
_processorMatched(logger, socketId, count, listenerId, null);
}
public static void ReceivedMessageNotRecognized(this ILogger logger, int socketId, int id)
{
_receivedMessageNotRecognized(logger, socketId, id, null);
}
public static void FailedToDeserializeMessage(this ILogger logger, int socketId, string? errorMessage)
{
_failedToDeserializeMessage(logger, socketId, errorMessage, null);
}
public static void UserMessageProcessingFailed(this ILogger logger, int socketId, string errorMessage, Exception e)
{
_userMessageProcessingFailed(logger, socketId, errorMessage, e);
}
public static void MessageProcessed(this ILogger logger, int socketId, long processingTime, long parsingTime)
{
_messageProcessed(logger, socketId, processingTime, parsingTime, null);
}
public static void ClosingSubscription(this ILogger logger, int socketId, int subscriptionId)
{
_closingSubscription(logger, socketId, subscriptionId, null);
}
public static void NotUnsubscribingSubscriptionBecauseDuplicateRunning(this ILogger logger, int socketId)
{
_notUnsubscribingSubscriptionBecauseDuplicateRunning(logger, socketId, null);
}
public static void AlreadyClosing(this ILogger logger, int socketId)
{
_alreadyClosing(logger, socketId, null);
}
public static void ClosingNoMoreSubscriptions(this ILogger logger, int socketId)
{
_closingNoMoreSubscriptions(logger, socketId, null);
}
public static void AddingNewSubscription(this ILogger logger, int socketId, int subscriptionId, int userSubscriptionCount)
{
_addingNewSubscription(logger, socketId, subscriptionId, userSubscriptionCount, null);
}
public static void NothingToResubscribeCloseConnection(this ILogger logger, int socketId)
{
_nothingToResubscribeCloseConnection(logger, socketId, null);
}
public static void FailedAuthenticationDisconnectAndRecoonect(this ILogger logger, int socketId)
{
_failedAuthenticationDisconnectAndRecoonect(logger, socketId, null);
}
public static void AuthenticationSucceeded(this ILogger logger, int socketId)
{
_authenticationSucceeded(logger, socketId, null);
}
public static void FailedRequestRevitalization(this ILogger logger, int socketId, string? errorMessage)
{
_failedRequestRevitalization(logger, socketId, errorMessage, null);
}
public static void AllSubscriptionResubscribed(this ILogger logger, int socketId)
{
_allSubscriptionResubscribed(logger, socketId, null);
}
public static void SubscriptionUnsubscribed(this ILogger logger, int socketId, int subscriptionId)
{
_subscriptionUnsubscribed(logger, socketId, subscriptionId, null);
}
public static void SendingPeriodic(this ILogger logger, int socketId, string identifier)
{
_sendingPeriodic(logger, socketId, identifier, null);
}
public static void PeriodicSendFailed(this ILogger logger, int socketId, string identifier, string errorMessage, Exception e)
{
_periodicSendFailed(logger, socketId, identifier, errorMessage, e);
}
public static void SendingData(this ILogger logger, int socketId, int requestId, string data)
{
_sendingData(logger, socketId, requestId, data, null);
}
public static void ReceivedMessageNotMatchedToAnyListener(this ILogger logger, int socketId, string listenId, string listenIds)
{
_receivedMessageNotMatchedToAnyListener(logger, socketId, listenId, listenIds, null);
}
}
}

View File

@ -0,0 +1,236 @@
using System;
using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging;
namespace CryptoExchange.Net.Logging.Extensions
{
internal static class SymbolOrderBookLoggingExtensions
{
private static readonly Action<ILogger, string, string, OrderBookStatus, OrderBookStatus, Exception?> _orderBookStatusChanged;
private static readonly Action<ILogger, string, string, Exception?> _orderBookStarting;
private static readonly Action<ILogger, string, string, Exception?> _orderBookStoppedStarting;
private static readonly Action<ILogger, string, string, Exception?> _orderBookStopping;
private static readonly Action<ILogger, string, string, Exception?> _orderBookStopped;
private static readonly Action<ILogger, string, string, Exception?> _orderBookConnectionLost;
private static readonly Action<ILogger, string, string, Exception?> _orderBookDisconnected;
private static readonly Action<ILogger, string, int, Exception?> _orderBookProcessingBufferedUpdates;
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookUpdateSkipped;
private static readonly Action<ILogger, string, string, Exception?> _orderBookOutOfSyncChecksum;
private static readonly Action<ILogger, string, string, Exception?> _orderBookResyncFailed;
private static readonly Action<ILogger, string, string, Exception?> _orderBookResyncing;
private static readonly Action<ILogger, string, string, Exception?> _orderBookResynced;
private static readonly Action<ILogger, string, Exception?> _orderBookMessageSkippedBecauseOfResubscribing;
private static readonly Action<ILogger, string, string, long, long, long, Exception?> _orderBookDataSet;
private static readonly Action<ILogger, string, string, long, long, long, long, Exception?> _orderBookUpdateBuffered;
private static readonly Action<ILogger, string, string, decimal, decimal, Exception?> _orderBookOutOfSyncDetected;
private static readonly Action<ILogger, string, string, Exception?> _orderBookReconnectingSocket;
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookSkippedMessage;
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookProcessedMessage;
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookOutOfSync;
static SymbolOrderBookLoggingExtensions()
{
_orderBookStatusChanged = LoggerMessage.Define<string, string, OrderBookStatus, OrderBookStatus>(
LogLevel.Information,
new EventId(5000, "OrderBookStatusChanged"),
"{Id} order book {Symbol} status changed: {PreviousStatus} => {NewStatus}");
_orderBookStarting = LoggerMessage.Define<string, string>(
LogLevel.Debug,
new EventId(5001, "OrderBookStarting"),
"{Id} order book {Symbol} starting");
_orderBookStoppedStarting = LoggerMessage.Define<string, string>(
LogLevel.Debug,
new EventId(5002, "OrderBookStoppedStarting"),
"{Id} order book {Symbol} stopped while starting");
_orderBookConnectionLost = LoggerMessage.Define<string, string>(
LogLevel.Warning,
new EventId(5003, "OrderBookConnectionLost"),
"{Id} order book {Symbol} connection lost");
_orderBookDisconnected = LoggerMessage.Define<string, string>(
LogLevel.Warning,
new EventId(5004, "OrderBookDisconnected"),
"{Id} order book {Symbol} disconnected");
_orderBookStopping = LoggerMessage.Define<string, string>(
LogLevel.Debug,
new EventId(5005, "OrderBookStopping"),
"{Id} order book {Symbol} stopping");
_orderBookStopped = LoggerMessage.Define<string, string>(
LogLevel.Trace,
new EventId(5006, "OrderBookStopped"),
"{Id} order book {Symbol} stopped");
_orderBookProcessingBufferedUpdates = LoggerMessage.Define<string, int>(
LogLevel.Debug,
new EventId(5007, "OrderBookProcessingBufferedUpdates"),
"{Id} Processing {NumberBufferedUpdated} buffered updates");
_orderBookUpdateSkipped = LoggerMessage.Define<string, string, long, long>(
LogLevel.Debug,
new EventId(5008, "OrderBookUpdateSkipped"),
"{Id} order book {Symbol} update skipped #{SequenceNumber}, currently at #{LastSequenceNumber}");
_orderBookOutOfSync = LoggerMessage.Define<string, string, long, long>(
LogLevel.Warning,
new EventId(5009, "OrderBookOutOfSync"),
"{Id} order book {Symbol} out of sync (expected {ExpectedSequenceNumber}, was {SequenceNumber}), reconnecting");
_orderBookResynced = LoggerMessage.Define<string, string>(
LogLevel.Information,
new EventId(5010, "OrderBookResynced"),
"{Id} order book {Symbol} successfully resynchronized");
_orderBookMessageSkippedBecauseOfResubscribing = LoggerMessage.Define<string>(
LogLevel.Trace,
new EventId(5011, "OrderBookMessageSkippedResubscribing"),
"{Id} Skipping message because of resubscribing");
_orderBookDataSet = LoggerMessage.Define<string, string, long, long, long>(
LogLevel.Debug,
new EventId(5012, "OrderBookDataSet"),
"{Id} order book {Symbol} data set: {BidCount} bids, {AskCount} asks. #{EndUpdateId}");
_orderBookUpdateBuffered = LoggerMessage.Define<string, string, long, long, long, long>(
LogLevel.Trace,
new EventId(5013, "OrderBookUpdateBuffered"),
"{Id} order book {Symbol} update buffered #{StartUpdateId}-#{EndUpdateId} [{AsksCount} asks, {BidsCount} bids]");
_orderBookOutOfSyncDetected = LoggerMessage.Define<string, string, decimal, decimal>(
LogLevel.Warning,
new EventId(5014, "OrderBookOutOfSyncDetected"),
"{Id} order book {Symbol} detected out of sync order book. First ask: {FirstAsk}, first bid: {FirstBid}. Resyncing");
_orderBookReconnectingSocket = LoggerMessage.Define<string, string>(
LogLevel.Warning,
new EventId(5015, "OrderBookReconnectingSocket"),
"{Id} order book {Symbol} out of sync. Reconnecting socket");
_orderBookResyncing = LoggerMessage.Define<string, string>(
LogLevel.Warning,
new EventId(5016, "OrderBookResyncing"),
"{Id} order book {Symbol} out of sync. Resyncing");
_orderBookResyncFailed = LoggerMessage.Define<string, string>(
LogLevel.Warning,
new EventId(5017, "OrderBookResyncFailed"),
"{Id} order book {Symbol} resync failed, reconnecting socket");
_orderBookSkippedMessage = LoggerMessage.Define<string, string, long, long>(
LogLevel.Trace,
new EventId(5018, "OrderBookSkippedMessage"),
"{Id} order book {Symbol} update skipped #{FirstUpdateId}-{LastUpdateId}");
_orderBookProcessedMessage = LoggerMessage.Define<string, string, long, long>(
LogLevel.Trace,
new EventId(5019, "OrderBookProcessedMessage"),
"{Id} order book {Symbol} update processed #{FirstUpdateId}-{LastUpdateId}");
_orderBookOutOfSyncChecksum = LoggerMessage.Define<string, string>(
LogLevel.Warning,
new EventId(5020, "OrderBookOutOfSyncChecksum"),
"{Id} order book {Symbol} out of sync. Checksum mismatch, resyncing");
}
public static void OrderBookStatusChanged(this ILogger logger, string id, string symbol, OrderBookStatus previousStatus, OrderBookStatus newStatus)
{
_orderBookStatusChanged(logger, id, symbol, previousStatus, newStatus, null);
}
public static void OrderBookStarting(this ILogger logger, string id, string symbol)
{
_orderBookStarting(logger, id, symbol, null);
}
public static void OrderBookStoppedStarting(this ILogger logger, string id, string symbol)
{
_orderBookStoppedStarting(logger, id, symbol, null);
}
public static void OrderBookConnectionLost(this ILogger logger, string id, string symbol)
{
_orderBookConnectionLost(logger, id, symbol, null);
}
public static void OrderBookDisconnected(this ILogger logger, string id, string symbol)
{
_orderBookDisconnected(logger, id, symbol, null);
}
public static void OrderBookStopping(this ILogger logger, string id, string symbol)
{
_orderBookStopping(logger, id, symbol, null);
}
public static void OrderBookStopped(this ILogger logger, string id, string symbol)
{
_orderBookStopped(logger, id, symbol, null);
}
public static void OrderBookProcessingBufferedUpdates(this ILogger logger, string id, int numberBufferedUpdated)
{
_orderBookProcessingBufferedUpdates(logger, id, numberBufferedUpdated, null);
}
public static void OrderBookUpdateSkipped(this ILogger logger, string id, string symbol, long sequence, long lastSequenceNumber)
{
_orderBookUpdateSkipped(logger, id, symbol, sequence, lastSequenceNumber, null);
}
public static void OrderBookOutOfSync(this ILogger logger, string id, string symbol, long expectedSequenceNumber, long sequenceNumber)
{
_orderBookOutOfSync(logger, id, symbol, expectedSequenceNumber, sequenceNumber, null);
}
public static void OrderBookResynced(this ILogger logger, string id, string symbol)
{
_orderBookResynced(logger, id, symbol, null);
}
public static void OrderBookMessageSkippedResubscribing(this ILogger logger, string id)
{
_orderBookMessageSkippedBecauseOfResubscribing(logger, id, null);
}
public static void OrderBookDataSet(this ILogger logger, string id, string symbol, long bidCount, long askCount, long endUpdateId)
{
_orderBookDataSet(logger, id, symbol, bidCount, askCount, endUpdateId, null);
}
public static void OrderBookUpdateBuffered(this ILogger logger, string id, string symbol, long startUpdateId, long endUpdateId, long asksCount, long bidsCount)
{
_orderBookUpdateBuffered(logger, id, symbol, startUpdateId, endUpdateId, asksCount, bidsCount, null);
}
public static void OrderBookOutOfSyncDetected(this ILogger logger, string id, string symbol, decimal firstAsk, decimal firstBid)
{
_orderBookOutOfSyncDetected(logger, id, symbol, firstAsk, firstBid, null);
}
public static void OrderBookReconnectingSocket(this ILogger logger, string id, string symbol)
{
_orderBookReconnectingSocket(logger, id, symbol, null);
}
public static void OrderBookResyncing(this ILogger logger, string id, string symbol)
{
_orderBookResyncing(logger, id, symbol, null);
}
public static void OrderBookResyncFailed(this ILogger logger, string id, string symbol)
{
_orderBookResyncFailed(logger, id, symbol, null);
}
public static void OrderBookSkippedMessage(this ILogger logger, string id, string symbol, long firstUpdateId, long lastUpdateId)
{
_orderBookSkippedMessage(logger, id, symbol, firstUpdateId, lastUpdateId, null);
}
public static void OrderBookProcessedMessage(this ILogger logger, string id, string symbol, long firstUpdateId, long lastUpdateId)
{
_orderBookProcessedMessage(logger, id, symbol, firstUpdateId, lastUpdateId, null);
}
public static void OrderBookOutOfSyncChecksum(this ILogger logger, string id, string symbol)
{
_orderBookOutOfSyncChecksum(logger, id, symbol, null);
}
}
}

View File

@ -7,6 +7,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
@ -101,7 +102,7 @@ namespace CryptoExchange.Net.OrderBook
var old = _status;
_status = value;
_logger.Log(LogLevel.Information, $"{Id} order book {Symbol} status changed: {old} => {value}");
_logger.OrderBookStatusChanged(Id, Symbol, old, value);
OnStatusChange?.Invoke(old, _status);
}
}
@ -232,7 +233,7 @@ namespace CryptoExchange.Net.OrderBook
if (Status != OrderBookStatus.Disconnected)
throw new InvalidOperationException($"Can't start book unless state is {OrderBookStatus.Disconnected}. Current state: {Status}");
_logger.Log(LogLevel.Debug, $"{Id} order book {Symbol} starting");
_logger.OrderBookStarting(Id, Symbol);
_cts = new CancellationTokenSource();
ct?.Register(async () =>
{
@ -257,7 +258,7 @@ namespace CryptoExchange.Net.OrderBook
if (_cts.IsCancellationRequested)
{
_logger.Log(LogLevel.Debug, $"{Id} order book {Symbol} stopped while starting");
_logger.OrderBookStoppedStarting(Id, Symbol);
await startResult.Data.CloseAsync().ConfigureAwait(false);
Status = OrderBookStatus.Disconnected;
return new CallResult<bool>(new CancellationRequestedError());
@ -273,15 +274,15 @@ namespace CryptoExchange.Net.OrderBook
}
private void HandleConnectionLost() {
_logger.Log(LogLevel.Warning, $"{Id} order book {Symbol} connection lost");
if (Status != OrderBookStatus.Disposed) {
_logger.OrderBookConnectionLost(Id, Symbol);
if (Status != OrderBookStatus.Disposed) {
Status = OrderBookStatus.Reconnecting;
Reset();
}
}
private void HandleConnectionClosed() {
_logger.Log(LogLevel.Warning, $"{Id} order book {Symbol} disconnected");
_logger.OrderBookDisconnected(Id, Symbol);
Status = OrderBookStatus.Disconnected;
_ = StopAsync();
}
@ -293,7 +294,7 @@ namespace CryptoExchange.Net.OrderBook
/// <inheritdoc/>
public async Task StopAsync()
{
_logger.Log(LogLevel.Debug, $"{Id} order book {Symbol} stopping");
_logger.OrderBookStopping(Id, Symbol);
Status = OrderBookStatus.Disconnected;
_cts?.Cancel();
_queueEvent.Set();
@ -306,7 +307,7 @@ namespace CryptoExchange.Net.OrderBook
_subscription.ConnectionClosed -= HandleConnectionClosed;
_subscription.ConnectionRestored -= HandleConnectionRestored;
}
_logger.Log(LogLevel.Trace, $"{Id} order book {Symbol} stopped");
_logger.OrderBookStopped(Id, Symbol);
}
/// <inheritdoc/>
@ -464,7 +465,7 @@ namespace CryptoExchange.Net.OrderBook
{
var pbList = _processBuffer.ToList();
if (pbList.Count > 0)
_logger.Log(LogLevel.Debug, $"{Id} Processing {pbList.Count} buffered updates");
_logger.OrderBookProcessingBufferedUpdates(Id, pbList.Count);
foreach (var bufferEntry in pbList)
{
@ -483,14 +484,14 @@ namespace CryptoExchange.Net.OrderBook
{
if (sequence <= LastSequenceNumber)
{
_logger.Log(LogLevel.Debug, $"{Id} order book {Symbol} update skipped #{sequence}, currently at #{LastSequenceNumber}");
_logger.OrderBookSkippedMessage(Id, Symbol, sequence, LastSequenceNumber);
return false;
}
if (_sequencesAreConsecutive && sequence > LastSequenceNumber + 1)
{
// Out of sync
_logger.Log(LogLevel.Warning, $"{Id} order book {Symbol} out of sync (expected { LastSequenceNumber + 1}, was {sequence}), reconnecting");
_logger.OrderBookOutOfSync(Id, Symbol, LastSequenceNumber + 1, sequence);
_stopProcessing = true;
Resubscribe();
return false;
@ -644,7 +645,7 @@ namespace CryptoExchange.Net.OrderBook
success = resyncResult;
}
_logger.Log(LogLevel.Information, $"{Id} order book {Symbol} successfully resynchronized");
_logger.OrderBookResynced(Id, Symbol);
Status = OrderBookStatus.Synced;
}
@ -661,7 +662,7 @@ namespace CryptoExchange.Net.OrderBook
if (_stopProcessing)
{
_logger.Log(LogLevel.Trace, $"{Id} Skipping message because of resubscribing");
_logger.OrderBookMessageSkippedResubscribing(Id);
continue;
}
@ -693,7 +694,7 @@ namespace CryptoExchange.Net.OrderBook
BidCount = _bids.Count;
UpdateTime = DateTime.UtcNow;
_logger.Log(LogLevel.Debug, $"{Id} order book {Symbol} data set: {BidCount} bids, {AskCount} asks. #{item.EndUpdateId}");
_logger.OrderBookDataSet(Id, Symbol, BidCount, AskCount, item.EndUpdateId);
CheckProcessBuffer();
OnOrderBookUpdate?.Invoke((item.Bids, item.Asks));
OnBestOffersChanged?.Invoke((BestBid, BestAsk));
@ -713,7 +714,7 @@ namespace CryptoExchange.Net.OrderBook
FirstUpdateId = item.StartUpdateId,
LastUpdateId = item.EndUpdateId,
});
_logger.Log(LogLevel.Trace, $"{Id} order book {Symbol} update buffered #{item.StartUpdateId}-#{item.EndUpdateId} [{item.Asks.Count()} asks, {item.Bids.Count()} bids]");
_logger.OrderBookUpdateBuffered(Id, Symbol, item.StartUpdateId, item.EndUpdateId, item.Asks.Count(), item.Bids.Count());
}
else
{
@ -726,7 +727,7 @@ namespace CryptoExchange.Net.OrderBook
if (_asks.First().Key < _bids.First().Key)
{
_logger.Log(LogLevel.Warning, $"{Id} order book {Symbol} detected out of sync order book. First ask: {_asks.First().Key}, first bid: {_bids.First().Key}. Resyncing");
_logger.OrderBookOutOfSyncDetected(Id, Symbol, _asks.First().Key, _bids.First().Key);
_stopProcessing = true;
Resubscribe();
return;
@ -760,7 +761,7 @@ namespace CryptoExchange.Net.OrderBook
if (!checksumResult)
{
_logger.Log(LogLevel.Warning, $"{Id} order book {Symbol} out of sync. Resyncing");
_logger.OrderBookOutOfSyncChecksum(Id, Symbol);
_stopProcessing = true;
Resubscribe();
}
@ -784,7 +785,7 @@ namespace CryptoExchange.Net.OrderBook
if (!await _subscription!.ResubscribeAsync().ConfigureAwait(false))
{
// Resubscribing failed, reconnect the socket
_logger.Log(LogLevel.Warning, $"{Id} order book {Symbol} resync failed, reconnecting socket");
_logger.OrderBookResyncFailed(Id, Symbol);
Status = OrderBookStatus.Reconnecting;
_ = _subscription!.ReconnectAsync();
}
@ -799,7 +800,7 @@ namespace CryptoExchange.Net.OrderBook
{
if (lastUpdateId <= LastSequenceNumber)
{
_logger.Log(LogLevel.Trace, $"{Id} order book {Symbol} update skipped #{firstUpdateId}-{lastUpdateId}");
_logger.OrderBookUpdateSkipped(Id, Symbol, firstUpdateId, lastUpdateId);
return;
}
@ -825,7 +826,7 @@ namespace CryptoExchange.Net.OrderBook
}
LastSequenceNumber = lastUpdateId;
_logger.Log(LogLevel.Trace, $"{Id} order book {Symbol} update processed #{firstUpdateId}-{lastUpdateId}");
_logger.OrderBookProcessedMessage(Id, Symbol, firstUpdateId, lastUpdateId);
}
}

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
@ -189,7 +190,7 @@ namespace CryptoExchange.Net.Sockets
private async Task<bool> ConnectInternalAsync()
{
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] connecting");
_logger.SocketConnecting(Id);
try
{
using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
@ -197,11 +198,11 @@ namespace CryptoExchange.Net.Sockets
}
catch (Exception e)
{
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] connection failed: " + e.ToLogString());
_logger.SocketConnectionFailed(Id, e.Message, e);
return false;
}
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] connected to {Uri}");
_logger.SocketConnected(Id, Uri);
return true;
}
@ -210,13 +211,13 @@ namespace CryptoExchange.Net.Sockets
{
while (!_stopRequested)
{
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] starting processing tasks");
_logger.SocketStartingProcessing(Id);
_processState = ProcessState.Processing;
var sendTask = SendLoopAsync();
var receiveTask = ReceiveLoopAsync();
var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] processing tasks finished");
_logger.SocketFinishedProcessing(Id);
_processState = ProcessState.WaitingForClose;
while (_closeTask == null)
@ -244,14 +245,14 @@ namespace CryptoExchange.Net.Sockets
while (!_stopRequested)
{
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] attempting to reconnect");
_logger.SocketAttemptReconnect(Id);
var task = GetReconnectionUrl?.Invoke();
if (task != null)
{
var reconnectUri = await task.ConfigureAwait(false);
if (reconnectUri != null && Parameters.Uri != reconnectUri)
{
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] reconnect URI set to {reconnectUri}");
_logger.SocketSetReconnectUri(Id, reconnectUri);
Parameters.Uri = reconnectUri;
}
}
@ -284,7 +285,7 @@ namespace CryptoExchange.Net.Sockets
return;
var bytes = Parameters.Encoding.GetBytes(data);
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] msg {id} - Adding {bytes.Length} bytes to send buffer");
_logger.SocketAddingBytesToSendBuffer(Id, id, bytes);
_sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes });
_sendEvent.Set();
}
@ -295,7 +296,7 @@ namespace CryptoExchange.Net.Sockets
if (_processState != ProcessState.Processing && IsOpen)
return;
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] reconnect requested");
_logger.SocketReconnectRequested(Id);
_closeTask = CloseInternalAsync();
await _closeTask.ConfigureAwait(false);
}
@ -310,18 +311,18 @@ namespace CryptoExchange.Net.Sockets
{
if (_closeTask?.IsCompleted == false)
{
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] CloseAsync() waiting for existing close task");
_logger.SocketCloseAsyncWaitingForExistingCloseTask(Id);
await _closeTask.ConfigureAwait(false);
return;
}
if (!IsOpen)
{
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] CloseAsync() socket not open");
_logger.SocketCloseAsyncSocketNotOpen(Id);
return;
}
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] closing");
_logger.SocketClosing(Id);
_closeTask = CloseInternalAsync();
}
finally
@ -333,7 +334,7 @@ namespace CryptoExchange.Net.Sockets
if(_processTask != null)
await _processTask.ConfigureAwait(false);
await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] closed");
_logger.SocketClosed(Id);
}
/// <summary>
@ -385,11 +386,11 @@ namespace CryptoExchange.Net.Sockets
if (_disposed)
return;
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] disposing");
_logger.SocketDisposing(Id);
_disposed = true;
_socket.Dispose();
_ctsSource.Dispose();
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] disposed");
_logger.SocketDisposed(Id);
}
/// <summary>
@ -421,7 +422,7 @@ namespace CryptoExchange.Net.Sockets
if (limitResult.Success)
{
if (limitResult.Data > 0)
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit");
_logger.SocketSendDelayedBecauseOfRateLimit(Id, data.Id, limitResult.Data);
}
}
}
@ -430,7 +431,7 @@ namespace CryptoExchange.Net.Sockets
{
await _socket.SendAsync(new ArraySegment<byte>(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] msg {data.Id} - sent {data.Bytes.Length} bytes");
_logger.SocketSentBytes(Id, data.Id, data.Bytes.Length);
}
catch (OperationCanceledException)
{
@ -453,13 +454,13 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the send processing, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error
_logger.Log(LogLevel.Warning, $"[Sckt {Id}] send loop stopped with exception");
_logger.SocketSendLoopStoppedWithException(Id, e.Message, e);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
throw;
}
finally
{
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] send loop finished");
_logger.SocketSendLoopFinished(Id);
}
}
@ -506,8 +507,8 @@ namespace CryptoExchange.Net.Sockets
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed unexpectedly
_logger.Log(LogLevel.Debug, "[Sckt {Id}] received `Close` message, CloseStatus: {Status}, CloseStatusDescription: {CloseStatusDescription}", Id, receiveResult.CloseStatus, receiveResult.CloseStatusDescription);
// Connection closed unexpectedly
_logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString(), receiveResult.CloseStatusDescription);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
break;
@ -517,7 +518,7 @@ namespace CryptoExchange.Net.Sockets
{
// We received data, but it is not complete, write it to a memory stream for reassembling
multiPartMessage = true;
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] received {receiveResult.Count} bytes in partial message");
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
// Write the data to a memory stream to be reassembled later
if (multipartStream == null)
@ -529,13 +530,13 @@ namespace CryptoExchange.Net.Sockets
if (!multiPartMessage)
{
// Received a complete message and it's not multi part
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] received {receiveResult.Count} bytes in single message");
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array, buffer.Offset, receiveResult.Count));
}
else
{
// Received the end of a multipart message, write to memory stream for reassembling
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] received {receiveResult.Count} bytes in partial message");
_logger.SocketReceivedPartialMessage(Id, receiveResult.Count);
multipartStream!.Write(buffer.Array, buffer.Offset, receiveResult.Count);
}
@ -563,13 +564,13 @@ namespace CryptoExchange.Net.Sockets
// When the connection gets interupted we might not have received a full message
if (receiveResult?.EndOfMessage == true)
{
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] reassembled message of {multipartStream!.Length} bytes");
_logger.SocketReassembledMessage(Id, multipartStream!.Length);
// Get the underlying buffer of the memorystream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length));
}
else
{
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] discarding incomplete message of {multipartStream!.Length} bytes");
_logger.SocketDiscardIncompleteMessage(Id, multipartStream!.Length);
}
}
}
@ -579,13 +580,13 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
// Make sure we at least let the owner know there was an error
_logger.Log(LogLevel.Warning, $"[Sckt {Id}] receive loop stopped with exception");
_logger.SocketReceiveLoopStoppedWithException(Id, e);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
throw;
}
finally
{
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] receive loop finished");
_logger.SocketReceiveLoopFinished(Id);
}
}
@ -607,7 +608,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
protected async Task CheckTimeoutAsync()
{
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] starting task checking for no data received for {Parameters.Timeout}");
_logger.SocketStartingTaskForNoDataReceivedCheck(Id, Parameters.Timeout);
LastActionTime = DateTime.UtcNow;
try
{
@ -618,7 +619,7 @@ namespace CryptoExchange.Net.Sockets
if (DateTime.UtcNow - LastActionTime > Parameters.Timeout)
{
_logger.Log(LogLevel.Warning, $"[Sckt {Id}] no data received for {Parameters.Timeout}, reconnecting socket");
_logger.SocketNoDataReceiveTimoutReconnect(Id, Parameters.Timeout);
_ = ReconnectAsync().ConfigureAwait(false);
return;
}

View File

@ -6,11 +6,10 @@ using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects;
using System.Net.WebSockets;
using System.IO;
using CryptoExchange.Net.Objects.Sockets;
using System.Diagnostics;
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Converters.JsonNet;
using CryptoExchange.Net.Logging.Extensions;
using System.Threading;
namespace CryptoExchange.Net.Sockets
@ -130,7 +129,7 @@ namespace CryptoExchange.Net.Sockets
if (_pausedActivity != value)
{
_pausedActivity = value;
_logger.Log(LogLevel.Information, $"[Sckt {SocketId}] paused activity: " + value);
_logger.ActivityPaused(SocketId, value);
if(_pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke());
else _ = Task.Run(() => ActivityUnpaused?.Invoke());
}
@ -150,7 +149,7 @@ namespace CryptoExchange.Net.Sockets
var oldStatus = _status;
_status = value;
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] status changed from {oldStatus} to {_status}");
_logger.SocketStatusChanged(SocketId, oldStatus, value);
}
}
@ -301,7 +300,7 @@ namespace CryptoExchange.Net.Sockets
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectSuccessful)
{
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again");
_logger.FailedReconnectProcessing(SocketId, reconnectSuccessful.Error?.ToString());
_ = _socket.ReconnectAsync().ConfigureAwait(false);
}
else
@ -316,7 +315,7 @@ namespace CryptoExchange.Net.Sockets
}
catch(Exception ex)
{
_logger.Log(LogLevel.Warning, ex, $"[Sckt {SocketId}] Unknown exception while processing reconnection, reconnecting again");
_logger.UnkownExceptionWhileProcessingReconnection(SocketId, ex);
_ = _socket.ReconnectAsync().ConfigureAwait(false);
}
});
@ -331,9 +330,9 @@ namespace CryptoExchange.Net.Sockets
protected virtual Task HandleErrorAsync(Exception e)
{
if (e is WebSocketException wse)
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
_logger.WebSocketErrorCodeAndDetails(SocketId, wse.WebSocketErrorCode, wse.Message, wse);
else
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] error: " + e.ToLogString());
_logger.WebSocketError(SocketId, e.Message, e);
return Task.CompletedTask;
}
@ -352,7 +351,7 @@ namespace CryptoExchange.Net.Sockets
if (query == null)
{
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] msg {requestId} - message sent, but not pending");
_logger.MessageSentNotPending(SocketId, requestId);
return Task.CompletedTask;
}
@ -379,18 +378,20 @@ namespace CryptoExchange.Net.Sockets
_accessor.Read(data);
try
{
if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData)
bool outputOriginalData = ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData;
if (outputOriginalData)
{
originalData = _accessor.GetOriginalString();
_logger.LogTrace("[Sckt {SocketId}] received {Data}", SocketId, originalData);
_logger.ReceivedData(SocketId, originalData);
}
// 3. Determine the identifying properties of this message
var listenId = ApiClient.GetListenerIdentifier(_accessor);
if (listenId == null)
{
originalData = outputOriginalData ? _accessor.GetOriginalString() : "[OutputOriginalData is false]";
if (!ApiClient.UnhandledMessageExpected)
_logger.LogWarning("[Sckt {SocketId}] failed to evaluate message", SocketId);
_logger.FailedToEvaluateMessage(SocketId, originalData);
UnhandledMessage?.Invoke(_accessor);
return;
@ -408,14 +409,14 @@ namespace CryptoExchange.Net.Sockets
List<string> listenerIds;
lock (_listenersLock)
listenerIds = _listeners.Where(l => l.CanHandleData).SelectMany(l => l.ListenerIdentifiers).ToList();
_logger.LogWarning("[Sckt {SocketId}] received message not matched to any listener. ListenId: {ListenId}, current listeners: {ListenIds}", SocketId, listenId, listenerIds);
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
UnhandledMessage?.Invoke(_accessor);
}
return;
}
_logger.LogTrace("[Sckt {SocketId}] {Count} processor(s) matched to message with listener identifier {ListenerId}", SocketId, processors.Count, listenId);
_logger.ProcessorMatched(SocketId, processors.Count, listenId);
var totalUserTime = 0;
Dictionary<Type, object>? desCache = null;
if (processors.Count > 1)
@ -430,7 +431,7 @@ namespace CryptoExchange.Net.Sockets
var messageType = processor.GetMessageType(_accessor);
if (messageType == null)
{
_logger.LogWarning("[Sckt {SocketId}] received message not recognized by handler {Id}", SocketId, processor.Id);
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
continue;
}
@ -443,7 +444,7 @@ namespace CryptoExchange.Net.Sockets
var desResult = processor.Deserialize(_accessor, messageType);
if (!desResult)
{
_logger.LogWarning("[Sckt {SocketId}] deserialization failed: {Error}", SocketId, desResult.Error);
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString());
continue;
}
deserialized = desResult.Data;
@ -459,13 +460,13 @@ namespace CryptoExchange.Net.Sockets
}
catch (Exception ex)
{
_logger.LogWarning("[Sckt {SocketId}] user message processing failed: {Exception}", SocketId, ex.ToLogString());
_logger.UserMessageProcessingFailed(SocketId, ex.ToLogString(), ex);
if (processor is Subscription subscription)
subscription.InvokeExceptionHandler(ex);
}
}
_logger.LogTrace($"[Sckt {SocketId}] message processed in {(int)sw.ElapsedMilliseconds}ms ({sw.ElapsedMilliseconds - totalUserTime}ms parsing)");
_logger.MessageProcessed(SocketId, sw.ElapsedMilliseconds, sw.ElapsedMilliseconds - totalUserTime);
}
finally
{
@ -529,7 +530,7 @@ namespace CryptoExchange.Net.Sockets
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] closing subscription {subscription.Id}");
_logger.ClosingSubscription(SocketId, subscription.Id);
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
@ -548,12 +549,12 @@ namespace CryptoExchange.Net.Sockets
}
else
{
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] not unsubscribing subscription as there is still a duplicate subscription running");
_logger.NotUnsubscribingSubscriptionBecauseDuplicateRunning(SocketId);
}
if (Status == SocketStatus.Closing)
{
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] already closing");
_logger.AlreadyClosing(SocketId);
return;
}
@ -567,7 +568,7 @@ namespace CryptoExchange.Net.Sockets
if (shouldCloseConnection)
{
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] closing as there are no more subscriptions");
_logger.ClosingNoMoreSubscriptions(SocketId);
await CloseAsync().ConfigureAwait(false);
}
@ -605,7 +606,7 @@ namespace CryptoExchange.Net.Sockets
_listeners.Add(subscription);
if (subscription.UserSubscription)
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] adding new subscription with id {subscription.Id}, total subscriptions on connection: {UserSubscriptionCount}");
_logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount);
return true;
}
@ -713,7 +714,7 @@ namespace CryptoExchange.Net.Sockets
return new CallResult(new InvalidOperationError(info));
}
_logger.Log(LogLevel.Trace, $"[Sckt {SocketId}] msg {requestId} - sending messsage: {data}");
_logger.SendingData(SocketId, requestId, data);
try
{
_socket.Send(requestId, data, weight);
@ -736,7 +737,7 @@ namespace CryptoExchange.Net.Sockets
if (!anySubscriptions)
{
// No need to resubscribe anything
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] nothing to resubscribe, closing connection");
_logger.NothingToResubscribeCloseConnection(SocketId);
_ = _socket.CloseAsync();
return new CallResult<bool>(true);
}
@ -750,12 +751,12 @@ namespace CryptoExchange.Net.Sockets
var authResult = await ApiClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
if (!authResult)
{
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] authentication failed on reconnected socket. Disconnecting and reconnecting.");
_logger.FailedAuthenticationDisconnectAndRecoonect(SocketId);
return authResult;
}
Authenticated = true;
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] authentication succeeded on reconnected socket.");
_logger.AuthenticationSucceeded(SocketId);
}
// Get a list of all subscriptions on the socket
@ -769,7 +770,7 @@ namespace CryptoExchange.Net.Sockets
var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
if (!result)
{
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] failed request revitalization: " + result.Error);
_logger.FailedRequestRevitalization(SocketId, result.Error?.ToString());
return result;
}
}
@ -807,7 +808,7 @@ namespace CryptoExchange.Net.Sockets
if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected"));
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] all subscription successfully resubscribed on reconnected socket.");
_logger.AllSubscriptionResubscribed(SocketId);
return new CallResult<bool>(true);
}
@ -818,7 +819,7 @@ namespace CryptoExchange.Net.Sockets
return;
await SendAndWaitQueryAsync(unsubscribeRequest).ConfigureAwait(false);
_logger.Log(LogLevel.Information, $"[Sckt {SocketId}] subscription {subscription!.Id} unsubscribed");
_logger.SubscriptionUnsubscribed(SocketId, subscription.Id);
}
internal async Task<CallResult> ResubscribeAsync(Subscription subscription)
@ -869,7 +870,7 @@ namespace CryptoExchange.Net.Sockets
if (query == null)
continue;
_logger.Log(LogLevel.Trace, $"[Sckt {SocketId}] sending periodic {identifier}");
_logger.SendingPeriodic(SocketId, identifier);
try
{
@ -878,7 +879,7 @@ namespace CryptoExchange.Net.Sockets
}
catch (Exception ex)
{
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] Periodic send {identifier} failed: " + ex.ToLogString());
_logger.PeriodicSendFailed(SocketId, identifier, ex.ToLogString(), ex);
}
}
});