mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2026-04-07 10:11:10 +00:00
Time offset management (#266)
Updated time sync / time offset management for REST API's Added time offset tracking for WebSocket API's Added GetAuthenticationQuery virtual method on AuthenticationProvider Updated AuthenticationProvider GetTimestamp methods to include a one second offset by default Added AuthenticationProvider GetTimestamp methods for SocketApiClient instances Added ClientName property on BaseApiClient, resolving to the type name Added ObjectOrArrayConverter JsonConverterFactory implementation for resolving json data which might be returned as object or array Added UpdateServerTime, UpdateLocalTime and DataAge properties to (I)SymbolOrderBook Added OutputToConsoleAsync method to (I)SymbolOrderBook Updated SymbolOrderBook string representation Added DataTimeLocal and DataAge properties to DataEvent object Added SocketConnection parameter to subscription HandleSubQueryResponse and HandleUnsubQueryResponse methods
This commit is contained in:
parent
177daf903b
commit
a896fffdb3
@ -63,8 +63,6 @@ namespace CryptoExchange.Net.UnitTests
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string FormatSymbol(string baseAsset, string quoteAsset, TradingMode futuresType, DateTime? deliverDate = null) => $"{baseAsset.ToUpperInvariant()}{quoteAsset.ToUpperInvariant()}";
|
||||
public override TimeSpan? GetTimeOffset() => null;
|
||||
public override TimeSyncInfo GetTimeSyncInfo() => null;
|
||||
protected override IStreamMessageAccessor CreateAccessor() => new SystemTextJsonStreamMessageAccessor(new System.Text.Json.JsonSerializerOptions());
|
||||
protected override IMessageSerializer CreateSerializer() => new SystemTextJsonMessageSerializer(new System.Text.Json.JsonSerializerOptions());
|
||||
protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials) => throw new NotImplementedException();
|
||||
|
||||
@ -160,11 +160,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
||||
ParameterPositions[method] = position;
|
||||
}
|
||||
|
||||
public override TimeSpan? GetTimeOffset()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials)
|
||||
=> new TestAuthProvider(credentials);
|
||||
|
||||
@ -172,11 +167,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public override TimeSyncInfo GetTimeSyncInfo()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
||||
public class TestRestApi2Client : RestApiClient
|
||||
@ -199,11 +189,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
||||
return await SendAsync<T>("http://www.test.com", new RequestDefinition("/", HttpMethod.Get) { Weight = 0 }, null, ct);
|
||||
}
|
||||
|
||||
public override TimeSpan? GetTimeOffset()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials)
|
||||
=> new TestAuthProvider(credentials);
|
||||
|
||||
@ -212,10 +197,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public override TimeSyncInfo GetTimeSyncInfo()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
||||
public class TestError
|
||||
|
||||
@ -11,6 +11,8 @@ using System.Linq;
|
||||
using System.Globalization;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using CryptoExchange.Net.Sockets;
|
||||
using CryptoExchange.Net.Sockets.Default;
|
||||
|
||||
namespace CryptoExchange.Net.Authentication
|
||||
{
|
||||
@ -76,12 +78,20 @@ namespace CryptoExchange.Net.Authentication
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Authenticate a request
|
||||
/// Authenticate a REST request
|
||||
/// </summary>
|
||||
/// <param name="apiClient">The Api client sending the request</param>
|
||||
/// <param name="apiClient">The API client sending the request</param>
|
||||
/// <param name="requestConfig">The request configuration</param>
|
||||
public abstract void ProcessRequest(RestApiClient apiClient, RestRequestConfiguration requestConfig);
|
||||
|
||||
/// <summary>
|
||||
/// Get an authentication query for a websocket
|
||||
/// </summary>
|
||||
/// <param name="apiClient">The API client sending the request</param>
|
||||
/// <param name="connection">The connection to authenticate</param>
|
||||
/// <param name="context">Optional context required for creating the authentication query</param>
|
||||
public virtual Query? GetAuthenticationQuery(SocketApiClient apiClient, SocketConnection connection, Dictionary<string, object?>? context = null) => null;
|
||||
|
||||
/// <summary>
|
||||
/// SHA256 sign the data and return the bytes
|
||||
/// </summary>
|
||||
@ -494,32 +504,50 @@ namespace CryptoExchange.Net.Authentication
|
||||
/// <summary>
|
||||
/// Get current timestamp including the time sync offset from the api client
|
||||
/// </summary>
|
||||
/// <param name="apiClient"></param>
|
||||
/// <returns></returns>
|
||||
protected DateTime GetTimestamp(RestApiClient apiClient)
|
||||
protected DateTime GetTimestamp(RestApiClient apiClient, bool includeOneSecondOffset = true)
|
||||
{
|
||||
return TimeProvider.GetTime().Add(apiClient.GetTimeOffset() ?? TimeSpan.Zero)!;
|
||||
var result = TimeProvider.GetTime().Add(TimeOffsetManager.GetRestOffset(apiClient.ClientName) ?? TimeSpan.Zero)!;
|
||||
if (includeOneSecondOffset)
|
||||
result = result.AddSeconds(-1);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get current timestamp including the time sync offset from the api client
|
||||
/// </summary>
|
||||
protected DateTime GetTimestamp(SocketApiClient apiClient, bool includeOneSecondOffset = true)
|
||||
{
|
||||
var result = TimeProvider.GetTime().Add(TimeOffsetManager.GetSocketOffset(apiClient.ClientName) ?? TimeSpan.Zero)!;
|
||||
if (includeOneSecondOffset)
|
||||
result = result.AddSeconds(-1);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get millisecond timestamp as a string including the time sync offset from the api client
|
||||
/// </summary>
|
||||
/// <param name="apiClient"></param>
|
||||
/// <returns></returns>
|
||||
protected string GetMillisecondTimestamp(RestApiClient apiClient)
|
||||
{
|
||||
return DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient)).Value.ToString(CultureInfo.InvariantCulture);
|
||||
}
|
||||
protected string GetMillisecondTimestamp(RestApiClient apiClient, bool includeOneSecondOffset = true)
|
||||
=> DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient, includeOneSecondOffset)).Value.ToString(CultureInfo.InvariantCulture);
|
||||
|
||||
/// <summary>
|
||||
/// Get millisecond timestamp as a string including the time sync offset from the api client
|
||||
/// </summary>
|
||||
protected string GetMillisecondTimestamp(SocketApiClient apiClient, bool includeOneSecondOffset = true)
|
||||
=> DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient, includeOneSecondOffset)).Value.ToString(CultureInfo.InvariantCulture);
|
||||
|
||||
/// <summary>
|
||||
/// Get millisecond timestamp as a long including the time sync offset from the api client
|
||||
/// </summary>
|
||||
/// <param name="apiClient"></param>
|
||||
/// <returns></returns>
|
||||
protected long GetMillisecondTimestampLong(RestApiClient apiClient)
|
||||
{
|
||||
return DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient)).Value;
|
||||
}
|
||||
protected long GetMillisecondTimestampLong(RestApiClient apiClient, bool includeOneSecondOffset = true)
|
||||
=> DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient, includeOneSecondOffset)).Value;
|
||||
|
||||
/// <summary>
|
||||
/// Get millisecond timestamp as a long including the time sync offset from the api client
|
||||
/// </summary>
|
||||
protected long GetMillisecondTimestampLong(SocketApiClient apiClient, bool includeOneSecondOffset = true)
|
||||
=> DateTimeConverter.ConvertToMilliseconds(GetTimestamp(apiClient, includeOneSecondOffset)).Value;
|
||||
|
||||
/// <summary>
|
||||
/// Return the serialized request body
|
||||
|
||||
@ -13,6 +13,8 @@ namespace CryptoExchange.Net.Clients
|
||||
/// </summary>
|
||||
public abstract class BaseApiClient : IDisposable, IBaseApiClient
|
||||
{
|
||||
private string? _clientName;
|
||||
|
||||
/// <summary>
|
||||
/// Logger
|
||||
/// </summary>
|
||||
@ -23,6 +25,21 @@ namespace CryptoExchange.Net.Clients
|
||||
/// </summary>
|
||||
protected bool _disposing;
|
||||
|
||||
/// <summary>
|
||||
/// Name of the client
|
||||
/// </summary>
|
||||
protected internal string ClientName
|
||||
{
|
||||
get
|
||||
{
|
||||
if (_clientName != null)
|
||||
return _clientName;
|
||||
|
||||
_clientName = GetType().Name;
|
||||
return _clientName;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The authentication provider for this API client. (null if no credentials are set)
|
||||
/// </summary>
|
||||
|
||||
@ -32,12 +32,6 @@ namespace CryptoExchange.Net.Clients
|
||||
/// <inheritdoc />
|
||||
public IRequestFactory RequestFactory { get; set; } = new RequestFactory();
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract TimeSyncInfo? GetTimeSyncInfo();
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract TimeSpan? GetTimeOffset();
|
||||
|
||||
/// <inheritdoc />
|
||||
public int TotalRequestsMade { get; set; }
|
||||
|
||||
@ -115,6 +109,8 @@ namespace CryptoExchange.Net.Clients
|
||||
options,
|
||||
apiOptions)
|
||||
{
|
||||
TimeOffsetManager.RegisterRestApi(ClientName);
|
||||
|
||||
RequestFactory.Configure(options, httpClient);
|
||||
}
|
||||
|
||||
@ -241,11 +237,9 @@ namespace CryptoExchange.Net.Clients
|
||||
{
|
||||
currentTry++;
|
||||
|
||||
var error = await CheckTimeSync(requestId, definition).ConfigureAwait(false);
|
||||
if (error != null)
|
||||
return new WebCallResult<T>(error);
|
||||
await CheckTimeSync(requestId, definition).ConfigureAwait(false);
|
||||
|
||||
error = await RateLimitAsync(
|
||||
var error = await RateLimitAsync(
|
||||
baseAddress,
|
||||
requestId,
|
||||
definition,
|
||||
@ -300,28 +294,6 @@ namespace CryptoExchange.Net.Clients
|
||||
}
|
||||
}
|
||||
|
||||
private async ValueTask<Error?> CheckTimeSync(int requestId, RequestDefinition definition)
|
||||
{
|
||||
if (!definition.Authenticated)
|
||||
return null;
|
||||
|
||||
var syncTask = SyncTimeAsync();
|
||||
var timeSyncInfo = GetTimeSyncInfo();
|
||||
|
||||
if (timeSyncInfo != null && timeSyncInfo.TimeSyncState.LastSyncTime == default)
|
||||
{
|
||||
// Initially with first request we'll need to wait for the time syncing, if it's not the first request we can just continue
|
||||
var syncTimeError = await syncTask.ConfigureAwait(false);
|
||||
if (syncTimeError != null)
|
||||
{
|
||||
_logger.RestApiFailedToSyncTime(requestId, syncTimeError!.ToString());
|
||||
return syncTimeError;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Check rate limits for the request
|
||||
/// </summary>
|
||||
@ -725,26 +697,44 @@ namespace CryptoExchange.Net.Clients
|
||||
RequestFactory.UpdateSettings(options.Proxy, options.RequestTimeout ?? ClientOptions.RequestTimeout, ClientOptions.HttpKeepAliveInterval);
|
||||
}
|
||||
|
||||
internal async ValueTask<Error?> SyncTimeAsync()
|
||||
private async ValueTask CheckTimeSync(int requestId, RequestDefinition definition)
|
||||
{
|
||||
var timeSyncParams = GetTimeSyncInfo();
|
||||
if (timeSyncParams == null)
|
||||
return null;
|
||||
if (!definition.Authenticated)
|
||||
return;
|
||||
|
||||
if (await timeSyncParams.TimeSyncState.Semaphore.WaitAsync(0).ConfigureAwait(false))
|
||||
var lastUpdateTime = TimeOffsetManager.GetRestLastUpdateTime(ClientName);
|
||||
var syncTask = CheckTimeOffsetAsync();
|
||||
|
||||
if (lastUpdateTime == null)
|
||||
{
|
||||
if (!timeSyncParams.SyncTime || DateTime.UtcNow - timeSyncParams.TimeSyncState.LastSyncTime < timeSyncParams.RecalculationInterval)
|
||||
{
|
||||
timeSyncParams.TimeSyncState.Semaphore.Release();
|
||||
return null;
|
||||
// Initially with first request we'll need to wait for the time syncing before making the actual request.
|
||||
// If it's not the first request we can just continue and let it complete in the background
|
||||
await syncTask.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
internal async ValueTask CheckTimeOffsetAsync()
|
||||
{
|
||||
if (!(ApiOptions.AutoTimestamp ?? ClientOptions.AutoTimestamp))
|
||||
// Time syncing not enabled
|
||||
return;
|
||||
|
||||
await TimeOffsetManager.EnterAsync(ClientName).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var lastUpdateTime = TimeOffsetManager.GetRestLastUpdateTime(ClientName);
|
||||
if (DateTime.UtcNow - lastUpdateTime < (ApiOptions.TimestampRecalculationInterval ?? ClientOptions.TimestampRecalculationInterval))
|
||||
// Time syncing was recently done
|
||||
return;
|
||||
|
||||
var localTime = DateTime.UtcNow;
|
||||
var result = await GetServerTimestampAsync().ConfigureAwait(false);
|
||||
if (!result)
|
||||
{
|
||||
timeSyncParams.TimeSyncState.Semaphore.Release();
|
||||
return result.Error;
|
||||
_logger.LogWarning("Failed to determine time offset between client and server, timestamping might fail");
|
||||
return;
|
||||
}
|
||||
|
||||
if (TotalRequestsMade == 1)
|
||||
@ -754,18 +744,29 @@ namespace CryptoExchange.Net.Clients
|
||||
result = await GetServerTimestampAsync().ConfigureAwait(false);
|
||||
if (!result)
|
||||
{
|
||||
timeSyncParams.TimeSyncState.Semaphore.Release();
|
||||
return result.Error;
|
||||
_logger.LogWarning("Failed to determine time offset between client and server, timestamping might fail");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate time offset between local and server
|
||||
// Estimate the offset as the round trip time / 2
|
||||
var offset = result.Data - localTime.AddMilliseconds(result.ResponseTime!.Value.TotalMilliseconds / 2);
|
||||
timeSyncParams.UpdateTimeOffset(offset);
|
||||
timeSyncParams.TimeSyncState.Semaphore.Release();
|
||||
if (offset.TotalMilliseconds > 0 && offset.TotalMilliseconds < 500)
|
||||
{
|
||||
_logger.LogInformation("{ClientName} Time offset within limits ({Offset}ms), set offset to 0ms", ClientName, Math.Round(offset.TotalMilliseconds));
|
||||
offset = TimeSpan.Zero;
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogInformation("{ClientName} Time offset set to {Offset}ms", ClientName, Math.Round(offset.TotalMilliseconds));
|
||||
}
|
||||
|
||||
return null;
|
||||
TimeOffsetManager.UpdateRestOffset(ClientName, offset.TotalMilliseconds);
|
||||
}
|
||||
finally
|
||||
{
|
||||
TimeOffsetManager.Release(ClientName);
|
||||
}
|
||||
}
|
||||
|
||||
private bool ShouldCache(RequestDefinition definition)
|
||||
|
||||
@ -32,8 +32,10 @@ namespace CryptoExchange.Net.Clients
|
||||
public abstract class SocketApiClient : BaseApiClient, ISocketApiClient
|
||||
{
|
||||
#region Fields
|
||||
|
||||
/// <inheritdoc/>
|
||||
public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
|
||||
|
||||
/// <inheritdoc/>
|
||||
public IHighPerfConnectionFactory? HighPerfConnectionFactory { get; set; }
|
||||
|
||||
@ -181,6 +183,24 @@ namespace CryptoExchange.Net.Clients
|
||||
DedicatedConnectionConfigs.Add(new DedicatedConnectionConfig() { SocketAddress = url, Authenticated = auth });
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Update the timestamp offset between client and server based on the timestamp
|
||||
/// </summary>
|
||||
/// <param name="timestamp">Timestamp received from the server</param>
|
||||
public virtual void UpdateTimeOffset(DateTime timestamp)
|
||||
{
|
||||
if (timestamp == default)
|
||||
return;
|
||||
|
||||
TimeOffsetManager.UpdateSocketOffset(ClientName, (DateTime.UtcNow - timestamp).TotalMilliseconds);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the time offset between client and server
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public virtual TimeSpan? GetTimeOffset() => TimeOffsetManager.GetSocketOffset(ClientName);
|
||||
|
||||
/// <summary>
|
||||
/// Add a query to periodically send on each connection
|
||||
/// </summary>
|
||||
@ -296,7 +316,7 @@ namespace CryptoExchange.Net.Clients
|
||||
if (!success)
|
||||
return;
|
||||
|
||||
subscription.HandleSubQueryResponse(response);
|
||||
subscription.HandleSubQueryResponse(socketConnection, response);
|
||||
subscription.Status = SubscriptionStatus.Subscribed;
|
||||
if (ct != default)
|
||||
{
|
||||
@ -575,7 +595,8 @@ namespace CryptoExchange.Net.Clients
|
||||
/// Should return the request which can be used to authenticate a socket connection
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
protected internal virtual Task<Query?> GetAuthenticationRequestAsync(SocketConnection connection) => throw new NotImplementedException();
|
||||
protected internal virtual Task<Query?> GetAuthenticationRequestAsync(SocketConnection connection) =>
|
||||
Task.FromResult(AuthenticationProvider!.GetAuthenticationQuery(this, connection));
|
||||
|
||||
/// <summary>
|
||||
/// Adds a system subscription. Used for example to reply to ping requests
|
||||
|
||||
@ -0,0 +1,58 @@
|
||||
using System;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
|
||||
#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
|
||||
|
||||
namespace CryptoExchange.Net.Converters.SystemTextJson
|
||||
{
|
||||
/// <summary>
|
||||
/// Converter for parsing object or array responses
|
||||
/// </summary>
|
||||
public class ObjectOrArrayConverter : JsonConverterFactory
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public override bool CanConvert(Type typeToConvert) => true;
|
||||
/// <inheritdoc />
|
||||
public override JsonConverter? CreateConverter(Type typeToConvert, JsonSerializerOptions options)
|
||||
{
|
||||
var type = typeof(InternalObjectOrArrayConverter<>).MakeGenericType(typeToConvert);
|
||||
return (JsonConverter)Activator.CreateInstance(type)!;
|
||||
}
|
||||
|
||||
private class InternalObjectOrArrayConverter<T> : JsonConverter<T>
|
||||
{
|
||||
public override T? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
|
||||
{
|
||||
if (reader.TokenType == JsonTokenType.StartObject && !typeToConvert.IsArray)
|
||||
{
|
||||
// Object to object
|
||||
return JsonDocument.ParseValue(ref reader).Deserialize<T>(options);
|
||||
}
|
||||
else if (reader.TokenType == JsonTokenType.StartArray && typeToConvert.IsArray)
|
||||
{
|
||||
// Array to array
|
||||
return JsonDocument.ParseValue(ref reader).Deserialize<T>(options);
|
||||
}
|
||||
else if (reader.TokenType == JsonTokenType.StartArray)
|
||||
{
|
||||
// Array to object
|
||||
JsonDocument.ParseValue(ref reader).Deserialize<T[]>(options);
|
||||
return default;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Object to array
|
||||
JsonDocument.ParseValue(ref reader);
|
||||
return default;
|
||||
}
|
||||
}
|
||||
|
||||
public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions options)
|
||||
{
|
||||
JsonSerializer.Serialize(writer, value, options);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -258,6 +258,12 @@ namespace CryptoExchange.Net
|
||||
return value;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Generate a long value between two values
|
||||
/// </summary>
|
||||
/// <param name="minValue">Min value</param>
|
||||
/// <param name="maxValue">Max value</param>
|
||||
/// <returns></returns>
|
||||
public static long RandomLong(long minValue, long maxValue)
|
||||
{
|
||||
#if NET8_0_OR_GREATER
|
||||
|
||||
@ -47,9 +47,21 @@ namespace CryptoExchange.Net.Interfaces
|
||||
/// </summary>
|
||||
event Action<(ISymbolOrderBookEntry BestBid, ISymbolOrderBookEntry BestAsk)> OnBestOffersChanged;
|
||||
/// <summary>
|
||||
/// Timestamp of the last update
|
||||
/// Timestamp of when the last update was applied to the book, local time
|
||||
/// </summary>
|
||||
DateTime UpdateTime { get; }
|
||||
/// <summary>
|
||||
/// Timestamp of the last event that was applied, server time
|
||||
/// </summary>
|
||||
DateTime? UpdateServerTime { get; }
|
||||
/// <summary>
|
||||
/// Timestamp of the last event that was applied, in local time, estimated based on timestamp difference between client and server
|
||||
/// </summary>
|
||||
DateTime? UpdateLocalTime { get; }
|
||||
/// <summary>
|
||||
/// Age of the data, in local time, estimated based on timestamp difference between client and server + the period since last update
|
||||
/// </summary>
|
||||
TimeSpan? DataAge { get; }
|
||||
|
||||
/// <summary>
|
||||
/// The number of asks in the book
|
||||
@ -126,5 +138,13 @@ namespace CryptoExchange.Net.Interfaces
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
string ToString(int rows);
|
||||
|
||||
/// <summary>
|
||||
/// Output the orderbook to the console
|
||||
/// </summary>
|
||||
/// <param name="numberOfEntries">Number of rows to display</param>
|
||||
/// <param name="refreshInterval">Refresh interval</param>
|
||||
/// <param name="ct">Cancellation token</param>
|
||||
Task OutputToConsoleAsync(int numberOfEntries, TimeSpan refreshInterval, CancellationToken ct = default);
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,6 +28,7 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
private static readonly Action<ILogger, string, string, Exception?> _orderBookReconnectingSocket;
|
||||
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookSkippedMessage;
|
||||
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookProcessedMessage;
|
||||
private static readonly Action<ILogger, string, string, long, Exception?> _orderBookProcessedMessageSingle;
|
||||
private static readonly Action<ILogger, string, string, long, long, Exception?> _orderBookOutOfSync;
|
||||
|
||||
static SymbolOrderBookLoggingExtensions()
|
||||
@ -136,6 +137,11 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
LogLevel.Warning,
|
||||
new EventId(5020, "OrderBookOutOfSyncChecksum"),
|
||||
"{Api} order book {Symbol} out of sync. Checksum mismatch, resyncing");
|
||||
|
||||
_orderBookProcessedMessageSingle = LoggerMessage.Define<string, string, long>(
|
||||
LogLevel.Trace,
|
||||
new EventId(5021, "OrderBookProcessedMessage"),
|
||||
"{Api} order book {Symbol} update processed #{UpdateId}");
|
||||
}
|
||||
|
||||
public static void OrderBookStatusChanged(this ILogger logger, string api, string symbol, OrderBookStatus previousStatus, OrderBookStatus newStatus)
|
||||
@ -229,6 +235,10 @@ namespace CryptoExchange.Net.Logging.Extensions
|
||||
_orderBookProcessedMessage(logger, api, symbol, firstUpdateId, lastUpdateId, null);
|
||||
}
|
||||
|
||||
public static void OrderBookProcessedMessage(this ILogger logger, string api, string symbol, long updateId)
|
||||
{
|
||||
_orderBookProcessedMessageSingle(logger, api, symbol, updateId, null);
|
||||
}
|
||||
public static void OrderBookOutOfSyncChecksum(this ILogger logger, string api, string symbol)
|
||||
{
|
||||
_orderBookOutOfSyncChecksum(logger, api, symbol, null);
|
||||
|
||||
@ -18,6 +18,16 @@ namespace CryptoExchange.Net.Objects.Sockets
|
||||
/// </summary>
|
||||
public DateTime? DataTime { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The timestamp of the data in local time. Note that this is an estimation based on average delay from the server.
|
||||
/// </summary>
|
||||
public DateTime? DataTimeLocal { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The age of the data. Note that this is an estimation based on average delay from the server.
|
||||
/// </summary>
|
||||
public TimeSpan? DataAge => DateTime.UtcNow - DataTimeLocal;
|
||||
|
||||
/// <summary>
|
||||
/// The stream producing the update
|
||||
/// </summary>
|
||||
@ -119,9 +129,16 @@ namespace CryptoExchange.Net.Objects.Sockets
|
||||
/// <summary>
|
||||
/// Specify the data timestamp
|
||||
/// </summary>
|
||||
public DataEvent<T> WithDataTimestamp(DateTime? timestamp)
|
||||
public DataEvent<T> WithDataTimestamp(DateTime? timestamp, TimeSpan? offset)
|
||||
{
|
||||
if (timestamp == null || timestamp == default(DateTime))
|
||||
return this;
|
||||
|
||||
DataTime = timestamp;
|
||||
if (offset == null)
|
||||
return this;
|
||||
|
||||
DataTimeLocal = DataTime + offset;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@ -1,95 +0,0 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace CryptoExchange.Net.Objects
|
||||
{
|
||||
/// <summary>
|
||||
/// The time synchronization state of an API client
|
||||
/// </summary>
|
||||
public class TimeSyncState
|
||||
{
|
||||
/// <summary>
|
||||
/// Name of the API
|
||||
/// </summary>
|
||||
public string ApiName { get; set; }
|
||||
/// <summary>
|
||||
/// Semaphore to use for checking the time syncing. Should be shared instance among the API client
|
||||
/// </summary>
|
||||
public SemaphoreSlim Semaphore { get; }
|
||||
/// <summary>
|
||||
/// Last sync time for the API client
|
||||
/// </summary>
|
||||
public DateTime LastSyncTime { get; set; }
|
||||
/// <summary>
|
||||
/// Time offset for the API client
|
||||
/// </summary>
|
||||
public TimeSpan TimeOffset { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public TimeSyncState(string apiName)
|
||||
{
|
||||
ApiName = apiName;
|
||||
Semaphore = new SemaphoreSlim(1, 1);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Time synchronization info
|
||||
/// </summary>
|
||||
public class TimeSyncInfo
|
||||
{
|
||||
/// <summary>
|
||||
/// Logger
|
||||
/// </summary>
|
||||
public ILogger Logger { get; }
|
||||
/// <summary>
|
||||
/// Should synchronize time
|
||||
/// </summary>
|
||||
public bool SyncTime { get; }
|
||||
/// <summary>
|
||||
/// Timestamp recalulcation interval
|
||||
/// </summary>
|
||||
public TimeSpan RecalculationInterval { get; }
|
||||
/// <summary>
|
||||
/// Time sync state for the API client
|
||||
/// </summary>
|
||||
public TimeSyncState TimeSyncState { get; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="logger"></param>
|
||||
/// <param name="recalculationInterval"></param>
|
||||
/// <param name="syncTime"></param>
|
||||
/// <param name="syncState"></param>
|
||||
public TimeSyncInfo(ILogger logger, bool syncTime, TimeSpan recalculationInterval, TimeSyncState syncState)
|
||||
{
|
||||
Logger = logger;
|
||||
SyncTime = syncTime;
|
||||
RecalculationInterval = recalculationInterval;
|
||||
TimeSyncState = syncState;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Set the time offset
|
||||
/// </summary>
|
||||
/// <param name="offset"></param>
|
||||
public void UpdateTimeOffset(TimeSpan offset)
|
||||
{
|
||||
TimeSyncState.LastSyncTime = DateTime.UtcNow;
|
||||
if (offset.TotalMilliseconds > 0 && offset.TotalMilliseconds < 500)
|
||||
{
|
||||
Logger.Log(LogLevel.Information, "{TimeSyncState.ApiName} Time offset within limits, set offset to 0ms", TimeSyncState.ApiName);
|
||||
TimeSyncState.TimeOffset = TimeSpan.Zero;
|
||||
}
|
||||
else
|
||||
{
|
||||
Logger.Log(LogLevel.Information, "{TimeSyncState.ApiName} Time offset set to {Offset}ms", TimeSyncState.ApiName, Math.Round(offset.TotalMilliseconds));
|
||||
TimeSyncState.TimeOffset = offset;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -5,6 +5,8 @@ namespace CryptoExchange.Net.OrderBook
|
||||
{
|
||||
internal class ProcessQueueItem
|
||||
{
|
||||
public DateTime? LocalDataTime { get; set; }
|
||||
public DateTime? ServerDataTime { get; set; }
|
||||
public long StartUpdateId { get; set; }
|
||||
public long EndUpdateId { get; set; }
|
||||
public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
|
||||
@ -13,6 +15,8 @@ namespace CryptoExchange.Net.OrderBook
|
||||
|
||||
internal class InitialOrderBookItem
|
||||
{
|
||||
public DateTime? LocalDataTime { get; set; }
|
||||
public DateTime? ServerDataTime { get; set; }
|
||||
public long StartUpdateId { get; set; }
|
||||
public long EndUpdateId { get; set; }
|
||||
public ISymbolOrderBookEntry[] Bids { get; set; } = Array.Empty<ISymbolOrderBookEntry>();
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Globalization;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
@ -133,6 +134,15 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// <inheritdoc/>
|
||||
public DateTime UpdateTime { get; private set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public DateTime? UpdateServerTime { get; private set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public DateTime? UpdateLocalTime { get; set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public TimeSpan? DataAge => DateTime.UtcNow - UpdateLocalTime;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public int AskCount { get; private set; }
|
||||
|
||||
@ -406,8 +416,6 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// Implementation for validating a checksum value with the current order book. If checksum validation fails (returns false)
|
||||
/// the order book will be resynchronized
|
||||
/// </summary>
|
||||
/// <param name="checksum"></param>
|
||||
/// <returns></returns>
|
||||
protected virtual bool DoChecksum(int checksum) => true;
|
||||
|
||||
/// <summary>
|
||||
@ -417,9 +425,25 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// <param name="orderBookSequenceNumber">The last update sequence number until which the snapshot is in sync</param>
|
||||
/// <param name="askList">List of asks</param>
|
||||
/// <param name="bidList">List of bids</param>
|
||||
protected void SetInitialOrderBook(long orderBookSequenceNumber, ISymbolOrderBookEntry[] bidList, ISymbolOrderBookEntry[] askList)
|
||||
/// <param name="serverDataTime">Server data timestamp</param>
|
||||
/// <param name="localDataTime">local data timestamp</param>
|
||||
protected void SetInitialOrderBook(
|
||||
long orderBookSequenceNumber,
|
||||
ISymbolOrderBookEntry[] bidList,
|
||||
ISymbolOrderBookEntry[] askList,
|
||||
DateTime? serverDataTime = null,
|
||||
DateTime? localDataTime = null)
|
||||
{
|
||||
_processQueue.Enqueue(new InitialOrderBookItem { StartUpdateId = orderBookSequenceNumber, EndUpdateId = orderBookSequenceNumber, Asks = askList, Bids = bidList });
|
||||
_processQueue.Enqueue(
|
||||
new InitialOrderBookItem
|
||||
{
|
||||
LocalDataTime = localDataTime,
|
||||
ServerDataTime = serverDataTime,
|
||||
StartUpdateId = orderBookSequenceNumber,
|
||||
EndUpdateId = orderBookSequenceNumber,
|
||||
Asks = askList,
|
||||
Bids = bidList
|
||||
});
|
||||
_queueEvent.Set();
|
||||
}
|
||||
|
||||
@ -429,9 +453,25 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// <param name="updateId">The sequence number</param>
|
||||
/// <param name="bids">List of updated/new bids</param>
|
||||
/// <param name="asks">List of updated/new asks</param>
|
||||
protected void UpdateOrderBook(long updateId, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks)
|
||||
/// <param name="serverDataTime">Server data timestamp</param>
|
||||
/// <param name="localDataTime">local data timestamp</param>
|
||||
protected void UpdateOrderBook(
|
||||
long updateId,
|
||||
ISymbolOrderBookEntry[] bids,
|
||||
ISymbolOrderBookEntry[] asks,
|
||||
DateTime? serverDataTime = null,
|
||||
DateTime? localDataTime = null)
|
||||
{
|
||||
_processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = updateId, EndUpdateId = updateId, Asks = asks, Bids = bids });
|
||||
_processQueue.Enqueue(
|
||||
new ProcessQueueItem
|
||||
{
|
||||
LocalDataTime = localDataTime,
|
||||
ServerDataTime = serverDataTime,
|
||||
StartUpdateId = updateId,
|
||||
EndUpdateId = updateId,
|
||||
Asks = asks,
|
||||
Bids = bids
|
||||
});
|
||||
_queueEvent.Set();
|
||||
}
|
||||
|
||||
@ -442,9 +482,26 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// <param name="lastUpdateId">The sequence number of the last update</param>
|
||||
/// <param name="bids">List of updated/new bids</param>
|
||||
/// <param name="asks">List of updated/new asks</param>
|
||||
protected void UpdateOrderBook(long firstUpdateId, long lastUpdateId, ISymbolOrderBookEntry[] bids, ISymbolOrderBookEntry[] asks)
|
||||
/// <param name="serverDataTime">Server data timestamp</param>
|
||||
/// <param name="localDataTime">local data timestamp</param>
|
||||
protected void UpdateOrderBook(
|
||||
long firstUpdateId,
|
||||
long lastUpdateId,
|
||||
ISymbolOrderBookEntry[] bids,
|
||||
ISymbolOrderBookEntry[] asks,
|
||||
DateTime? serverDataTime = null,
|
||||
DateTime? localDataTime = null)
|
||||
{
|
||||
_processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = firstUpdateId, EndUpdateId = lastUpdateId, Asks = asks, Bids = bids });
|
||||
_processQueue.Enqueue(
|
||||
new ProcessQueueItem
|
||||
{
|
||||
LocalDataTime = localDataTime,
|
||||
ServerDataTime = serverDataTime,
|
||||
StartUpdateId = firstUpdateId,
|
||||
EndUpdateId = lastUpdateId,
|
||||
Asks = asks,
|
||||
Bids = bids
|
||||
});
|
||||
_queueEvent.Set();
|
||||
}
|
||||
|
||||
@ -453,12 +510,27 @@ namespace CryptoExchange.Net.OrderBook
|
||||
/// </summary>
|
||||
/// <param name="bids">List of updated/new bids</param>
|
||||
/// <param name="asks">List of updated/new asks</param>
|
||||
protected void UpdateOrderBook(ISymbolOrderSequencedBookEntry[] bids, ISymbolOrderSequencedBookEntry[] asks)
|
||||
/// <param name="serverDataTime">Server data timestamp</param>
|
||||
/// <param name="localDataTime">local data timestamp</param>
|
||||
protected void UpdateOrderBook(
|
||||
ISymbolOrderSequencedBookEntry[] bids,
|
||||
ISymbolOrderSequencedBookEntry[] asks,
|
||||
DateTime? serverDataTime = null,
|
||||
DateTime? localDataTime = null)
|
||||
{
|
||||
var highest = Math.Max(bids.Any() ? bids.Max(b => b.Sequence) : 0, asks.Any() ? asks.Max(a => a.Sequence) : 0);
|
||||
var lowest = Math.Min(bids.Any() ? bids.Min(b => b.Sequence) : long.MaxValue, asks.Any() ? asks.Min(a => a.Sequence) : long.MaxValue);
|
||||
|
||||
_processQueue.Enqueue(new ProcessQueueItem { StartUpdateId = lowest, EndUpdateId = highest, Asks = asks, Bids = bids });
|
||||
_processQueue.Enqueue(
|
||||
new ProcessQueueItem
|
||||
{
|
||||
LocalDataTime = localDataTime,
|
||||
ServerDataTime = serverDataTime,
|
||||
StartUpdateId = lowest,
|
||||
EndUpdateId = highest,
|
||||
Asks = asks,
|
||||
Bids = bids
|
||||
});
|
||||
_queueEvent.Set();
|
||||
}
|
||||
|
||||
@ -614,6 +686,12 @@ namespace CryptoExchange.Net.OrderBook
|
||||
{
|
||||
var stringBuilder = new StringBuilder();
|
||||
var book = Book;
|
||||
stringBuilder.AppendLine($"{Exchange} - {Symbol}");
|
||||
stringBuilder.AppendLine($"Update time local: {UpdateTime:HH:mm:ss.fff} ({Math.Round((DateTime.UtcNow - UpdateTime).TotalMilliseconds)}ms ago)");
|
||||
stringBuilder.AppendLine($"Data timestamp server: {UpdateServerTime:HH:mm:ss.fff}");
|
||||
stringBuilder.AppendLine($"Data timestamp local: {UpdateLocalTime:HH:mm:ss.fff}");
|
||||
stringBuilder.AppendLine($"Data age: {DataAge?.TotalMilliseconds}ms");
|
||||
stringBuilder.AppendLine();
|
||||
stringBuilder.AppendLine($" Ask quantity Ask price | Bid price Bid quantity");
|
||||
for(var i = 0; i < numberOfEntries; i++)
|
||||
{
|
||||
@ -625,6 +703,22 @@ namespace CryptoExchange.Net.OrderBook
|
||||
return stringBuilder.ToString();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task OutputToConsoleAsync(int numberOfEntries, TimeSpan refreshInterval, CancellationToken ct = default)
|
||||
{
|
||||
return Task.Run(async () =>
|
||||
{
|
||||
var referenceTime = DateTime.UtcNow;
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
Console.Clear();
|
||||
Console.WriteLine(ToString(numberOfEntries));
|
||||
var delay = Math.Max(1, (DateTime.UtcNow - referenceTime).TotalMilliseconds % refreshInterval.TotalMilliseconds);
|
||||
try { await Task.Delay(refreshInterval.Add(TimeSpan.FromMilliseconds(-delay)), ct).ConfigureAwait(false); } catch { }
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void CheckBestOffersChanged(ISymbolOrderBookEntry prevBestBid, ISymbolOrderBookEntry prevBestAsk)
|
||||
{
|
||||
var (bestBid, bestAsk) = BestOffers;
|
||||
@ -708,6 +802,9 @@ namespace CryptoExchange.Net.OrderBook
|
||||
BidCount = _bids.Count;
|
||||
|
||||
UpdateTime = DateTime.UtcNow;
|
||||
UpdateServerTime = item.ServerDataTime;
|
||||
UpdateLocalTime = item.LocalDataTime;
|
||||
|
||||
_logger.OrderBookDataSet(Api, Symbol, BidCount, AskCount, item.EndUpdateId);
|
||||
CheckProcessBuffer();
|
||||
OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray()));
|
||||
@ -750,6 +847,9 @@ namespace CryptoExchange.Net.OrderBook
|
||||
return;
|
||||
}
|
||||
|
||||
UpdateServerTime = item.ServerDataTime;
|
||||
UpdateLocalTime = item.LocalDataTime;
|
||||
|
||||
OnOrderBookUpdate?.Invoke((item.Bids.ToArray(), item.Asks.ToArray()));
|
||||
CheckBestOffersChanged(prevBestBid, prevBestAsk);
|
||||
}
|
||||
@ -813,7 +913,11 @@ namespace CryptoExchange.Net.OrderBook
|
||||
});
|
||||
}
|
||||
|
||||
private void ProcessRangeUpdates(long firstUpdateId, long lastUpdateId, IEnumerable<ISymbolOrderBookEntry> bids, IEnumerable<ISymbolOrderBookEntry> asks)
|
||||
private void ProcessRangeUpdates(
|
||||
long firstUpdateId,
|
||||
long lastUpdateId,
|
||||
IEnumerable<ISymbolOrderBookEntry> bids,
|
||||
IEnumerable<ISymbolOrderBookEntry> asks)
|
||||
{
|
||||
if (lastUpdateId <= LastSequenceNumber)
|
||||
{
|
||||
@ -829,23 +933,28 @@ namespace CryptoExchange.Net.OrderBook
|
||||
|
||||
if (Levels.HasValue && _strictLevels)
|
||||
{
|
||||
while (this._bids.Count > Levels.Value)
|
||||
while (_bids.Count > Levels.Value)
|
||||
{
|
||||
BidCount--;
|
||||
this._bids.Remove(this._bids.Last().Key);
|
||||
_bids.Remove(_bids.Last().Key);
|
||||
}
|
||||
|
||||
while (this._asks.Count > Levels.Value)
|
||||
while (_asks.Count > Levels.Value)
|
||||
{
|
||||
AskCount--;
|
||||
this._asks.Remove(this._asks.Last().Key);
|
||||
_asks.Remove(this._asks.Last().Key);
|
||||
}
|
||||
}
|
||||
|
||||
LastSequenceNumber = lastUpdateId;
|
||||
|
||||
if (_logger.IsEnabled(LogLevel.Trace))
|
||||
{
|
||||
if (firstUpdateId != lastUpdateId)
|
||||
_logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId, lastUpdateId);
|
||||
else
|
||||
_logger.OrderBookProcessedMessage(Api, Symbol, firstUpdateId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1236,7 +1236,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
subQuery.OnComplete = () =>
|
||||
{
|
||||
subscription.Status = subQuery.Result!.Success ? SubscriptionStatus.Subscribed : SubscriptionStatus.Pending;
|
||||
subscription.HandleSubQueryResponse(subQuery.Response);
|
||||
subscription.HandleSubQueryResponse(this, subQuery.Response);
|
||||
};
|
||||
|
||||
taskList.Add(SendAndWaitQueryAsync(subQuery));
|
||||
@ -1276,7 +1276,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
return CallResult.SuccessResult;
|
||||
|
||||
var result = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
|
||||
subscription.HandleSubQueryResponse(subQuery.Response!);
|
||||
subscription.HandleSubQueryResponse(this, subQuery.Response!);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@ -149,14 +149,12 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// <summary>
|
||||
/// Handle a subscription query response
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
public virtual void HandleSubQueryResponse(object? message) { }
|
||||
public virtual void HandleSubQueryResponse(SocketConnection connection, object? message) { }
|
||||
|
||||
/// <summary>
|
||||
/// Handle an unsubscription query response
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
public virtual void HandleUnsubQueryResponse(object message) { }
|
||||
public virtual void HandleUnsubQueryResponse(SocketConnection connection, object message) { }
|
||||
|
||||
/// <summary>
|
||||
/// Create a new unsubscription query
|
||||
|
||||
150
CryptoExchange.Net/TimeOffsetManager.cs
Normal file
150
CryptoExchange.Net/TimeOffsetManager.cs
Normal file
@ -0,0 +1,150 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CryptoExchange.Net
|
||||
{
|
||||
/// <summary>
|
||||
/// Manager for timing offsets in APIs
|
||||
/// </summary>
|
||||
public static class TimeOffsetManager
|
||||
{
|
||||
class SocketTimeOffset
|
||||
{
|
||||
private DateTime _lastRollOver = DateTime.UtcNow;
|
||||
private double? _fallbackLowest;
|
||||
private double? _currentLowestOffset;
|
||||
|
||||
/// <summary>
|
||||
/// Get the estimated offset, resolves to the lowest offset in time measured in the last two minutes
|
||||
/// </summary>
|
||||
public double? Offset
|
||||
{
|
||||
get
|
||||
{
|
||||
if (_currentLowestOffset == null)
|
||||
// If there is no current lowest offset return the fallback (which might or might not be null)
|
||||
return _fallbackLowest;
|
||||
|
||||
if (_fallbackLowest == null)
|
||||
// If there is no fallback return the current lowest offset
|
||||
return _currentLowestOffset;
|
||||
|
||||
// If there is both a fallback and a current offset return the min offset of those
|
||||
return Math.Min(_currentLowestOffset.Value, _fallbackLowest.Value);
|
||||
}
|
||||
}
|
||||
|
||||
public void Update(double offsetMs)
|
||||
{
|
||||
if (_currentLowestOffset == null || _currentLowestOffset > offsetMs)
|
||||
{
|
||||
_currentLowestOffset = offsetMs;
|
||||
_fallbackLowest = offsetMs;
|
||||
}
|
||||
|
||||
if (DateTime.UtcNow - _lastRollOver > TimeSpan.FromMinutes(1))
|
||||
{
|
||||
_fallbackLowest = _currentLowestOffset;
|
||||
_currentLowestOffset = null;
|
||||
_lastRollOver = DateTime.UtcNow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RestTimeOffset
|
||||
{
|
||||
public SemaphoreSlim SemaphoreSlim { get; } = new SemaphoreSlim(1, 1);
|
||||
public DateTime? LastUpdate { get; set; }
|
||||
public double? Offset { get; set; }
|
||||
|
||||
public void Update(double offsetMs)
|
||||
{
|
||||
LastUpdate = DateTime.UtcNow;
|
||||
Offset = offsetMs;
|
||||
}
|
||||
}
|
||||
|
||||
private static ConcurrentDictionary<string, SocketTimeOffset> _lastSocketDelays = new ConcurrentDictionary<string, SocketTimeOffset>();
|
||||
private static ConcurrentDictionary<string, RestTimeOffset> _lastRestDelays = new ConcurrentDictionary<string, RestTimeOffset>();
|
||||
|
||||
/// <summary>
|
||||
/// Update WebSocket API offset
|
||||
/// </summary>
|
||||
/// <param name="api">API name</param>
|
||||
/// <param name="offsetMs">Offset in milliseconds</param>
|
||||
public static void UpdateSocketOffset(string api, double offsetMs)
|
||||
{
|
||||
if (!_lastSocketDelays.TryGetValue(api, out var offsetValues))
|
||||
{
|
||||
offsetValues = new SocketTimeOffset();
|
||||
_lastSocketDelays.TryAdd(api, offsetValues);
|
||||
}
|
||||
|
||||
_lastSocketDelays[api].Update(offsetMs);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Update REST API offset
|
||||
/// </summary>
|
||||
/// <param name="api">API name</param>
|
||||
/// <param name="offsetMs">Offset in milliseconds</param>
|
||||
public static void UpdateRestOffset(string api, double offsetMs)
|
||||
{
|
||||
_lastRestDelays[api].Update(offsetMs);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get REST API offset
|
||||
/// </summary>
|
||||
/// <param name="api">API name</param>
|
||||
public static TimeSpan? GetRestOffset(string api) => _lastRestDelays.TryGetValue(api, out var val) && val.Offset != null ? TimeSpan.FromMilliseconds(val.Offset.Value) : null;
|
||||
|
||||
/// <summary>
|
||||
/// Get REST API last update time
|
||||
/// </summary>
|
||||
/// <param name="api">API name</param>
|
||||
public static DateTime? GetRestLastUpdateTime(string api) => _lastRestDelays.TryGetValue(api, out var val) && val.LastUpdate != null ? val.LastUpdate.Value : null;
|
||||
|
||||
/// <summary>
|
||||
/// Register a REST API client to be tracked
|
||||
/// </summary>
|
||||
/// <param name="api"></param>
|
||||
internal static void RegisterRestApi(string api)
|
||||
{
|
||||
_lastRestDelays[api] = new RestTimeOffset();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Enter exclusive access for the API to update the time offset
|
||||
/// </summary>
|
||||
/// <param name="api"></param>
|
||||
/// <returns></returns>
|
||||
public static async ValueTask EnterAsync(string api)
|
||||
{
|
||||
await _lastRestDelays[api].SemaphoreSlim.WaitAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Release exclusive access for the API
|
||||
/// </summary>
|
||||
/// <param name="api"></param>
|
||||
public static void Release(string api) => _lastRestDelays[api].SemaphoreSlim.Release();
|
||||
|
||||
/// <summary>
|
||||
/// Get WebSocket API offset
|
||||
/// </summary>
|
||||
/// <param name="api">API name</param>
|
||||
public static TimeSpan? GetSocketOffset(string api) => _lastSocketDelays.TryGetValue(api, out var val) && val.Offset != null ? TimeSpan.FromMilliseconds(val.Offset.Value) : null;
|
||||
|
||||
/// <summary>
|
||||
/// Reset the WebSocket API update timestamp to trigger a new time offset calculation
|
||||
/// </summary>
|
||||
/// <param name="api">API name</param>
|
||||
public static void ResetRestUpdateTime(string api)
|
||||
{
|
||||
_lastRestDelays[api].LastUpdate = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user