diff --git a/CryptoExchange.Net.UnitTests/SocketClientTests.cs b/CryptoExchange.Net.UnitTests/SocketClientTests.cs index d194df1..b35dd36 100644 --- a/CryptoExchange.Net.UnitTests/SocketClientTests.cs +++ b/CryptoExchange.Net.UnitTests/SocketClientTests.cs @@ -140,6 +140,12 @@ namespace CryptoExchange.Net.UnitTests var sub2 = new SocketConnection(new TraceLogger(), client.SubClient, socket2, null); client.SubClient.ConnectSocketSub(sub1); client.SubClient.ConnectSocketSub(sub2); + var us1 = SocketSubscription.CreateForIdentifier(10, "Test1", true, false, (e) => { }); + var us2 = SocketSubscription.CreateForIdentifier(11, "Test2", true, false, (e) => { }); + sub1.AddSubscription(us1); + sub2.AddSubscription(us2); + var ups1 = new UpdateSubscription(sub1, us1); + var ups2 = new UpdateSubscription(sub2, us2); // act client.UnsubscribeAllAsync().Wait(); diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestRestClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestRestClient.cs index a0c201e..b4a4637 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestRestClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestRestClient.cs @@ -182,9 +182,11 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations return await SendRequestAsync(new Uri("http://www.test.com"), HttpMethod.Get, ct); } - protected override Error ParseErrorResponse(JToken error) + protected override Error ParseErrorResponse(int httpStatusCode, IEnumerable>> responseHeaders, string data) { - return new ServerError((int)error["errorCode"], (string)error["errorMessage"]); + var errorData = ValidateJson(data); + + return new ServerError((int)errorData.Data["errorCode"], (string)errorData.Data["errorMessage"]); } public override TimeSpan? GetTimeOffset() diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs index 431de97..7f2afb1 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs @@ -18,6 +18,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations public event Action OnReconnected; public event Action OnReconnecting; #pragma warning restore 0067 + public event Action OnRequestSent; public event Action OnMessage; public event Action OnError; public event Action OnOpen; @@ -69,10 +70,11 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations return Task.FromResult(CanConnect); } - public void Send(string data) + public void Send(int requestId, string data, int weight) { if(!Connected) throw new Exception("Socket not connected"); + OnRequestSent?.Invoke(requestId); } public void Reset() diff --git a/CryptoExchange.Net/Clients/BaseApiClient.cs b/CryptoExchange.Net/Clients/BaseApiClient.cs index a11a9d8..dbdd27f 100644 --- a/CryptoExchange.Net/Clients/BaseApiClient.cs +++ b/CryptoExchange.Net/Clients/BaseApiClient.cs @@ -77,15 +77,6 @@ namespace CryptoExchange.Net /// public bool OutputOriginalData { get; } - /// - /// The last used id, use NextId() to get the next id and up this - /// - protected static int _lastId; - /// - /// Lock for id generating - /// - protected static object _idLock = new(); - /// /// A default serializer /// @@ -338,19 +329,6 @@ namespace CryptoExchange.Net return await reader.ReadToEndAsync().ConfigureAwait(false); } - /// - /// Generate a new unique id. The id is staticly stored so it is guarenteed to be unique across different client instances - /// - /// - protected static int NextId() - { - lock (_idLock) - { - _lastId += 1; - return _lastId; - } - } - /// /// Dispose /// diff --git a/CryptoExchange.Net/Clients/RestApiClient.cs b/CryptoExchange.Net/Clients/RestApiClient.cs index 67b0247..d520adc 100644 --- a/CryptoExchange.Net/Clients/RestApiClient.cs +++ b/CryptoExchange.Net/Clients/RestApiClient.cs @@ -15,6 +15,7 @@ using CryptoExchange.Net.Requests; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; +using static CryptoExchange.Net.Objects.RateLimiter; namespace CryptoExchange.Net { @@ -195,7 +196,7 @@ namespace CryptoExchange.Net Dictionary? additionalHeaders = null, bool ignoreRatelimit = false) { - var requestId = NextId(); + var requestId = ExchangeHelpers.NextId(); if (signed) { diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index cf64da9..c8b0e7d 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -12,6 +12,7 @@ using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; +using static CryptoExchange.Net.Objects.RateLimiter; namespace CryptoExchange.Net { @@ -76,9 +77,9 @@ namespace CryptoExchange.Net protected internal bool UnhandledMessageExpected { get; set; } /// - /// The max amount of outgoing messages per socket per second + /// The rate limiters /// - protected internal int? RateLimitPerSocketPerSecond { get; set; } + protected internal IEnumerable? RateLimiters { get; set; } /// public double IncomingKbps @@ -130,6 +131,10 @@ namespace CryptoExchange.Net options, apiOptions) { + var rateLimiters = new List(); + foreach (var rateLimiter in apiOptions.RateLimiters) + rateLimiters.Add(rateLimiter); + RateLimiters = rateLimiters; } /// @@ -275,7 +280,7 @@ namespace CryptoExchange.Net protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription) { CallResult? callResult = null; - await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false); + await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, 1, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false); if (callResult?.Success == true) { @@ -295,10 +300,11 @@ namespace CryptoExchange.Net /// Expected result type /// The request to send, will be serialized to json /// If the query is to an authenticated endpoint + /// Weight of the request /// - protected virtual Task> QueryAsync(object request, bool authenticated) + protected virtual Task> QueryAsync(object request, bool authenticated, int weight = 1) { - return QueryAsync(BaseAddress, request, authenticated); + return QueryAsync(BaseAddress, request, authenticated, weight); } /// @@ -308,8 +314,9 @@ namespace CryptoExchange.Net /// The url for the request /// The request to send /// Whether the socket should be authenticated + /// Weight of the request /// - protected virtual async Task> QueryAsync(string url, object request, bool authenticated) + protected virtual async Task> QueryAsync(string url, object request, bool authenticated, int weight = 1) { if (_disposing) return new CallResult(new InvalidOperationError("Client disposed, can't query")); @@ -348,7 +355,7 @@ namespace CryptoExchange.Net return new CallResult(new ServerError("Socket is paused")); } - return await QueryAndWaitAsync(socketConnection, request).ConfigureAwait(false); + return await QueryAndWaitAsync(socketConnection, request, weight).ConfigureAwait(false); } /// @@ -357,11 +364,12 @@ namespace CryptoExchange.Net /// The expected result type /// The connection to send and wait on /// The request to send + /// The weight of the query /// - protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, object request) + protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, object request, int weight) { var dataResult = new CallResult(new ServerError("No response on query received")); - await socket.SendAndWaitAsync(request, ClientOptions.RequestTimeout, null, data => + await socket.SendAndWaitAsync(request, ClientOptions.RequestTimeout, null, weight, data => { if (!HandleQueryResponse(socket, request, data, out var callResult)) return false; @@ -518,8 +526,8 @@ namespace CryptoExchange.Net } var subscription = request == null - ? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, authenticated, InternalHandler) - : SocketSubscription.CreateForRequest(NextId(), request, userSubscription, authenticated, InternalHandler); + ? SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), identifier!, userSubscription, authenticated, InternalHandler) + : SocketSubscription.CreateForRequest(ExchangeHelpers.NextId(), request, userSubscription, authenticated, InternalHandler); if (!connection.AddSubscription(subscription)) return null; return subscription; @@ -533,7 +541,7 @@ namespace CryptoExchange.Net protected void AddGenericHandler(string identifier, Action action) { genericHandlers.Add(identifier, action); - var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, false, action); + var subscription = SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), identifier, false, false, action); foreach (var connection in socketConnections.Values) connection.AddSubscription(subscription); } @@ -607,7 +615,7 @@ namespace CryptoExchange.Net socketConnection.UnhandledMessage += HandleUnhandledMessage; foreach (var kvp in genericHandlers) { - var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, false, kvp.Value); + var handler = SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), kvp.Key, false, false, kvp.Value); socketConnection.AddSubscription(handler); } @@ -651,7 +659,7 @@ namespace CryptoExchange.Net DataInterpreterString = dataInterpreterString, KeepAliveInterval = KeepAliveInterval, ReconnectInterval = ClientOptions.ReconnectInterval, - RatelimitPerSecond = RateLimitPerSocketPerSecond, + RateLimiters = RateLimiters, Proxy = ClientOptions.Proxy, Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout }; @@ -704,7 +712,7 @@ namespace CryptoExchange.Net try { - socketConnection.Send(obj); + socketConnection.Send(ExchangeHelpers.NextId(), obj, 1); } catch (Exception ex) { diff --git a/CryptoExchange.Net/ExchangeHelpers.cs b/CryptoExchange.Net/ExchangeHelpers.cs index 31163a2..df89554 100644 --- a/CryptoExchange.Net/ExchangeHelpers.cs +++ b/CryptoExchange.Net/ExchangeHelpers.cs @@ -8,6 +8,15 @@ namespace CryptoExchange.Net /// public static class ExchangeHelpers { + /// + /// The last used id, use NextId() to get the next id and up this + /// + private static int _lastId; + /// + /// Lock for id generating + /// + private static object _idLock = new(); + /// /// Clamp a value between a min and max /// @@ -118,5 +127,19 @@ namespace CryptoExchange.Net { return value / 1.000000000000000000000000000000000m; } + + + /// + /// Generate a new unique id. The id is staticly stored so it is guarenteed to be unique + /// + /// + public static int NextId() + { + lock (_idLock) + { + _lastId += 1; + return _lastId; + } + } } } diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 7ef74e6..1c6bfc6 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -8,7 +8,7 @@ using System.Threading.Tasks; namespace CryptoExchange.Net.Interfaces { /// - /// Webscoket connection interface + /// Websocket connection interface /// public interface IWebsocket: IDisposable { @@ -21,6 +21,10 @@ namespace CryptoExchange.Net.Interfaces /// event Action OnMessage; /// + /// Websocket sent event, RequestId as parameter + /// + event Action OnRequestSent; + /// /// Websocket error event /// event Action OnError; @@ -69,8 +73,10 @@ namespace CryptoExchange.Net.Interfaces /// /// Send data /// + /// /// - void Send(string data); + /// + void Send(int id, string data, int weight); /// /// Reconnect the socket /// diff --git a/CryptoExchange.Net/Objects/Error.cs b/CryptoExchange.Net/Objects/Error.cs index dea1799..a78fa67 100644 --- a/CryptoExchange.Net/Objects/Error.cs +++ b/CryptoExchange.Net/Objects/Error.cs @@ -41,7 +41,7 @@ namespace CryptoExchange.Net.Objects /// public override string ToString() { - return $"{Code}: {Message} {Data}"; + return Code != null ? $"{Code}: {Message} {Data}" : $"{Message} {Data}"; } } diff --git a/CryptoExchange.Net/Objects/Options/SocketApiOptions.cs b/CryptoExchange.Net/Objects/Options/SocketApiOptions.cs index da43509..3afdb76 100644 --- a/CryptoExchange.Net/Objects/Options/SocketApiOptions.cs +++ b/CryptoExchange.Net/Objects/Options/SocketApiOptions.cs @@ -1,5 +1,7 @@ using CryptoExchange.Net.Authentication; +using CryptoExchange.Net.Interfaces; using System; +using System.Collections.Generic; namespace CryptoExchange.Net.Objects.Options { @@ -8,6 +10,11 @@ namespace CryptoExchange.Net.Objects.Options /// public class SocketApiOptions : ApiOptions { + /// + /// List of rate limiters to use + /// + public List RateLimiters { get; set; } = new List(); + /// /// The max time of not receiving any data after which the connection is assumed to be dropped. This can only be used for socket connections where a steady flow of data is expected, /// for example when the server sends intermittent ping requests @@ -30,6 +37,7 @@ namespace CryptoExchange.Net.Objects.Options { ApiCredentials = ApiCredentials?.Copy(), OutputOriginalData = OutputOriginalData, + RateLimiters = RateLimiters, SocketNoDataTimeout = SocketNoDataTimeout, MaxSocketConnections = MaxSocketConnections, }; diff --git a/CryptoExchange.Net/Objects/RateLimiter.cs b/CryptoExchange.Net/Objects/RateLimiter.cs index a54a799..8219b4b 100644 --- a/CryptoExchange.Net/Objects/RateLimiter.cs +++ b/CryptoExchange.Net/Objects/RateLimiter.cs @@ -17,7 +17,7 @@ namespace CryptoExchange.Net.Objects public class RateLimiter : IRateLimiter { private readonly object _limiterLock = new object(); - internal List Limiters = new List(); + internal List _limiters = new List(); /// /// Create a new RateLimiter. Configure the rate limiter by calling , @@ -35,7 +35,7 @@ namespace CryptoExchange.Net.Objects public RateLimiter AddTotalRateLimit(int limit, TimeSpan perTimePeriod) { lock(_limiterLock) - Limiters.Add(new TotalRateLimiter(limit, perTimePeriod, null)); + _limiters.Add(new TotalRateLimiter(limit, perTimePeriod, null)); return this; } @@ -50,7 +50,7 @@ namespace CryptoExchange.Net.Objects public RateLimiter AddEndpointLimit(string endpoint, int limit, TimeSpan perTimePeriod, HttpMethod? method = null, bool excludeFromOtherRateLimits = false) { lock(_limiterLock) - Limiters.Add(new EndpointRateLimiter(new[] { endpoint }, limit, perTimePeriod, method, excludeFromOtherRateLimits)); + _limiters.Add(new EndpointRateLimiter(new[] { endpoint }, limit, perTimePeriod, method, excludeFromOtherRateLimits)); return this; } @@ -65,7 +65,7 @@ namespace CryptoExchange.Net.Objects public RateLimiter AddEndpointLimit(IEnumerable endpoints, int limit, TimeSpan perTimePeriod, HttpMethod? method = null, bool excludeFromOtherRateLimits = false) { lock(_limiterLock) - Limiters.Add(new EndpointRateLimiter(endpoints.ToArray(), limit, perTimePeriod, method, excludeFromOtherRateLimits)); + _limiters.Add(new EndpointRateLimiter(endpoints.ToArray(), limit, perTimePeriod, method, excludeFromOtherRateLimits)); return this; } @@ -81,7 +81,7 @@ namespace CryptoExchange.Net.Objects public RateLimiter AddPartialEndpointLimit(string endpoint, int limit, TimeSpan perTimePeriod, HttpMethod? method = null, bool countPerEndpoint = false, bool ignoreOtherRateLimits = false) { lock(_limiterLock) - Limiters.Add(new PartialEndpointRateLimiter(new[] { endpoint }, limit, perTimePeriod, method, ignoreOtherRateLimits, countPerEndpoint)); + _limiters.Add(new PartialEndpointRateLimiter(new[] { endpoint }, limit, perTimePeriod, method, ignoreOtherRateLimits, countPerEndpoint)); return this; } @@ -95,7 +95,20 @@ namespace CryptoExchange.Net.Objects public RateLimiter AddApiKeyLimit(int limit, TimeSpan perTimePeriod, bool onlyForSignedRequests, bool excludeFromTotalRateLimit) { lock(_limiterLock) - Limiters.Add(new ApiKeyRateLimiter(limit, perTimePeriod, null, onlyForSignedRequests, excludeFromTotalRateLimit)); + _limiters.Add(new ApiKeyRateLimiter(limit, perTimePeriod, null, onlyForSignedRequests, excludeFromTotalRateLimit)); + return this; + } + + /// + /// Add a rate limit for the amount of messages that can be send per connection + /// + /// The endpoint that the limit is for + /// The limit per period. Note that this is weight, not single request, altough by default requests have a weight of 1 + /// The time period the limit is for + public RateLimiter AddConnectionRateLimit(string endpoint, int limit, TimeSpan perTimePeriod) + { + lock (_limiterLock) + _limiters.Add(new ConnectionRateLimiter(new[] { endpoint }, limit, perTimePeriod)); return this; } @@ -106,7 +119,7 @@ namespace CryptoExchange.Net.Objects EndpointRateLimiter? endpointLimit; lock (_limiterLock) - endpointLimit = Limiters.OfType().SingleOrDefault(h => h.Endpoints.Contains(endpoint) && (h.Method == null || h.Method == method)); + endpointLimit = _limiters.OfType().SingleOrDefault(h => h.Endpoints.Contains(endpoint) && (h.Method == null || h.Method == method)); if(endpointLimit != null) { var waitResult = await ProcessTopic(logger, endpointLimit, endpoint, requestWeight, limitBehaviour, ct).ConfigureAwait(false); @@ -121,7 +134,7 @@ namespace CryptoExchange.Net.Objects List partialEndpointLimits; lock (_limiterLock) - partialEndpointLimits = Limiters.OfType().Where(h => h.PartialEndpoints.Any(h => endpoint.Contains(h)) && (h.Method == null || h.Method == method)).ToList(); + partialEndpointLimits = _limiters.OfType().Where(h => h.PartialEndpoints.Any(h => endpoint.Contains(h)) && (h.Method == null || h.Method == method)).ToList(); foreach (var partialEndpointLimit in partialEndpointLimits) { if (partialEndpointLimit.CountPerEndpoint) @@ -129,11 +142,11 @@ namespace CryptoExchange.Net.Objects SingleTopicRateLimiter? thisEndpointLimit; lock (_limiterLock) { - thisEndpointLimit = Limiters.OfType().SingleOrDefault(h => h.Type == RateLimitType.PartialEndpoint && (string)h.Topic == endpoint); + thisEndpointLimit = _limiters.OfType().SingleOrDefault(h => h.Type == RateLimitType.PartialEndpoint && (string)h.Topic == endpoint); if (thisEndpointLimit == null) { thisEndpointLimit = new SingleTopicRateLimiter(endpoint, partialEndpointLimit); - Limiters.Add(thisEndpointLimit); + _limiters.Add(thisEndpointLimit); } } @@ -158,7 +171,7 @@ namespace CryptoExchange.Net.Objects ApiKeyRateLimiter? apiLimit; lock (_limiterLock) - apiLimit = Limiters.OfType().SingleOrDefault(h => h.Type == RateLimitType.ApiKey); + apiLimit = _limiters.OfType().SingleOrDefault(h => h.Type == RateLimitType.ApiKey); if (apiLimit != null) { if(apiKey == null) @@ -177,11 +190,11 @@ namespace CryptoExchange.Net.Objects SingleTopicRateLimiter? thisApiLimit; lock (_limiterLock) { - thisApiLimit = Limiters.OfType().SingleOrDefault(h => h.Type == RateLimitType.ApiKey && ((SecureString)h.Topic).IsEqualTo(apiKey)); + thisApiLimit = _limiters.OfType().SingleOrDefault(h => h.Type == RateLimitType.ApiKey && ((SecureString)h.Topic).IsEqualTo(apiKey)); if (thisApiLimit == null) { thisApiLimit = new SingleTopicRateLimiter(apiKey, apiLimit); - Limiters.Add(thisApiLimit); + _limiters.Add(thisApiLimit); } } @@ -198,7 +211,7 @@ namespace CryptoExchange.Net.Objects TotalRateLimiter? totalLimit; lock (_limiterLock) - totalLimit = Limiters.OfType().SingleOrDefault(); + totalLimit = _limiters.OfType().SingleOrDefault(); if (totalLimit != null) { var waitResult = await ProcessTopic(logger, totalLimit, endpoint, requestWeight, limitBehaviour, ct).ConfigureAwait(false); @@ -224,63 +237,68 @@ namespace CryptoExchange.Net.Objects } sw.Stop(); - int totalWaitTime = 0; - while (true) + try { - // Remove requests no longer in time period from the history - var checkTime = DateTime.UtcNow; - for (var i = 0; i < historyTopic.Entries.Count; i++) + int totalWaitTime = 0; + while (true) { - if (historyTopic.Entries[i].Timestamp < checkTime - historyTopic.Period) + // Remove requests no longer in time period from the history + var checkTime = DateTime.UtcNow; + for (var i = 0; i < historyTopic.Entries.Count; i++) { - historyTopic.Entries.Remove(historyTopic.Entries[i]); - i--; + if (historyTopic.Entries[i].Timestamp < checkTime - historyTopic.Period) + { + historyTopic.Entries.Remove(historyTopic.Entries[i]); + i--; + } + else + break; + } + + var currentWeight = !historyTopic.Entries.Any() ? 0 : historyTopic.Entries.Sum(h => h.Weight); + if (currentWeight + requestWeight > historyTopic.Limit) + { + if (currentWeight == 0) + throw new Exception("Request limit reached without any prior request. " + + $"This request can never execute with the current rate limiter. Request weight: {requestWeight}, Ratelimit: {historyTopic.Limit}"); + + // Wait until the next entry should be removed from the history + var thisWaitTime = (int)Math.Round(((historyTopic.Entries.First().Timestamp + historyTopic.Period) - checkTime).TotalMilliseconds); + if (thisWaitTime > 0) + { + if (limitBehaviour == RateLimitingBehaviour.Fail) + { + var msg = $"Request to {endpoint} failed because of rate limit `{historyTopic.Type}`. Current weight: {currentWeight}/{historyTopic.Limit}, request weight: {requestWeight}"; + logger.Log(LogLevel.Warning, msg); + return new CallResult(new ClientRateLimitError(msg) { RetryAfter = DateTime.UtcNow.AddSeconds(thisWaitTime) }); + } + + logger.Log(LogLevel.Information, $"Message to {endpoint} waiting {thisWaitTime}ms for rate limit `{historyTopic.Type}`. Current weight: {currentWeight}/{historyTopic.Limit}, request weight: {requestWeight}"); + try + { + await Task.Delay(thisWaitTime, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return new CallResult(new CancellationRequestedError()); + } + totalWaitTime += thisWaitTime; + } } else - break; - } - - var currentWeight = !historyTopic.Entries.Any() ? 0: historyTopic.Entries.Sum(h => h.Weight); - if (currentWeight + requestWeight > historyTopic.Limit) - { - if (currentWeight == 0) - throw new Exception("Request limit reached without any prior request. " + - $"This request can never execute with the current rate limiter. Request weight: {requestWeight}, Ratelimit: {historyTopic.Limit}"); - - // Wait until the next entry should be removed from the history - var thisWaitTime = (int)Math.Round((historyTopic.Entries.First().Timestamp - (checkTime - historyTopic.Period)).TotalMilliseconds); - if (thisWaitTime > 0) { - if (limitBehaviour == RateLimitingBehaviour.Fail) - { - historyTopic.Semaphore.Release(); - var msg = $"Request to {endpoint} failed because of rate limit `{historyTopic.Type}`. Current weight: {currentWeight}/{historyTopic.Limit}, request weight: {requestWeight}"; - logger.Log(LogLevel.Warning, msg); - return new CallResult(new ClientRateLimitError(msg) { RetryAfter = DateTime.UtcNow.AddSeconds(thisWaitTime) }); - } - - logger.Log(LogLevel.Information, $"Request to {endpoint} waiting {thisWaitTime}ms for rate limit `{historyTopic.Type}`. Current weight: {currentWeight}/{historyTopic.Limit}, request weight: {requestWeight}"); - try - { - await Task.Delay(thisWaitTime, ct).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - return new CallResult(new CancellationRequestedError()); - } - totalWaitTime += thisWaitTime; + break; } } - else - { - break; - } - } - var newTime = DateTime.UtcNow; - historyTopic.Entries.Add(new LimitEntry(newTime, requestWeight)); - historyTopic.Semaphore.Release(); - return new CallResult(totalWaitTime); + var newTime = DateTime.UtcNow; + historyTopic.Entries.Add(new LimitEntry(newTime, requestWeight)); + return new CallResult(totalWaitTime); + } + finally + { + historyTopic.Semaphore.Release(); + } } internal struct LimitEntry @@ -329,6 +347,24 @@ namespace CryptoExchange.Net.Objects } } + internal class ConnectionRateLimiter : PartialEndpointRateLimiter + { + public ConnectionRateLimiter(int limit, TimeSpan perPeriod) + : base(new[] { "/" }, limit, perPeriod, null, true, true) + { + } + + public ConnectionRateLimiter(string[] endpoints, int limit, TimeSpan perPeriod) + : base(endpoints, limit, perPeriod, null, true, true) + { + } + + public override string ToString() + { + return nameof(ConnectionRateLimiter); + } + } + internal class EndpointRateLimiter: Limiter { public string[] Endpoints { get; set; } diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index b2ab5e9..cc493be 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -7,6 +7,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; +using System.Net.Http; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; @@ -30,9 +31,8 @@ namespace CryptoExchange.Net.Sockets private static readonly object _streamIdLock = new(); private readonly AsyncResetEvent _sendEvent; - private readonly ConcurrentQueue _sendBuffer; + private readonly ConcurrentQueue _sendBuffer; private readonly SemaphoreSlim _closeSem; - private readonly List _outgoingMessages; private ClientWebSocket _socket; private CancellationTokenSource _ctsSource; @@ -103,6 +103,9 @@ namespace CryptoExchange.Net.Sockets /// public event Action? OnMessage; + /// + public event Action? OnRequestSent; + /// public event Action? OnError; @@ -128,10 +131,9 @@ namespace CryptoExchange.Net.Sockets _logger = logger; Parameters = websocketParameters; - _outgoingMessages = new List(); _receivedMessages = new List(); _sendEvent = new AsyncResetEvent(); - _sendBuffer = new ConcurrentQueue(); + _sendBuffer = new ConcurrentQueue(); _ctsSource = new CancellationTokenSource(); _receivedMessagesLock = new object(); @@ -270,14 +272,14 @@ namespace CryptoExchange.Net.Sockets } /// - public virtual void Send(string data) + public virtual void Send(int id, string data, int weight) { if (_ctsSource.IsCancellationRequested) return; var bytes = Parameters.Encoding.GetBytes(data); - _logger.Log(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer"); - _sendBuffer.Enqueue(bytes); + _logger.Log(LogLevel.Trace, $"Socket {Id} - msg {id} - Adding {bytes.Length} to send buffer"); + _sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes }); _sendEvent.Set(); } @@ -392,6 +394,7 @@ namespace CryptoExchange.Net.Sockets { try { + var limitKey = Uri.ToString() + "/" + Id.ToString(); while (true) { if (_ctsSource.IsCancellationRequested) @@ -404,25 +407,24 @@ namespace CryptoExchange.Net.Sockets while (_sendBuffer.TryDequeue(out var data)) { - if (Parameters.RatelimitPerSecond != null) + if (Parameters.RateLimiters != null) { - // Wait for rate limit - DateTime? start = null; - while (MessagesSentLastSecond() >= Parameters.RatelimitPerSecond) + foreach(var ratelimiter in Parameters.RateLimiters) { - start ??= DateTime.UtcNow; - await Task.Delay(50).ConfigureAwait(false); + var limitResult = await ratelimiter.LimitRequestAsync(_logger, limitKey, HttpMethod.Get, false, null, RateLimitingBehaviour.Wait, data.Weight, _ctsSource.Token).ConfigureAwait(false); + if (limitResult.Success) + { + if (limitResult.Data > 0) + _logger.Log(LogLevel.Debug, $"Socket {Id} - msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit"); + } } - - if (start != null) - _logger.Log(LogLevel.Debug, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit"); } try { - await _socket.SendAsync(new ArraySegment(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false); - _outgoingMessages.Add(DateTime.UtcNow); - _logger.Log(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes"); + await _socket.SendAsync(new ArraySegment(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false); + OnRequestSent?.Invoke(data.Id); + _logger.Log(LogLevel.Trace, $"Socket {Id} - msg {data.Id} - sent {data.Bytes.Length} bytes"); } catch (OperationCanceledException) { @@ -630,42 +632,6 @@ namespace CryptoExchange.Net.Sockets } } - /// - /// Trigger the OnMessage event - /// - /// - protected void TriggerOnMessage(string data) - { - LastActionTime = DateTime.UtcNow; - OnMessage?.Invoke(data); - } - - /// - /// Trigger the OnError event - /// - /// - protected void TriggerOnError(Exception ex) => OnError?.Invoke(ex); - - /// - /// Trigger the OnError event - /// - protected void TriggerOnOpen() => OnOpen?.Invoke(); - - /// - /// Trigger the OnError event - /// - protected void TriggerOnClose() => OnClose?.Invoke(); - - /// - /// Trigger the OnReconnecting event - /// - protected void TriggerOnReconnecting() => OnReconnecting?.Invoke(); - - /// - /// Trigger the OnReconnected event - /// - protected void TriggerOnReconnected() => OnReconnected?.Invoke(); - /// /// Checks if there is no data received for a period longer than the specified timeout /// @@ -721,13 +687,6 @@ namespace CryptoExchange.Net.Sockets } } - private int MessagesSentLastSecond() - { - var testTime = DateTime.UtcNow; - _outgoingMessages.RemoveAll(r => testTime - r > TimeSpan.FromSeconds(1)); - return _outgoingMessages.Count; - } - /// /// Update the received messages list, removing messages received longer than 3s ago /// @@ -769,6 +728,32 @@ namespace CryptoExchange.Net.Sockets } } + /// + /// Message info + /// + public struct SendItem + { + /// + /// The request id + /// + public int Id { get; set; } + + /// + /// The request id + /// + public int Weight { get; set; } + + /// + /// Timestamp the request was sent + /// + public DateTime SendTime { get; set; } + + /// + /// The bytes to send + /// + public byte[] Bytes { get; set; } + } + /// /// Received message info /// diff --git a/CryptoExchange.Net/Sockets/PendingRequest.cs b/CryptoExchange.Net/Sockets/PendingRequest.cs index 2a833af..6043d22 100644 --- a/CryptoExchange.Net/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Sockets/PendingRequest.cs @@ -7,6 +7,7 @@ namespace CryptoExchange.Net.Sockets { internal class PendingRequest { + public int Id { get; set; } public Func Handler { get; } public JToken? Result { get; private set; } public bool Completed { get; private set; } @@ -15,17 +16,22 @@ namespace CryptoExchange.Net.Sockets public TimeSpan Timeout { get; } public SocketSubscription? Subscription { get; } - private CancellationTokenSource _cts; + private CancellationTokenSource? _cts; - public PendingRequest(Func handler, TimeSpan timeout, SocketSubscription? subscription) + public PendingRequest(int id, Func handler, TimeSpan timeout, SocketSubscription? subscription) { + Id = id; Handler = handler; Event = new AsyncResetEvent(false, false); Timeout = timeout; RequestTimestamp = DateTime.UtcNow; Subscription = subscription; + } - _cts = new CancellationTokenSource(timeout); + public void IsSend() + { + // Start timeout countdown + _cts = new CancellationTokenSource(Timeout); _cts.Token.Register(Fail, false); } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index ec83229..dc71700 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -182,6 +182,7 @@ namespace CryptoExchange.Net.Sockets _socket = socket; _socket.OnMessage += HandleMessage; + _socket.OnRequestSent += HandleRequestSent; _socket.OnOpen += HandleOpen; _socket.OnClose += HandleClose; _socket.OnReconnecting += HandleReconnecting; @@ -284,6 +285,22 @@ namespace CryptoExchange.Net.Sockets _logger.Log(LogLevel.Warning, $"Socket {SocketId} error: " + e.ToLogString()); } + /// + /// Handler for whenever a request is sent over the websocket + /// + /// Id of the request sent + protected virtual void HandleRequestSent(int requestId) + { + var pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId); + if (pendingRequest == null) + { + _logger.Log(LogLevel.Debug, $"Socket {SocketId} - msg {requestId} - message sent, but not pending"); + return; + } + + pendingRequest.IsSend(); + } + /// /// Process a message received by the socket /// @@ -318,7 +335,6 @@ namespace CryptoExchange.Net.Sockets // Check if this message is an answer on any pending requests foreach (var pendingRequest in requests) { - if (pendingRequest.CheckData(tokenData)) { lock (_pendingRequests) @@ -329,12 +345,13 @@ namespace CryptoExchange.Net.Sockets // Answer to a timed out request, unsub if it is a subscription request if (pendingRequest.Subscription != null) { - _logger.Log(LogLevel.Warning, "Received subscription info after request timed out; unsubscribing. Consider increasing the SocketResponseTimout"); + _logger.Log(LogLevel.Warning, "Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); _ = ApiClient.UnsubscribeAsync(this, pendingRequest.Subscription).ConfigureAwait(false); } } else { + _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request"); pendingRequest.Succeed(tokenData); } @@ -570,45 +587,69 @@ namespace CryptoExchange.Net.Sockets /// The timeout for response /// Subscription if this is a subscribe request /// The response handler, should return true if the received JToken was the response to the request + /// The weight of the message /// - public virtual Task SendAndWaitAsync(T obj, TimeSpan timeout, SocketSubscription? subscription, Func handler) + public virtual async Task SendAndWaitAsync(T obj, TimeSpan timeout, SocketSubscription? subscription, int weight, Func handler) { - var pending = new PendingRequest(handler, timeout, subscription); + var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, subscription); lock (_pendingRequests) { _pendingRequests.Add(pending); } - var sendOk = Send(obj); - if(!sendOk) - pending.Fail(); - return pending.Event.WaitAsync(timeout); + var sendOk = Send(pending.Id, obj, weight); + if (!sendOk) + { + pending.Fail(); + return; + } + + while (true) + { + if(!_socket.IsOpen) + { + pending.Fail(); + return; + } + + if (pending.Completed) + return; + + await pending.Event.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); + + if (pending.Completed) + return; + } } /// /// Send data over the websocket connection /// /// The type of the object to send + /// The request id /// The object to send /// How null values should be serialized - public virtual bool Send(T obj, NullValueHandling nullValueHandling = NullValueHandling.Ignore) + /// The weight of the message + public virtual bool Send(int requestId, T obj, int weight, NullValueHandling nullValueHandling = NullValueHandling.Ignore) { if(obj is string str) - return Send(str); + return Send(requestId, str, weight); else - return Send(JsonConvert.SerializeObject(obj, Formatting.None, new JsonSerializerSettings { NullValueHandling = nullValueHandling })); + return Send(requestId, JsonConvert.SerializeObject(obj, Formatting.None, new JsonSerializerSettings { NullValueHandling = nullValueHandling }), weight); } /// /// Send string data over the websocket connection /// /// The data to send - public virtual bool Send(string data) + /// The weight of the message + /// The id of the request + public virtual bool Send(int requestId, string data, int weight) { - _logger.Log(LogLevel.Trace, $"Socket {SocketId} sending data: {data}"); + _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {requestId} - sending messsage: {data}"); try { - _socket.Send(data); + _socket.Send(requestId, data, weight); return true; } catch(Exception) diff --git a/CryptoExchange.Net/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Sockets/WebSocketParameters.cs index f1e20b5..a20b296 100644 --- a/CryptoExchange.Net/Sockets/WebSocketParameters.cs +++ b/CryptoExchange.Net/Sockets/WebSocketParameters.cs @@ -1,4 +1,5 @@ -using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects; using System; using System.Collections.Generic; using System.Text; @@ -52,9 +53,9 @@ namespace CryptoExchange.Net.Sockets public TimeSpan? KeepAliveInterval { get; set; } /// - /// The max amount of messages to send per second + /// The rate limiters for the socket connection /// - public int? RatelimitPerSecond { get; set; } + public IEnumerable? RateLimiters { get; set; } /// /// Origin header value to send in the connection handshake