From a896fffdb30e5305e8c1b184fea3deeb93d88047 Mon Sep 17 00:00:00 2001 From: Jan Korf Date: Wed, 7 Jan 2026 10:00:14 +0100 Subject: [PATCH] Time offset management (#266) Updated time sync / time offset management for REST API's Added time offset tracking for WebSocket API's Added GetAuthenticationQuery virtual method on AuthenticationProvider Updated AuthenticationProvider GetTimestamp methods to include a one second offset by default Added AuthenticationProvider GetTimestamp methods for SocketApiClient instances Added ClientName property on BaseApiClient, resolving to the type name Added ObjectOrArrayConverter JsonConverterFactory implementation for resolving json data which might be returned as object or array Added UpdateServerTime, UpdateLocalTime and DataAge properties to (I)SymbolOrderBook Added OutputToConsoleAsync method to (I)SymbolOrderBook Updated SymbolOrderBook string representation Added DataTimeLocal and DataAge properties to DataEvent object Added SocketConnection parameter to subscription HandleSubQueryResponse and HandleUnsubQueryResponse methods --- .../TestImplementations/TestBaseClient.cs | 2 - .../TestImplementations/TestRestClient.cs | 21 +-- .../Authentication/AuthenticationProvider.cs | 64 +++++--- CryptoExchange.Net/Clients/BaseApiClient.cs | 17 ++ CryptoExchange.Net/Clients/RestApiClient.cs | 103 ++++++------ CryptoExchange.Net/Clients/SocketApiClient.cs | 25 ++- .../SystemTextJson/ObjectOrArrayConverter.cs | 58 +++++++ CryptoExchange.Net/ExchangeHelpers.cs | 6 + .../Interfaces/ISymbolOrderBook.cs | 22 ++- .../SymbolOrderBookLoggingExtensions.cs | 10 ++ .../Objects/Sockets/DataEvent.cs | 19 ++- CryptoExchange.Net/Objects/TimeSyncState.cs | 95 ----------- .../OrderBook/ProcessQueueItem.cs | 4 + .../OrderBook/SymbolOrderBook.cs | 143 +++++++++++++++-- .../Sockets/Default/SocketConnection.cs | 4 +- .../Sockets/Default/Subscription.cs | 6 +- CryptoExchange.Net/TimeOffsetManager.cs | 150 ++++++++++++++++++ 17 files changed, 536 insertions(+), 213 deletions(-) create mode 100644 CryptoExchange.Net/Converters/SystemTextJson/ObjectOrArrayConverter.cs delete mode 100644 CryptoExchange.Net/Objects/TimeSyncState.cs create mode 100644 CryptoExchange.Net/TimeOffsetManager.cs diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestBaseClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestBaseClient.cs index 2d08c2b..9a3c5cc 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestBaseClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestBaseClient.cs @@ -63,8 +63,6 @@ namespace CryptoExchange.Net.UnitTests /// public override string FormatSymbol(string baseAsset, string quoteAsset, TradingMode futuresType, DateTime? deliverDate = null) => $"{baseAsset.ToUpperInvariant()}{quoteAsset.ToUpperInvariant()}"; - public override TimeSpan? GetTimeOffset() => null; - public override TimeSyncInfo GetTimeSyncInfo() => null; protected override IStreamMessageAccessor CreateAccessor() => new SystemTextJsonStreamMessageAccessor(new System.Text.Json.JsonSerializerOptions()); protected override IMessageSerializer CreateSerializer() => new SystemTextJsonMessageSerializer(new System.Text.Json.JsonSerializerOptions()); protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials) => throw new NotImplementedException(); diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestRestClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestRestClient.cs index 2bb242a..c3a3818 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestRestClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestRestClient.cs @@ -160,11 +160,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations ParameterPositions[method] = position; } - public override TimeSpan? GetTimeOffset() - { - throw new NotImplementedException(); - } - protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials) => new TestAuthProvider(credentials); @@ -172,11 +167,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations { throw new NotImplementedException(); } - - public override TimeSyncInfo GetTimeSyncInfo() - { - throw new NotImplementedException(); - } } public class TestRestApi2Client : RestApiClient @@ -198,12 +188,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations { return await SendAsync("http://www.test.com", new RequestDefinition("/", HttpMethod.Get) { Weight = 0 }, null, ct); } - - public override TimeSpan? GetTimeOffset() - { - throw new NotImplementedException(); - } - + protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials) => new TestAuthProvider(credentials); @@ -212,10 +197,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations throw new NotImplementedException(); } - public override TimeSyncInfo GetTimeSyncInfo() - { - throw new NotImplementedException(); - } } public class TestError diff --git a/CryptoExchange.Net/Authentication/AuthenticationProvider.cs b/CryptoExchange.Net/Authentication/AuthenticationProvider.cs index 09aa93c..5f73f41 100644 --- a/CryptoExchange.Net/Authentication/AuthenticationProvider.cs +++ b/CryptoExchange.Net/Authentication/AuthenticationProvider.cs @@ -11,6 +11,8 @@ using System.Linq; using System.Globalization; using System.Security.Cryptography; using System.Text; +using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Sockets.Default; namespace CryptoExchange.Net.Authentication { @@ -76,12 +78,20 @@ namespace CryptoExchange.Net.Authentication } /// - /// Authenticate a request + /// Authenticate a REST request /// - /// The Api client sending the request + /// The API client sending the request /// The request configuration public abstract void ProcessRequest(RestApiClient apiClient, RestRequestConfiguration requestConfig); + /// + /// Get an authentication query for a websocket + /// + /// The API client sending the request + /// The connection to authenticate + /// Optional context required for creating the authentication query + public virtual Query? GetAuthenticationQuery(SocketApiClient apiClient, SocketConnection connection, Dictionary? context = null) => null; + /// /// SHA256 sign the data and return the bytes /// @@ -494,32 +504,50 @@ namespace CryptoExchange.Net.Authentication /// /// Get current timestamp including the time sync offset from the api client /// - /// - /// - protected DateTime GetTimestamp(RestApiClient apiClient) + protected DateTime GetTimestamp(RestApiClient apiClient, bool includeOneSecondOffset = true) { - return TimeProvider.GetTime().Add(apiClient.GetTimeOffset() ?? TimeSpan.Zero)!; + var result = TimeProvider.GetTime().Add(TimeOffsetManager.GetRestOffset(apiClient.ClientName) ?? TimeSpan.Zero)!; + if (includeOneSecondOffset) + result = result.AddSeconds(-1); + + return result; + } + + /// + /// Get current timestamp including the time sync offset from the api client + /// + protected DateTime GetTimestamp(SocketApiClient apiClient, bool includeOneSecondOffset = true) + { + var result = TimeProvider.GetTime().Add(TimeOffsetManager.GetSocketOffset(apiClient.ClientName) ?? TimeSpan.Zero)!; + if (includeOneSecondOffset) + result = result.AddSeconds(-1); + + return result; } /// /// Get millisecond timestamp as a string including the time sync offset from the api client /// - /// - /// - protected string GetMillisecondTimestamp(RestApiClient apiClient) - { - return DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient)).Value.ToString(CultureInfo.InvariantCulture); - } + protected string GetMillisecondTimestamp(RestApiClient apiClient, bool includeOneSecondOffset = true) + => DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient, includeOneSecondOffset)).Value.ToString(CultureInfo.InvariantCulture); + + /// + /// Get millisecond timestamp as a string including the time sync offset from the api client + /// + protected string GetMillisecondTimestamp(SocketApiClient apiClient, bool includeOneSecondOffset = true) + => DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient, includeOneSecondOffset)).Value.ToString(CultureInfo.InvariantCulture); /// /// Get millisecond timestamp as a long including the time sync offset from the api client /// - /// - /// - protected long GetMillisecondTimestampLong(RestApiClient apiClient) - { - return DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient)).Value; - } + protected long GetMillisecondTimestampLong(RestApiClient apiClient, bool includeOneSecondOffset = true) + => DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient, includeOneSecondOffset)).Value; + + /// + /// Get millisecond timestamp as a long including the time sync offset from the api client + /// + protected long GetMillisecondTimestampLong(SocketApiClient apiClient, bool includeOneSecondOffset = true) + => DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient, includeOneSecondOffset)).Value; /// /// Return the serialized request body diff --git a/CryptoExchange.Net/Clients/BaseApiClient.cs b/CryptoExchange.Net/Clients/BaseApiClient.cs index c2eab52..13fe15d 100644 --- a/CryptoExchange.Net/Clients/BaseApiClient.cs +++ b/CryptoExchange.Net/Clients/BaseApiClient.cs @@ -13,6 +13,8 @@ namespace CryptoExchange.Net.Clients /// public abstract class BaseApiClient : IDisposable, IBaseApiClient { + private string? _clientName; + /// /// Logger /// @@ -23,6 +25,21 @@ namespace CryptoExchange.Net.Clients /// protected bool _disposing; + /// + /// Name of the client + /// + protected internal string ClientName + { + get + { + if (_clientName != null) + return _clientName; + + _clientName = GetType().Name; + return _clientName; + } + } + /// /// The authentication provider for this API client. (null if no credentials are set) /// diff --git a/CryptoExchange.Net/Clients/RestApiClient.cs b/CryptoExchange.Net/Clients/RestApiClient.cs index 409412c..24d1d4f 100644 --- a/CryptoExchange.Net/Clients/RestApiClient.cs +++ b/CryptoExchange.Net/Clients/RestApiClient.cs @@ -32,12 +32,6 @@ namespace CryptoExchange.Net.Clients /// public IRequestFactory RequestFactory { get; set; } = new RequestFactory(); - /// - public abstract TimeSyncInfo? GetTimeSyncInfo(); - - /// - public abstract TimeSpan? GetTimeOffset(); - /// public int TotalRequestsMade { get; set; } @@ -115,6 +109,8 @@ namespace CryptoExchange.Net.Clients options, apiOptions) { + TimeOffsetManager.RegisterRestApi(ClientName); + RequestFactory.Configure(options, httpClient); } @@ -241,11 +237,9 @@ namespace CryptoExchange.Net.Clients { currentTry++; - var error = await CheckTimeSync(requestId, definition).ConfigureAwait(false); - if (error != null) - return new WebCallResult(error); + await CheckTimeSync(requestId, definition).ConfigureAwait(false); - error = await RateLimitAsync( + var error = await RateLimitAsync( baseAddress, requestId, definition, @@ -300,28 +294,6 @@ namespace CryptoExchange.Net.Clients } } - private async ValueTask CheckTimeSync(int requestId, RequestDefinition definition) - { - if (!definition.Authenticated) - return null; - - var syncTask = SyncTimeAsync(); - var timeSyncInfo = GetTimeSyncInfo(); - - if (timeSyncInfo != null && timeSyncInfo.TimeSyncState.LastSyncTime == default) - { - // Initially with first request we'll need to wait for the time syncing, if it's not the first request we can just continue - var syncTimeError = await syncTask.ConfigureAwait(false); - if (syncTimeError != null) - { - _logger.RestApiFailedToSyncTime(requestId, syncTimeError!.ToString()); - return syncTimeError; - } - } - - return null; - } - /// /// Check rate limits for the request /// @@ -725,26 +697,44 @@ namespace CryptoExchange.Net.Clients RequestFactory.UpdateSettings(options.Proxy, options.RequestTimeout ?? ClientOptions.RequestTimeout, ClientOptions.HttpKeepAliveInterval); } - internal async ValueTask SyncTimeAsync() + private async ValueTask CheckTimeSync(int requestId, RequestDefinition definition) { - var timeSyncParams = GetTimeSyncInfo(); - if (timeSyncParams == null) - return null; + if (!definition.Authenticated) + return; - if (await timeSyncParams.TimeSyncState.Semaphore.WaitAsync(0).ConfigureAwait(false)) + var lastUpdateTime = TimeOffsetManager.GetRestLastUpdateTime(ClientName); + var syncTask = CheckTimeOffsetAsync(); + + if (lastUpdateTime == null) { - if (!timeSyncParams.SyncTime || DateTime.UtcNow - timeSyncParams.TimeSyncState.LastSyncTime < timeSyncParams.RecalculationInterval) - { - timeSyncParams.TimeSyncState.Semaphore.Release(); - return null; - } + // Initially with first request we'll need to wait for the time syncing before making the actual request. + // If it's not the first request we can just continue and let it complete in the background + await syncTask.ConfigureAwait(false); + } + + return; + } + + internal async ValueTask CheckTimeOffsetAsync() + { + if (!(ApiOptions.AutoTimestamp ?? ClientOptions.AutoTimestamp)) + // Time syncing not enabled + return; + + await TimeOffsetManager.EnterAsync(ClientName).ConfigureAwait(false); + try + { + var lastUpdateTime = TimeOffsetManager.GetRestLastUpdateTime(ClientName); + if (DateTime.UtcNow - lastUpdateTime < (ApiOptions.TimestampRecalculationInterval ?? ClientOptions.TimestampRecalculationInterval)) + // Time syncing was recently done + return; var localTime = DateTime.UtcNow; var result = await GetServerTimestampAsync().ConfigureAwait(false); if (!result) { - timeSyncParams.TimeSyncState.Semaphore.Release(); - return result.Error; + _logger.LogWarning("Failed to determine time offset between client and server, timestamping might fail"); + return; } if (TotalRequestsMade == 1) @@ -754,18 +744,29 @@ namespace CryptoExchange.Net.Clients result = await GetServerTimestampAsync().ConfigureAwait(false); if (!result) { - timeSyncParams.TimeSyncState.Semaphore.Release(); - return result.Error; + _logger.LogWarning("Failed to determine time offset between client and server, timestamping might fail"); + return; } } - // Calculate time offset between local and server + // Estimate the offset as the round trip time / 2 var offset = result.Data - localTime.AddMilliseconds(result.ResponseTime!.Value.TotalMilliseconds / 2); - timeSyncParams.UpdateTimeOffset(offset); - timeSyncParams.TimeSyncState.Semaphore.Release(); - } + if (offset.TotalMilliseconds > 0 && offset.TotalMilliseconds < 500) + { + _logger.LogInformation("{ClientName} Time offset within limits ({Offset}ms), set offset to 0ms", ClientName, Math.Round(offset.TotalMilliseconds)); + offset = TimeSpan.Zero; + } + else + { + _logger.LogInformation("{ClientName} Time offset set to {Offset}ms", ClientName, Math.Round(offset.TotalMilliseconds)); + } - return null; + TimeOffsetManager.UpdateRestOffset(ClientName, offset.TotalMilliseconds); + } + finally + { + TimeOffsetManager.Release(ClientName); + } } private bool ShouldCache(RequestDefinition definition) diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index aea4ff4..6fd7ea1 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -32,8 +32,10 @@ namespace CryptoExchange.Net.Clients public abstract class SocketApiClient : BaseApiClient, ISocketApiClient { #region Fields + /// public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory(); + /// public IHighPerfConnectionFactory? HighPerfConnectionFactory { get; set; } @@ -181,6 +183,24 @@ namespace CryptoExchange.Net.Clients DedicatedConnectionConfigs.Add(new DedicatedConnectionConfig() { SocketAddress = url, Authenticated = auth }); } + /// + /// Update the timestamp offset between client and server based on the timestamp + /// + /// Timestamp received from the server + public virtual void UpdateTimeOffset(DateTime timestamp) + { + if (timestamp == default) + return; + + TimeOffsetManager.UpdateSocketOffset(ClientName, (DateTime.UtcNow - timestamp).TotalMilliseconds); + } + + /// + /// Get the time offset between client and server + /// + /// + public virtual TimeSpan? GetTimeOffset() => TimeOffsetManager.GetSocketOffset(ClientName); + /// /// Add a query to periodically send on each connection /// @@ -296,7 +316,7 @@ namespace CryptoExchange.Net.Clients if (!success) return; - subscription.HandleSubQueryResponse(response); + subscription.HandleSubQueryResponse(socketConnection, response); subscription.Status = SubscriptionStatus.Subscribed; if (ct != default) { @@ -575,7 +595,8 @@ namespace CryptoExchange.Net.Clients /// Should return the request which can be used to authenticate a socket connection /// /// - protected internal virtual Task GetAuthenticationRequestAsync(SocketConnection connection) => throw new NotImplementedException(); + protected internal virtual Task GetAuthenticationRequestAsync(SocketConnection connection) => + Task.FromResult(AuthenticationProvider!.GetAuthenticationQuery(this, connection)); /// /// Adds a system subscription. Used for example to reply to ping requests diff --git a/CryptoExchange.Net/Converters/SystemTextJson/ObjectOrArrayConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/ObjectOrArrayConverter.cs new file mode 100644 index 0000000..8a14918 --- /dev/null +++ b/CryptoExchange.Net/Converters/SystemTextJson/ObjectOrArrayConverter.cs @@ -0,0 +1,58 @@ +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code +#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. + +namespace CryptoExchange.Net.Converters.SystemTextJson +{ + /// + /// Converter for parsing object or array responses + /// + public class ObjectOrArrayConverter : JsonConverterFactory + { + /// + public override bool CanConvert(Type typeToConvert) => true; + /// + public override JsonConverter? CreateConverter(Type typeToConvert, JsonSerializerOptions options) + { + var type = typeof(InternalObjectOrArrayConverter<>).MakeGenericType(typeToConvert); + return (JsonConverter)Activator.CreateInstance(type)!; + } + + private class InternalObjectOrArrayConverter : JsonConverter + { + public override T? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType == JsonTokenType.StartObject && !typeToConvert.IsArray) + { + // Object to object + return JsonDocument.ParseValue(ref reader).Deserialize(options); + } + else if (reader.TokenType == JsonTokenType.StartArray && typeToConvert.IsArray) + { + // Array to array + return JsonDocument.ParseValue(ref reader).Deserialize(options); + } + else if (reader.TokenType == JsonTokenType.StartArray) + { + // Array to object + JsonDocument.ParseValue(ref reader).Deserialize(options); + return default; + } + else + { + // Object to array + JsonDocument.ParseValue(ref reader); + return default; + } + } + + public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions options) + { + JsonSerializer.Serialize(writer, value, options); + } + } + } +} diff --git a/CryptoExchange.Net/ExchangeHelpers.cs b/CryptoExchange.Net/ExchangeHelpers.cs index dd4fbb0..9cd859b 100644 --- a/CryptoExchange.Net/ExchangeHelpers.cs +++ b/CryptoExchange.Net/ExchangeHelpers.cs @@ -258,6 +258,12 @@ namespace CryptoExchange.Net return value; } + /// + /// Generate a long value between two values + /// + /// Min value + /// Max value + /// public static long RandomLong(long minValue, long maxValue) { #if NET8_0_OR_GREATER diff --git a/CryptoExchange.Net/Interfaces/ISymbolOrderBook.cs b/CryptoExchange.Net/Interfaces/ISymbolOrderBook.cs index 609373f..0b94d6a 100644 --- a/CryptoExchange.Net/Interfaces/ISymbolOrderBook.cs +++ b/CryptoExchange.Net/Interfaces/ISymbolOrderBook.cs @@ -47,9 +47,21 @@ namespace CryptoExchange.Net.Interfaces /// event Action<(ISymbolOrderBookEntry BestBid, ISymbolOrderBookEntry BestAsk)> OnBestOffersChanged; /// - /// Timestamp of the last update + /// Timestamp of when the last update was applied to the book, local time /// DateTime UpdateTime { get; } + /// + /// Timestamp of the last event that was applied, server time + /// + DateTime? UpdateServerTime { get; } + /// + /// Timestamp of the last event that was applied, in local time, estimated based on timestamp difference between client and server + /// + DateTime? UpdateLocalTime { get; } + /// + /// Age of the data, in local time, estimated based on timestamp difference between client and server + the period since last update + /// + TimeSpan? DataAge { get; } /// /// The number of asks in the book @@ -126,5 +138,13 @@ namespace CryptoExchange.Net.Interfaces /// /// string ToString(int rows); + + /// + /// Output the orderbook to the console + /// + /// Number of rows to display + /// Refresh interval + /// Cancellation token + Task OutputToConsoleAsync(int numberOfEntries, TimeSpan refreshInterval, CancellationToken ct = default); } } diff --git a/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs b/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs index 413f12b..2997974 100644 --- a/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs +++ b/CryptoExchange.Net/Logging/Extensions/SymbolOrderBookLoggingExtensions.cs @@ -28,6 +28,7 @@ namespace CryptoExchange.Net.Logging.Extensions private static readonly Action _orderBookReconnectingSocket; private static readonly Action _orderBookSkippedMessage; private static readonly Action _orderBookProcessedMessage; + private static readonly Action _orderBookProcessedMessageSingle; private static readonly Action _orderBookOutOfSync; static SymbolOrderBookLoggingExtensions() @@ -136,6 +137,11 @@ namespace CryptoExchange.Net.Logging.Extensions LogLevel.Warning, new EventId(5020, "OrderBookOutOfSyncChecksum"), "{Api} order book {Symbol} out of sync. Checksum mismatch, resyncing"); + + _orderBookProcessedMessageSingle = LoggerMessage.Define( + LogLevel.Trace, + new EventId(5021, "OrderBookProcessedMessage"), + "{Api} order book {Symbol} update processed #{UpdateId}"); } public static void OrderBookStatusChanged(this ILogger logger, string api, string symbol, OrderBookStatus previousStatus, OrderBookStatus newStatus) @@ -229,6 +235,10 @@ namespace CryptoExchange.Net.Logging.Extensions _orderBookProcessedMessage(logger, api, symbol, firstUpdateId, lastUpdateId, null); } + public static void OrderBookProcessedMessage(this ILogger logger, string api, string symbol, long updateId) + { + _orderBookProcessedMessageSingle(logger, api, symbol, updateId, null); + } public static void OrderBookOutOfSyncChecksum(this ILogger logger, string api, string symbol) { _orderBookOutOfSyncChecksum(logger, api, symbol, null); diff --git a/CryptoExchange.Net/Objects/Sockets/DataEvent.cs b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs index 8780b99..cacf430 100644 --- a/CryptoExchange.Net/Objects/Sockets/DataEvent.cs +++ b/CryptoExchange.Net/Objects/Sockets/DataEvent.cs @@ -18,6 +18,16 @@ namespace CryptoExchange.Net.Objects.Sockets /// public DateTime? DataTime { get; set; } + /// + /// The timestamp of the data in local time. Note that this is an estimation based on average delay from the server. + /// + public DateTime? DataTimeLocal { get; set; } + + /// + /// The age of the data. Note that this is an estimation based on average delay from the server. + /// + public TimeSpan? DataAge => DateTime.UtcNow - DataTimeLocal; + /// /// The stream producing the update /// @@ -119,9 +129,16 @@ namespace CryptoExchange.Net.Objects.Sockets /// /// Specify the data timestamp /// - public DataEvent WithDataTimestamp(DateTime? timestamp) + public DataEvent WithDataTimestamp(DateTime? timestamp, TimeSpan? offset) { + if (timestamp == null || timestamp == default(DateTime)) + return this; + DataTime = timestamp; + if (offset == null) + return this; + + DataTimeLocal = DataTime + offset; return this; } diff --git a/CryptoExchange.Net/Objects/TimeSyncState.cs b/CryptoExchange.Net/Objects/TimeSyncState.cs deleted file mode 100644 index 021a9fb..0000000 --- a/CryptoExchange.Net/Objects/TimeSyncState.cs +++ /dev/null @@ -1,95 +0,0 @@ -using System; -using System.Threading; -using Microsoft.Extensions.Logging; - -namespace CryptoExchange.Net.Objects -{ - /// - /// The time synchronization state of an API client - /// - public class TimeSyncState - { - /// - /// Name of the API - /// - public string ApiName { get; set; } - /// - /// Semaphore to use for checking the time syncing. Should be shared instance among the API client - /// - public SemaphoreSlim Semaphore { get; } - /// - /// Last sync time for the API client - /// - public DateTime LastSyncTime { get; set; } - /// - /// Time offset for the API client - /// - public TimeSpan TimeOffset { get; set; } - - /// - /// ctor - /// - public TimeSyncState(string apiName) - { - ApiName = apiName; - Semaphore = new SemaphoreSlim(1, 1); - } - } - - /// - /// Time synchronization info - /// - public class TimeSyncInfo - { - /// - /// Logger - /// - public ILogger Logger { get; } - /// - /// Should synchronize time - /// - public bool SyncTime { get; } - /// - /// Timestamp recalulcation interval - /// - public TimeSpan RecalculationInterval { get; } - /// - /// Time sync state for the API client - /// - public TimeSyncState TimeSyncState { get; } - - /// - /// ctor - /// - /// - /// - /// - /// - public TimeSyncInfo(ILogger logger, bool syncTime, TimeSpan recalculationInterval, TimeSyncState syncState) - { - Logger = logger; - SyncTime = syncTime; - RecalculationInterval = recalculationInterval; - TimeSyncState = syncState; - } - - /// - /// Set the time offset - /// - /// - public void UpdateTimeOffset(TimeSpan offset) - { - TimeSyncState.LastSyncTime = DateTime.UtcNow; - if (offset.TotalMilliseconds > 0 && offset.TotalMilliseconds < 500) - { - Logger.Log(LogLevel.Information, "{TimeSyncState.ApiName} Time offset within limits, set offset to 0ms", TimeSyncState.ApiName); - TimeSyncState.TimeOffset = TimeSpan.Zero; - } - else - { - Logger.Log(LogLevel.Information, "{TimeSyncState.ApiName} Time offset set to {Offset}ms", TimeSyncState.ApiName, Math.Round(offset.TotalMilliseconds)); - TimeSyncState.TimeOffset = offset; - } - } - } -} diff --git a/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs b/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs index 6d16d29..290d492 100644 --- a/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs +++ b/CryptoExchange.Net/OrderBook/ProcessQueueItem.cs @@ -5,6 +5,8 @@ namespace CryptoExchange.Net.OrderBook { internal class ProcessQueueItem { + public DateTime? LocalDataTime { get; set; } + public DateTime? ServerDataTime { get; set; } public long StartUpdateId { get; set; } public long EndUpdateId { get; set; } public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty(); @@ -13,6 +15,8 @@ namespace CryptoExchange.Net.OrderBook internal class InitialOrderBookItem { + public DateTime? LocalDataTime { get; set; } + public DateTime? ServerDataTime { get; set; } public long StartUpdateId { get; set; } public long EndUpdateId { get; set; } public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty(); diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index 9892a54..a3cf80a 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Globalization; using System.Linq; using System.Text; @@ -133,6 +134,15 @@ namespace CryptoExchange.Net.OrderBook /// public DateTime UpdateTime { get; private set; } + /// + public DateTime? UpdateServerTime { get; private set; } + + /// + public DateTime? UpdateLocalTime { get; set; } + + /// + public TimeSpan? DataAge => DateTime.UtcNow - UpdateLocalTime; + /// public int AskCount { get; private set; } @@ -406,10 +416,8 @@ namespace CryptoExchange.Net.OrderBook /// Implementation for validating a checksum value with the current order book. If checksum validation fails (returns false) /// the order book will be resynchronized /// - /// - /// protected virtual bool DoChecksum(int checksum) => true; - + /// /// Set the initial data for the order book. Typically the snapshot which was requested from the Rest API, or the first snapshot /// received from a socket subscription @@ -417,9 +425,25 @@ namespace CryptoExchange.Net.OrderBook /// The last update sequence number until which the snapshot is in sync /// List of asks /// List of bids - protected void SetInitialOrderBook(long orderBookSequenceNumber, ISymbolOrderBookEntry[] bidList, ISymbolOrderBookEntry[] askList) + /// Server data timestamp + /// local data timestamp + protected void SetInitialOrderBook( + long orderBookSequenceNumber, + ISymbolOrderBookEntry[] bidList, + ISymbolOrderBookEntry[] askList, + DateTime? serverDataTime = null, + DateTime? localDataTime = null) { - _processQueue.Enqueue(new InitialOrderBookItem { StartUpdateId = orderBookSequenceNumber, EndUpdateId = orderBookSequenceNumber, Asks = askList, Bids = bidList }); + _processQueue.Enqueue( + new InitialOrderBookItem + { + LocalDataTime = localDataTime, + ServerDataTime = serverDataTime, + StartUpdateId = orderBookSequenceNumber, + EndUpdateId = orderBookSequenceNumber, + Asks = askList, + Bids = bidList + }); _queueEvent.Set(); } @@ -429,9 +453,25 @@ namespace CryptoExchange.Net.OrderBook /// The sequence number /// List of updated/new bids /// List of updated/new asks - protected void UpdateOrderBook(long updateId, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks) + /// Server data timestamp + /// local data timestamp + protected void UpdateOrderBook( + long updateId, + ISymbolOrderBookEntry[] bids, + ISymbolOrderBookEntry[] asks, + DateTime? serverDataTime = null, + DateTime? localDataTime = null) { - _processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = updateId, EndUpdateId = updateId, Asks = asks, Bids = bids }); + _processQueue.Enqueue( + new ProcessQueueItem + { + LocalDataTime = localDataTime, + ServerDataTime = serverDataTime, + StartUpdateId = updateId, + EndUpdateId = updateId, + Asks = asks, + Bids = bids + }); _queueEvent.Set(); } @@ -442,9 +482,26 @@ namespace CryptoExchange.Net.OrderBook /// The sequence number of the last update /// List of updated/new bids /// List of updated/new asks - protected void UpdateOrderBook(long firstUpdateId, long lastUpdateId, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks) + /// Server data timestamp + /// local data timestamp + protected void UpdateOrderBook( + long firstUpdateId, + long lastUpdateId, + ISymbolOrderBookEntry[] bids, + ISymbolOrderBookEntry[] asks, + DateTime? serverDataTime = null, + DateTime? localDataTime = null) { - _processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = firstUpdateId, EndUpdateId = lastUpdateId, Asks = asks, Bids = bids }); + _processQueue.Enqueue( + new ProcessQueueItem + { + LocalDataTime = localDataTime, + ServerDataTime = serverDataTime, + StartUpdateId = firstUpdateId, + EndUpdateId = lastUpdateId, + Asks = asks, + Bids = bids + }); _queueEvent.Set(); } @@ -453,12 +510,27 @@ namespace CryptoExchange.Net.OrderBook /// /// List of updated/new bids /// List of updated/new asks - protected void UpdateOrderBook(ISymbolOrderSequencedBookEntry[] bids, ISymbolOrderSequencedBookEntry[] asks) + /// Server data timestamp + /// local data timestamp + protected void UpdateOrderBook( + ISymbolOrderSequencedBookEntry[] bids, + ISymbolOrderSequencedBookEntry[] asks, + DateTime? serverDataTime = null, + DateTime? localDataTime = null) { var highest = Math.Max(bids.Any() ? bids.Max(b => b.Sequence) : 0, asks.Any() ? asks.Max(a => a.Sequence) : 0); var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue); - _processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = lowest, EndUpdateId = highest, Asks = asks, Bids = bids }); + _processQueue.Enqueue( + new ProcessQueueItem + { + LocalDataTime = localDataTime, + ServerDataTime = serverDataTime, + StartUpdateId = lowest, + EndUpdateId = highest, + Asks = asks, + Bids = bids + }); _queueEvent.Set(); } @@ -614,6 +686,12 @@ namespace CryptoExchange.Net.OrderBook { var stringBuilder = new StringBuilder(); var book = Book; + stringBuilder.AppendLine($"{Exchange} - {Symbol}"); + stringBuilder.AppendLine($"Update time local: {UpdateTime:HH:mm:ss.fff} ({Math.Round((DateTime.UtcNow - UpdateTime).TotalMilliseconds)}ms ago)"); + stringBuilder.AppendLine($"Data timestamp server: {UpdateServerTime:HH:mm:ss.fff}"); + stringBuilder.AppendLine($"Data timestamp local: {UpdateLocalTime:HH:mm:ss.fff}"); + stringBuilder.AppendLine($"Data age: {DataAge?.TotalMilliseconds}ms"); + stringBuilder.AppendLine(); stringBuilder.AppendLine($" Ask quantity Ask price | Bid price Bid quantity"); for(var i = 0; i < numberOfEntries; i++) { @@ -625,6 +703,22 @@ namespace CryptoExchange.Net.OrderBook return stringBuilder.ToString(); } + /// + public Task OutputToConsoleAsync(int numberOfEntries, TimeSpan refreshInterval, CancellationToken ct = default) + { + return Task.Run(async () => + { + var referenceTime = DateTime.UtcNow; + while (!ct.IsCancellationRequested) + { + Console.Clear(); + Console.WriteLine(ToString(numberOfEntries)); + var delay = Math.Max(1, (DateTime.UtcNow - referenceTime).TotalMilliseconds % refreshInterval.TotalMilliseconds); + try { await Task.Delay(refreshInterval.Add(TimeSpan.FromMilliseconds(-delay)), ct).ConfigureAwait(false); } catch { } + } + }); + } + private void CheckBestOffersChanged(ISymbolOrderBookEntry prevBestBid, ISymbolOrderBookEntry prevBestAsk) { var (bestBid, bestAsk) = BestOffers; @@ -708,6 +802,9 @@ namespace CryptoExchange.Net.OrderBook BidCount = _bids.Count; UpdateTime = DateTime.UtcNow; + UpdateServerTime = item.ServerDataTime; + UpdateLocalTime = item.LocalDataTime; + _logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.EndUpdateId); CheckProcessBuffer(); OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray())); @@ -750,6 +847,9 @@ namespace CryptoExchange.Net.OrderBook return; } + UpdateServerTime = item.ServerDataTime; + UpdateLocalTime = item.LocalDataTime; + OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray())); CheckBestOffersChanged(prevBestBid, prevBestAsk); } @@ -813,7 +913,11 @@ namespace CryptoExchange.Net.OrderBook }); } - private void ProcessRangeUpdates(long firstUpdateId, long lastUpdateId, IEnumerable bids, IEnumerable asks) + private void ProcessRangeUpdates( + long firstUpdateId, + long lastUpdateId, + IEnumerable bids, + IEnumerable asks) { if (lastUpdateId <= LastSequenceNumber) { @@ -829,23 +933,28 @@ namespace CryptoExchange.Net.OrderBook if (Levels.HasValue && _strictLevels) { - while (this._bids.Count > Levels.Value) + while (_bids.Count > Levels.Value) { BidCount--; - this._bids.Remove(this._bids.Last().Key); + _bids.Remove(_bids.Last().Key); } - while (this._asks.Count > Levels.Value) + while (_asks.Count > Levels.Value) { AskCount--; - this._asks.Remove(this._asks.Last().Key); + _asks.Remove(this._asks.Last().Key); } } LastSequenceNumber = lastUpdateId; if (_logger.IsEnabled(LogLevel.Trace)) - _logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId, lastUpdateId); + { + if (firstUpdateId != lastUpdateId) + _logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId, lastUpdateId); + else + _logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId); + } } } diff --git a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs index 8614cc0..41855ea 100644 --- a/CryptoExchange.Net/Sockets/Default/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/Default/SocketConnection.cs @@ -1236,7 +1236,7 @@ namespace CryptoExchange.Net.Sockets.Default subQuery.OnComplete = () => { subscription.Status = subQuery.Result!.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending; - subscription.HandleSubQueryResponse(subQuery.Response); + subscription.HandleSubQueryResponse(this, subQuery.Response); }; taskList.Add(SendAndWaitQueryAsync(subQuery)); @@ -1276,7 +1276,7 @@ namespace CryptoExchange.Net.Sockets.Default return CallResult.SuccessResult; var result = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); - subscription.HandleSubQueryResponse(subQuery.Response!); + subscription.HandleSubQueryResponse(this, subQuery.Response!); return result; } diff --git a/CryptoExchange.Net/Sockets/Default/Subscription.cs b/CryptoExchange.Net/Sockets/Default/Subscription.cs index c41b084..73d0490 100644 --- a/CryptoExchange.Net/Sockets/Default/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Default/Subscription.cs @@ -149,14 +149,12 @@ namespace CryptoExchange.Net.Sockets.Default /// /// Handle a subscription query response /// - /// - public virtual void HandleSubQueryResponse(object? message) { } + public virtual void HandleSubQueryResponse(SocketConnection connection, object? message) { } /// /// Handle an unsubscription query response /// - /// - public virtual void HandleUnsubQueryResponse(object message) { } + public virtual void HandleUnsubQueryResponse(SocketConnection connection, object message) { } /// /// Create a new unsubscription query diff --git a/CryptoExchange.Net/TimeOffsetManager.cs b/CryptoExchange.Net/TimeOffsetManager.cs new file mode 100644 index 0000000..12f2746 --- /dev/null +++ b/CryptoExchange.Net/TimeOffsetManager.cs @@ -0,0 +1,150 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace CryptoExchange.Net +{ + /// + /// Manager for timing offsets in APIs + /// + public static class TimeOffsetManager + { + class SocketTimeOffset + { + private DateTime _lastRollOver = DateTime.UtcNow; + private double? _fallbackLowest; + private double? _currentLowestOffset; + + /// + /// Get the estimated offset, resolves to the lowest offset in time measured in the last two minutes + /// + public double? Offset + { + get + { + if (_currentLowestOffset == null) + // If there is no current lowest offset return the fallback (which might or might not be null) + return _fallbackLowest; + + if (_fallbackLowest == null) + // If there is no fallback return the current lowest offset + return _currentLowestOffset; + + // If there is both a fallback and a current offset return the min offset of those + return Math.Min(_currentLowestOffset.Value, _fallbackLowest.Value); + } + } + + public void Update(double offsetMs) + { + if (_currentLowestOffset == null || _currentLowestOffset > offsetMs) + { + _currentLowestOffset = offsetMs; + _fallbackLowest = offsetMs; + } + + if (DateTime.UtcNow - _lastRollOver > TimeSpan.FromMinutes(1)) + { + _fallbackLowest = _currentLowestOffset; + _currentLowestOffset = null; + _lastRollOver = DateTime.UtcNow; + } + } + } + + class RestTimeOffset + { + public SemaphoreSlim SemaphoreSlim { get; } = new SemaphoreSlim(1, 1); + public DateTime? LastUpdate { get; set; } + public double? Offset { get; set; } + + public void Update(double offsetMs) + { + LastUpdate = DateTime.UtcNow; + Offset = offsetMs; + } + } + + private static ConcurrentDictionary _lastSocketDelays = new ConcurrentDictionary(); + private static ConcurrentDictionary _lastRestDelays = new ConcurrentDictionary(); + + /// + /// Update WebSocket API offset + /// + /// API name + /// Offset in milliseconds + public static void UpdateSocketOffset(string api, double offsetMs) + { + if (!_lastSocketDelays.TryGetValue(api, out var offsetValues)) + { + offsetValues = new SocketTimeOffset(); + _lastSocketDelays.TryAdd(api, offsetValues); + } + + _lastSocketDelays[api].Update(offsetMs); + } + + /// + /// Update REST API offset + /// + /// API name + /// Offset in milliseconds + public static void UpdateRestOffset(string api, double offsetMs) + { + _lastRestDelays[api].Update(offsetMs); + } + + /// + /// Get REST API offset + /// + /// API name + public static TimeSpan? GetRestOffset(string api) => _lastRestDelays.TryGetValue(api, out var val) && val.Offset != null ? TimeSpan.FromMilliseconds(val.Offset.Value) : null; + + /// + /// Get REST API last update time + /// + /// API name + public static DateTime? GetRestLastUpdateTime(string api) => _lastRestDelays.TryGetValue(api, out var val) && val.LastUpdate != null ? val.LastUpdate.Value : null; + + /// + /// Register a REST API client to be tracked + /// + /// + internal static void RegisterRestApi(string api) + { + _lastRestDelays[api] = new RestTimeOffset(); + } + + /// + /// Enter exclusive access for the API to update the time offset + /// + /// + /// + public static async ValueTask EnterAsync(string api) + { + await _lastRestDelays[api].SemaphoreSlim.WaitAsync().ConfigureAwait(false); + } + + /// + /// Release exclusive access for the API + /// + /// + public static void Release(string api) => _lastRestDelays[api].SemaphoreSlim.Release(); + + /// + /// Get WebSocket API offset + /// + /// API name + public static TimeSpan? GetSocketOffset(string api) => _lastSocketDelays.TryGetValue(api, out var val) && val.Offset != null ? TimeSpan.FromMilliseconds(val.Offset.Value) : null; + + /// + /// Reset the WebSocket API update timestamp to trigger a new time offset calculation + /// + /// API name + public static void ResetRestUpdateTime(string api) + { + _lastRestDelays[api].LastUpdate = null; + } + } +}