1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 07:56:12 +00:00

Feature/ratelimit refactor (#197)

This commit is contained in:
Jan Korf 2024-04-16 14:55:27 +02:00 committed by GitHub
parent 2dbd5be924
commit 1b1961db00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
57 changed files with 2294 additions and 688 deletions

View File

@ -13,6 +13,11 @@ using System.Net.Http;
using System.Threading.Tasks;
using System.Threading;
using NUnit.Framework.Legacy;
using CryptoExchange.Net.RateLimiting;
using System.Net;
using CryptoExchange.Net.RateLimiting.Guards;
using CryptoExchange.Net.RateLimiting.Filters;
using CryptoExchange.Net.RateLimiting.Interfaces;
namespace CryptoExchange.Net.UnitTests
{
@ -107,14 +112,14 @@ namespace CryptoExchange.Net.UnitTests
// arrange
// act
var options = new TestClientOptions();
options.Api1Options.RateLimiters = new List<IRateLimiter> { new RateLimiter() };
options.Api1Options.RateLimitingBehaviour = RateLimitingBehaviour.Fail;
options.Api1Options.TimestampRecalculationInterval = TimeSpan.FromMinutes(10);
options.Api1Options.OutputOriginalData = true;
options.RequestTimeout = TimeSpan.FromMinutes(1);
var client = new TestBaseClient(options);
// assert
Assert.That(((TestClientOptions)client.ClientOptions).Api1Options.RateLimiters.Count == 1);
Assert.That(((TestClientOptions)client.ClientOptions).Api1Options.RateLimitingBehaviour == RateLimitingBehaviour.Fail);
Assert.That(((TestClientOptions)client.ClientOptions).Api1Options.TimestampRecalculationInterval == TimeSpan.FromMinutes(10));
Assert.That(((TestClientOptions)client.ClientOptions).Api1Options.OutputOriginalData == true);
Assert.That(((TestClientOptions)client.ClientOptions).RequestTimeout == TimeSpan.FromMinutes(1));
}
@ -162,18 +167,22 @@ namespace CryptoExchange.Net.UnitTests
[TestCase(1, 2)]
public async Task PartialEndpointRateLimiterBasics(int requests, double perSeconds)
{
var rateLimiter = new RateLimiter();
rateLimiter.AddPartialEndpointLimit("/sapi/", requests, TimeSpan.FromSeconds(perSeconds));
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerHost, new PathStartFilter("/sapi/"), requests, TimeSpan.FromSeconds(perSeconds), RateLimitWindowType.Fixed));
var triggered = false;
rateLimiter.RateLimitTriggered += (x) => { triggered = true; };
var requestDefinition = new RequestDefinition("/sapi/v1/system/status", HttpMethod.Get);
for (var i = 0; i < requests + 1; i++)
{
var result1 = await rateLimiter.LimitRequestAsync(new TraceLogger(), "/sapi/v1/system/status", HttpMethod.Get, false, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
Assert.That(i == requests? result1.Data > 1 : result1.Data == 0);
var result1 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition, "https://test.com", "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(i == requests? triggered : !triggered);
}
triggered = false;
await Task.Delay((int)Math.Round(perSeconds * 1000) + 10);
var result2 = await rateLimiter.LimitRequestAsync(new TraceLogger(), "/sapi/v1/system/status", HttpMethod.Get, false, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
Assert.That(result2.Data == 0);
var result2 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition, "https://test.com", "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(!triggered);
}
[TestCase("/sapi/test1", true)]
@ -183,29 +192,40 @@ namespace CryptoExchange.Net.UnitTests
[TestCase("/sapi/", true)]
public async Task PartialEndpointRateLimiterEndpoints(string endpoint, bool expectLimiting)
{
var rateLimiter = new RateLimiter();
rateLimiter.AddPartialEndpointLimit("/sapi/", 1, TimeSpan.FromSeconds(0.1));
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerHost, new PathStartFilter("/sapi/"), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed));
var requestDefinition = new RequestDefinition(endpoint, HttpMethod.Get);
RateLimitEvent evnt = null;
rateLimiter.RateLimitTriggered += (x) => { evnt = x; };
for (var i = 0; i < 2; i++)
{
var result1 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint, HttpMethod.Get, false, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
bool expected = i == 1 ? (expectLimiting ? result1.Data > 1 : result1.Data == 0) : result1.Data == 0;
var result1 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition, "https://test.com", "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
bool expected = i == 1 ? (expectLimiting ? evnt.DelayTime > TimeSpan.Zero : evnt == null) : evnt == null;
Assert.That(expected);
}
}
[TestCase("/sapi/", "/sapi/", true)]
[TestCase("/sapi/test", "/sapi/test", true)]
[TestCase("/sapi/test", "/sapi/test123", false)]
[TestCase("/sapi/test", "/sapi/", false)]
public async Task PartialEndpointRateLimiterEndpoints(string endpoint1, string endpoint2, bool expectLimiting)
{
var rateLimiter = new RateLimiter();
rateLimiter.AddPartialEndpointLimit("/sapi/", 1, TimeSpan.FromSeconds(0.1), countPerEndpoint: true);
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerEndpoint, new PathStartFilter("/sapi/"), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed));
var result1 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint1, HttpMethod.Get, false, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
var result2 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint2, HttpMethod.Get, false, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
Assert.That(result1.Data == 0);
Assert.That(expectLimiting ? result2.Data > 0 : result2.Data == 0);
var requestDefinition1 = new RequestDefinition(endpoint1, HttpMethod.Get);
var requestDefinition2 = new RequestDefinition(endpoint2, HttpMethod.Get);
RateLimitEvent evnt = null;
rateLimiter.RateLimitTriggered += (x) => { evnt = x; };
var result1 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition1, "https://test.com", "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(evnt == null);
var result2 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition2, "https://test.com", "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(expectLimiting ? evnt != null : evnt == null);
}
[TestCase(1, 0.1)]
@ -214,18 +234,22 @@ namespace CryptoExchange.Net.UnitTests
[TestCase(1, 2)]
public async Task EndpointRateLimiterBasics(int requests, double perSeconds)
{
var rateLimiter = new RateLimiter();
rateLimiter.AddEndpointLimit("/sapi/test", requests, TimeSpan.FromSeconds(perSeconds));
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerEndpoint, new PathStartFilter("/sapi/test"), requests, TimeSpan.FromSeconds(perSeconds), RateLimitWindowType.Fixed));
bool triggered = false;
rateLimiter.RateLimitTriggered += (x) => { triggered = true; };
var requestDefinition = new RequestDefinition("/sapi/test", HttpMethod.Get);
for (var i = 0; i < requests + 1; i++)
{
var result1 = await rateLimiter.LimitRequestAsync(new TraceLogger(), "/sapi/test", HttpMethod.Get, false, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
Assert.That(i == requests ? result1.Data > 1 : result1.Data == 0);
var result1 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition, "https://test.com", "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(i == requests ? triggered : !triggered);
}
triggered = false;
await Task.Delay((int)Math.Round(perSeconds * 1000) + 10);
var result2 = await rateLimiter.LimitRequestAsync(new TraceLogger(), "/sapi/test", HttpMethod.Get, false, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
Assert.That(result2.Data == 0);
var result2 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition, "https://test.com", "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(!triggered);
}
[TestCase("/", false)]
@ -233,13 +257,17 @@ namespace CryptoExchange.Net.UnitTests
[TestCase("/sapi/test/123", false)]
public async Task EndpointRateLimiterEndpoints(string endpoint, bool expectLimited)
{
var rateLimiter = new RateLimiter();
rateLimiter.AddEndpointLimit("/sapi/test", 1, TimeSpan.FromSeconds(0.1));
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerEndpoint, new ExactPathFilter("/sapi/test"), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed));
var requestDefinition = new RequestDefinition(endpoint, HttpMethod.Get);
RateLimitEvent evnt = null;
rateLimiter.RateLimitTriggered += (x) => { evnt = x; };
for (var i = 0; i < 2; i++)
{
var result1 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint, HttpMethod.Get, false, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
bool expected = i == 1 ? (expectLimited ? result1.Data > 1 : result1.Data == 0) : result1.Data == 0;
var result1 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition, "https://test.com", "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
bool expected = i == 1 ? (expectLimited ? evnt.DelayTime > TimeSpan.Zero : evnt == null) : evnt == null;
Assert.That(expected);
}
}
@ -250,47 +278,41 @@ namespace CryptoExchange.Net.UnitTests
[TestCase("/sapi/test23", false)]
public async Task EndpointRateLimiterMultipleEndpoints(string endpoint, bool expectLimited)
{
var rateLimiter = new RateLimiter();
rateLimiter.AddEndpointLimit(new[] { "/sapi/test", "/sapi/test2" }, 1, TimeSpan.FromSeconds(0.1));
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerEndpoint, new ExactPathsFilter(new[] { "/sapi/test", "/sapi/test2" }), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed));
var requestDefinition = new RequestDefinition(endpoint, HttpMethod.Get);
RateLimitEvent evnt = null;
rateLimiter.RateLimitTriggered += (x) => { evnt = x; };
for (var i = 0; i < 2; i++)
{
var result1 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint, HttpMethod.Get, false, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
bool expected = i == 1 ? (expectLimited ? result1.Data > 1 : result1.Data == 0) : result1.Data == 0;
var result1 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition, "https://test.com", "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
bool expected = i == 1 ? (expectLimited ? evnt.DelayTime > TimeSpan.Zero : evnt == null) : evnt == null;
Assert.That(expected);
}
}
[TestCase("123", "123", "/sapi/test", "/sapi/test", true, true, true, true)]
[TestCase("123", "456", "/sapi/test", "/sapi/test", true, true, true, false)]
[TestCase("123", "123", "/sapi/test", "/sapi/test2", true, true, true, true)]
[TestCase("123", "123", "/sapi/test2", "/sapi/test", true, true, true, true)]
[TestCase("123", "123", "/sapi/test", "/sapi/test", true, false, true, false)]
[TestCase("123", "123", "/sapi/test", "/sapi/test", false, true, true, false)]
[TestCase("123", "123", "/sapi/test", "/sapi/test", false, false, true, false)]
[TestCase(null, "123", "/sapi/test", "/sapi/test", false, true, true, false)]
[TestCase("123", null, "/sapi/test", "/sapi/test", true, false, true, false)]
[TestCase(null, null, "/sapi/test", "/sapi/test", false, false, true, false)]
[TestCase("123", "123", "/sapi/test", "/sapi/test", true, true, false, true)]
[TestCase("123", "456", "/sapi/test", "/sapi/test", true, true, false, false)]
[TestCase("123", "123", "/sapi/test", "/sapi/test2", true, true, false, true)]
[TestCase("123", "123", "/sapi/test2", "/sapi/test", true, true, false, true)]
[TestCase("123", "123", "/sapi/test", "/sapi/test", true, false, false, true)]
[TestCase("123", "123", "/sapi/test", "/sapi/test", false, true, false, true)]
[TestCase("123", "123", "/sapi/test", "/sapi/test", false, false, false, true)]
[TestCase(null, "123", "/sapi/test", "/sapi/test", false, true, false, false)]
[TestCase("123", null, "/sapi/test", "/sapi/test", true, false, false, false)]
[TestCase(null, null, "/sapi/test", "/sapi/test", false, false, false, true)]
public async Task ApiKeyRateLimiterBasics(string key1, string key2, string endpoint1, string endpoint2, bool signed1, bool signed2, bool onlyForSignedRequests, bool expectLimited)
[TestCase("123", "123", "/sapi/test", "/sapi/test", true)]
[TestCase("123", "456", "/sapi/test", "/sapi/test", false)]
[TestCase("123", "123", "/sapi/test", "/sapi/test2", true)]
[TestCase("123", "123", "/sapi/test2", "/sapi/test", true)]
[TestCase(null, "123", "/sapi/test", "/sapi/test", false)]
[TestCase("123", null, "/sapi/test", "/sapi/test", false)]
[TestCase(null, null, "/sapi/test", "/sapi/test", false)]
public async Task ApiKeyRateLimiterBasics(string key1, string key2, string endpoint1, string endpoint2, bool expectLimited)
{
var rateLimiter = new RateLimiter();
rateLimiter.AddApiKeyLimit(1, TimeSpan.FromSeconds(0.1), onlyForSignedRequests, false);
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerApiKey, new AuthenticatedEndpointFilter(true), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed));
var requestDefinition1 = new RequestDefinition(endpoint1, HttpMethod.Get) { Authenticated = key1 != null };
var requestDefinition2 = new RequestDefinition(endpoint2, HttpMethod.Get) { Authenticated = key2 != null };
var result1 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint1, HttpMethod.Get, signed1, key1?.ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
var result2 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint2, HttpMethod.Get, signed2, key2?.ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
Assert.That(result1.Data == 0);
Assert.That(expectLimited ? result2.Data > 0 : result2.Data == 0);
RateLimitEvent evnt = null;
rateLimiter.RateLimitTriggered += (x) => { evnt = x; };
var result1 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition1, "https://test.com", key1?.ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(evnt == null);
var result2 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition2, "https://test.com", key2?.ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(expectLimited ? evnt != null : evnt == null);
}
[TestCase("/sapi/test", "/sapi/test", true)]
@ -298,29 +320,55 @@ namespace CryptoExchange.Net.UnitTests
[TestCase("/", "/sapi/test2", true)]
public async Task TotalRateLimiterBasics(string endpoint1, string endpoint2, bool expectLimited)
{
var rateLimiter = new RateLimiter();
rateLimiter.AddTotalRateLimit(1, TimeSpan.FromSeconds(0.1));
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerHost, Array.Empty<IGuardFilter>(), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed));
var requestDefinition1 = new RequestDefinition(endpoint1, HttpMethod.Get);
var requestDefinition2 = new RequestDefinition(endpoint2, HttpMethod.Get) { Authenticated = true };
var result1 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint1, HttpMethod.Get, false, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
var result2 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint2, HttpMethod.Get, true, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
Assert.That(result1.Data == 0);
Assert.That(expectLimited ? result2.Data > 0 : result2.Data == 0);
RateLimitEvent evnt = null;
rateLimiter.RateLimitTriggered += (x) => { evnt = x; };
var result1 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition1, "https://test.com", "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(evnt == null);
var result2 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition2, "https://test.com", null, 1, RateLimitingBehaviour.Wait, default);
Assert.That(expectLimited ? evnt != null : evnt == null);
}
[TestCase("/sapi/test", true, true, true, false)]
[TestCase("/sapi/test", false, true, true, false)]
[TestCase("/sapi/test", false, true, false, true)]
[TestCase("/sapi/test", true, true, false, true)]
public async Task ApiKeyRateLimiterIgnores_TotalRateLimiter_IfSet(string endpoint, bool signed1, bool signed2, bool ignoreTotal, bool expectLimited)
[TestCase("https://test.com", "/sapi/test", "https://test.com", "/sapi/test", true)]
[TestCase("https://test2.com", "/sapi/test", "https://test.com", "/sapi/test", false)]
[TestCase("https://test.com", "/sapi/test", "https://test2.com", "/sapi/test", false)]
[TestCase("https://test.com", "/sapi/test", "https://test.com", "/sapi/test2", true)]
public async Task HostRateLimiterBasics(string host1, string endpoint1, string host2, string endpoint2, bool expectLimited)
{
var rateLimiter = new RateLimiter();
rateLimiter.AddApiKeyLimit(100, TimeSpan.FromSeconds(0.1), true, ignoreTotal);
rateLimiter.AddTotalRateLimit(1, TimeSpan.FromSeconds(0.1));
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerHost, new HostFilter("https://test.com"), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed));
var requestDefinition1 = new RequestDefinition(endpoint1, HttpMethod.Get);
var requestDefinition2 = new RequestDefinition(endpoint2, HttpMethod.Get) { Authenticated = true };
var result1 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint, HttpMethod.Get, signed1, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
var result2 = await rateLimiter.LimitRequestAsync(new TraceLogger(), endpoint, HttpMethod.Get, signed2, "123".ToSecureString(), RateLimitingBehaviour.Wait, 1, default);
Assert.That(result1.Data == 0);
Assert.That(expectLimited ? result2.Data > 0 : result2.Data == 0);
RateLimitEvent evnt = null;
rateLimiter.RateLimitTriggered += (x) => { evnt = x; };
var result1 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition1, host1, "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(evnt == null);
var result2 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Request, requestDefinition1, host2, "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(expectLimited ? evnt != null : evnt == null);
}
[TestCase("https://test.com", "https://test.com", true)]
[TestCase("https://test2.com", "https://test.com", false)]
[TestCase("https://test.com", "https://test2.com", false)]
public async Task ConnectionRateLimiterBasics(string host1, string host2, bool expectLimited)
{
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerHost, new LimitItemTypeFilter(RateLimitItemType.Connection), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed));
RateLimitEvent evnt = null;
rateLimiter.RateLimitTriggered += (x) => { evnt = x; };
var result1 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Connection, new RequestDefinition("1", HttpMethod.Get), host1, "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(evnt == null);
var result2 = await rateLimiter.ProcessAsync(new TraceLogger(), 1, RateLimitItemType.Connection, new RequestDefinition("1", HttpMethod.Get), host2, "123".ToSecureString(), 1, RateLimitingBehaviour.Wait, default);
Assert.That(expectLimited ? evnt != null : evnt == null);
}
}
}

View File

@ -139,12 +139,12 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
public async Task<CallResult<T>> Request<T>(CancellationToken ct = default) where T : class
{
return await SendRequestAsync<T>(new Uri("http://www.test.com"), HttpMethod.Get, ct);
return await SendRequestAsync<T>(new Uri("http://www.test.com"), HttpMethod.Get, ct, requestWeight: 0);
}
public async Task<CallResult<T>> RequestWithParams<T>(HttpMethod method, Dictionary<string, object> parameters, Dictionary<string, string> headers) where T : class
{
return await SendRequestAsync<T>(new Uri("http://www.test.com"), method, default, parameters, additionalHeaders: headers);
return await SendRequestAsync<T>(new Uri("http://www.test.com"), method, default, parameters, requestWeight: 0, additionalHeaders: headers);
}
public void SetParameterPosition(HttpMethod method, HttpMethodParameterPosition position)
@ -180,7 +180,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
public async Task<CallResult<T>> Request<T>(CancellationToken ct = default) where T : class
{
return await SendRequestAsync<T>(new Uri("http://www.test.com"), HttpMethod.Get, ct);
return await SendRequestAsync<T>(new Uri("http://www.test.com"), HttpMethod.Get, ct, requestWeight: 0);
}
protected override Error ParseErrorResponse(int httpStatusCode, IEnumerable<KeyValuePair<string, IEnumerable<string>>> responseHeaders, IMessageAccessor accessor)

View File

@ -18,6 +18,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
#pragma warning disable 0067
public event Func<Task> OnReconnected;
public event Func<Task> OnReconnecting;
public event Func<int, Task> OnRequestRateLimited;
#pragma warning restore 0067
public event Func<int, Task> OnRequestSent;
public event Action<WebSocketMessageType, ReadOnlyMemory<byte>> OnStreamMessage;
@ -62,13 +63,13 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
}
}
public Task<bool> ConnectAsync()
public Task<CallResult> ConnectAsync()
{
Connected = CanConnect;
ConnectCalls++;
if (CanConnect)
InvokeOpen();
return Task.FromResult(CanConnect);
return Task.FromResult(CanConnect ? new CallResult(null) : new CallResult(new CantConnectError()));
}
public void Send(int requestId, string data, int weight)

View File

@ -92,7 +92,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials)
=> new TestAuthProvider(credentials);
public CallResult<bool> ConnectSocketSub(SocketConnection sub)
public CallResult ConnectSocketSub(SocketConnection sub)
{
return ConnectSocketAsync(sub).Result;
}

View File

@ -1,13 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using Microsoft.Extensions.Logging;

View File

@ -13,6 +13,8 @@ using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.RateLimiting;
using CryptoExchange.Net.RateLimiting.Interfaces;
using CryptoExchange.Net.Requests;
using Microsoft.Extensions.Logging;
@ -55,11 +57,6 @@ namespace CryptoExchange.Net.Clients
/// </summary>
protected Dictionary<string, string>? StandardRequestHeaders { get; set; }
/// <summary>
/// List of rate limiters
/// </summary>
internal IEnumerable<IRateLimiter> RateLimiters { get; }
/// <summary>
/// Where to put the parameters for requests with different Http methods
/// </summary>
@ -94,11 +91,6 @@ namespace CryptoExchange.Net.Clients
options,
apiOptions)
{
var rateLimiters = new List<IRateLimiter>();
foreach (var rateLimiter in apiOptions.RateLimiters)
rateLimiters.Add(rateLimiter);
RateLimiters = rateLimiters;
RequestFactory.Configure(options.Proxy, options.RequestTimeout, httpClient);
}
@ -114,6 +106,255 @@ namespace CryptoExchange.Net.Clients
/// <returns></returns>
protected virtual IMessageSerializer CreateSerializer() => new JsonNetMessageSerializer();
/// <summary>
/// Send a request to the base address based on the request definition
/// </summary>
/// <param name="baseAddress">Host and schema</param>
/// <param name="definition">Request definition</param>
/// <param name="parameters">Request parameters</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="additionalHeaders">Additional headers for this request</param>
/// <param name="weight">Override the request weight for this request definition, for example when the weight depends on the parameters</param>
/// <returns></returns>
protected virtual async Task<WebCallResult> SendAsync(
string baseAddress,
RequestDefinition definition,
ParameterCollection? parameters,
CancellationToken cancellationToken,
Dictionary<string, string>? additionalHeaders = null,
int? weight = null)
{
var result = await SendAsync<object>(baseAddress, definition, parameters, cancellationToken, additionalHeaders, weight).ConfigureAwait(false);
return result.AsDataless();
}
/// <summary>
/// Send a request to the base address based on the request definition
/// </summary>
/// <typeparam name="T">Response type</typeparam>
/// <param name="baseAddress">Host and schema</param>
/// <param name="definition">Request definition</param>
/// <param name="parameters">Request parameters</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="additionalHeaders">Additional headers for this request</param>
/// <param name="weight">Override the request weight for this request definition, for example when the weight depends on the parameters</param>
/// <returns></returns>
protected virtual async Task<WebCallResult<T>> SendAsync<T>(
string baseAddress,
RequestDefinition definition,
ParameterCollection? parameters,
CancellationToken cancellationToken,
Dictionary<string, string>? additionalHeaders = null,
int? weight = null) where T : class
{
int currentTry = 0;
while (true)
{
currentTry++;
var prepareResult = await PrepareAsync(baseAddress, definition, parameters, cancellationToken, additionalHeaders, weight).ConfigureAwait(false);
if (!prepareResult)
return new WebCallResult<T>(prepareResult.Error!);
var request = CreateRequest(baseAddress, definition, parameters, additionalHeaders);
_logger.RestApiSendRequest(request.RequestId, definition, request.Content, request.Uri.Query, string.Join(", ", request.GetHeaders().Select(h => h.Key + $"=[{string.Join(",", h.Value)}]")));
TotalRequestsMade++;
var result = await GetResponseAsync<T>(request, definition.RateLimitGate, cancellationToken).ConfigureAwait(false);
if (!result)
_logger.RestApiErrorReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), result.Error?.ToString());
else
_logger.RestApiResponseReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), OutputOriginalData ? result.OriginalData : "[Data only available when OutputOriginal = true]");
if (await ShouldRetryRequestAsync(definition.RateLimitGate, result, currentTry).ConfigureAwait(false))
continue;
return result;
}
}
/// <summary>
/// Prepare before sending a request. Sync time between client and server and check rate limits
/// </summary>
/// <param name="baseAddress">Host and schema</param>
/// <param name="definition">Request definition</param>
/// <param name="parameters">Request parameters</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="additionalHeaders">Additional headers for this request</param>
/// <param name="weight">Override the request weight for this request</param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
protected virtual async Task<CallResult> PrepareAsync(
string baseAddress,
RequestDefinition definition,
ParameterCollection? parameters,
CancellationToken cancellationToken,
Dictionary<string, string>? additionalHeaders = null,
int? weight = null)
{
var requestId = ExchangeHelpers.NextId();
var requestWeight = weight ?? definition.Weight;
// Time sync
if (definition.Authenticated)
{
if (AuthenticationProvider == null)
{
_logger.RestApiNoApiCredentials(requestId, definition.Path);
return new CallResult<IRequest>(new NoApiCredentialsError());
}
var syncTask = SyncTimeAsync();
var timeSyncInfo = GetTimeSyncInfo();
if (timeSyncInfo != null && timeSyncInfo.TimeSyncState.LastSyncTime == default)
{
// Initially with first request we'll need to wait for the time syncing, if it's not the first request we can just continue
var syncTimeResult = await syncTask.ConfigureAwait(false);
if (!syncTimeResult)
{
_logger.RestApiFailedToSyncTime(requestId, syncTimeResult.Error!.ToString());
return syncTimeResult.AsDataless();
}
}
}
// Rate limiting
if (requestWeight != 0)
{
if (definition.RateLimitGate == null)
throw new Exception("Ratelimit gate not set when request weight is not 0");
if (ClientOptions.RatelimiterEnabled)
{
var limitResult = await definition.RateLimitGate.ProcessAsync(_logger, requestId, RateLimitItemType.Request, definition, baseAddress, ApiOptions.ApiCredentials?.Key ?? ClientOptions.ApiCredentials?.Key, requestWeight, ClientOptions.RateLimitingBehaviour, cancellationToken).ConfigureAwait(false);
if (!limitResult)
return new CallResult(limitResult.Error!);
}
}
// Endpoint specific rate limiting
if (definition.EndpointLimitCount != null && definition.EndpointLimitPeriod != null)
{
if (definition.RateLimitGate == null)
throw new Exception("Ratelimit gate not set when endpoint limit is specified");
if (ClientOptions.RatelimiterEnabled)
{
var limitResult = await definition.RateLimitGate.ProcessSingleAsync(_logger, requestId, RateLimitItemType.Request, definition, baseAddress, ApiOptions.ApiCredentials?.Key ?? ClientOptions.ApiCredentials?.Key, requestWeight, ClientOptions.RateLimitingBehaviour, cancellationToken).ConfigureAwait(false);
if (!limitResult)
return new CallResult(limitResult.Error!);
}
}
return new CallResult(null);
}
/// <summary>
/// Creates a request object
/// </summary>
/// <param name="baseAddress">Host and schema</param>
/// <param name="definition">Request definition</param>
/// <param name="parameters">The parameters of the request</param>
/// <param name="additionalHeaders">Additional headers to send with the request</param>
/// <returns></returns>
protected virtual IRequest CreateRequest(
string baseAddress,
RequestDefinition definition,
ParameterCollection? parameters,
Dictionary<string, string>? additionalHeaders)
{
parameters ??= new ParameterCollection();
var uri = new Uri(baseAddress.AppendPath(definition.Path));
var parameterPosition = definition.ParameterPosition ?? ParameterPositions[definition.Method];
var arraySerialization = definition.ArraySerialization ?? ArraySerialization;
var bodyFormat = definition.RequestBodyFormat ?? RequestBodyFormat;
var requestId = ExchangeHelpers.NextId();
for (var i = 0; i < parameters.Count; i++)
{
var kvp = parameters.ElementAt(i);
if (kvp.Value is Func<object> delegateValue)
parameters[kvp.Key] = delegateValue();
}
if (parameterPosition == HttpMethodParameterPosition.InUri)
{
foreach (var parameter in parameters)
uri = uri.AddQueryParmeter(parameter.Key, parameter.Value.ToString());
}
var headers = new Dictionary<string, string>();
var uriParameters = parameterPosition == HttpMethodParameterPosition.InUri ? new SortedDictionary<string, object>(parameters) : new SortedDictionary<string, object>();
var bodyParameters = parameterPosition == HttpMethodParameterPosition.InBody ? new SortedDictionary<string, object>(parameters) : new SortedDictionary<string, object>();
if (AuthenticationProvider != null)
{
try
{
AuthenticationProvider.AuthenticateRequest(
this,
uri,
definition.Method,
parameters,
definition.Authenticated,
arraySerialization,
parameterPosition,
bodyFormat,
out uriParameters,
out bodyParameters,
out headers);
}
catch (Exception ex)
{
throw new Exception("Failed to authenticate request, make sure your API credentials are correct", ex);
}
}
// Sanity check
foreach (var param in parameters)
{
if (!uriParameters.ContainsKey(param.Key) && !bodyParameters.ContainsKey(param.Key))
{
throw new Exception($"Missing parameter {param.Key} after authentication processing. AuthenticationProvider implementation " +
$"should return provided parameters in either the uri or body parameters output");
}
}
// Add the auth parameters to the uri, start with a new URI to be able to sort the parameters including the auth parameters
uri = uri.SetParameters(uriParameters, arraySerialization);
var request = RequestFactory.Create(definition.Method, uri, requestId);
request.Accept = Constants.JsonContentHeader;
foreach (var header in headers)
request.AddHeader(header.Key, header.Value);
if (additionalHeaders != null)
{
foreach (var header in additionalHeaders)
request.AddHeader(header.Key, header.Value);
}
if (StandardRequestHeaders != null)
{
foreach (var header in StandardRequestHeaders)
{
// Only add it if it isn't overwritten
if (additionalHeaders?.ContainsKey(header.Key) != true)
request.AddHeader(header.Key, header.Value);
}
}
if (parameterPosition == HttpMethodParameterPosition.InBody)
{
var contentType = bodyFormat == RequestBodyFormat.Json ? Constants.JsonContentHeader : Constants.FormContentHeader;
if (bodyParameters.Any())
WriteParamBody(request, bodyParameters, contentType);
else
request.SetContent(RequestBodyEmptyContent, contentType);
}
return request;
}
/// <summary>
/// Execute a request to the uri and returns if it was successful
/// </summary>
@ -127,7 +368,7 @@ namespace CryptoExchange.Net.Clients
/// <param name="arraySerialization">How array parameters should be serialized, overwrites the value set in the client</param>
/// <param name="requestWeight">Credits used for the request</param>
/// <param name="additionalHeaders">Additional headers to send with the request</param>
/// <param name="ignoreRatelimit">Ignore rate limits for this request</param>
/// <param name="gate">The ratelimit gate to use</param>
/// <returns></returns>
[return: NotNull]
protected virtual async Task<WebCallResult> SendRequestAsync(
@ -141,23 +382,23 @@ namespace CryptoExchange.Net.Clients
ArrayParametersSerialization? arraySerialization = null,
int requestWeight = 1,
Dictionary<string, string>? additionalHeaders = null,
bool ignoreRatelimit = false)
IRateLimitGate? gate = null)
{
int currentTry = 0;
while (true)
{
currentTry++;
var request = await PrepareRequestAsync(uri, method, cancellationToken, parameters, signed, requestBodyFormat, parameterPosition, arraySerialization, requestWeight, additionalHeaders, ignoreRatelimit).ConfigureAwait(false);
var request = await PrepareRequestAsync(uri, method, cancellationToken, parameters, signed, requestBodyFormat, parameterPosition, arraySerialization, requestWeight, additionalHeaders, gate).ConfigureAwait(false);
if (!request)
return new WebCallResult(request.Error!);
var result = await GetResponseAsync<object>(request.Data, cancellationToken).ConfigureAwait(false);
var result = await GetResponseAsync<object>(request.Data, gate, cancellationToken).ConfigureAwait(false);
if (!result)
_logger.RestApiErrorReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), result.Error?.ToString());
else
_logger.RestApiResponseReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), OutputOriginalData ? result.OriginalData : "[Data only available when OutputOriginal = true]");
if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false))
if (await ShouldRetryRequestAsync(gate, result, currentTry).ConfigureAwait(false))
continue;
return result.AsDataless();
@ -178,7 +419,7 @@ namespace CryptoExchange.Net.Clients
/// <param name="arraySerialization">How array parameters should be serialized, overwrites the value set in the client</param>
/// <param name="requestWeight">Credits used for the request</param>
/// <param name="additionalHeaders">Additional headers to send with the request</param>
/// <param name="ignoreRatelimit">Ignore rate limits for this request</param>
/// <param name="gate">The ratelimit gate to use</param>
/// <returns></returns>
[return: NotNull]
protected virtual async Task<WebCallResult<T>> SendRequestAsync<T>(
@ -192,24 +433,24 @@ namespace CryptoExchange.Net.Clients
ArrayParametersSerialization? arraySerialization = null,
int requestWeight = 1,
Dictionary<string, string>? additionalHeaders = null,
bool ignoreRatelimit = false
IRateLimitGate? gate = null
) where T : class
{
int currentTry = 0;
while (true)
{
currentTry++;
var request = await PrepareRequestAsync(uri, method, cancellationToken, parameters, signed, requestBodyFormat, parameterPosition, arraySerialization, requestWeight, additionalHeaders, ignoreRatelimit).ConfigureAwait(false);
var request = await PrepareRequestAsync(uri, method, cancellationToken, parameters, signed, requestBodyFormat, parameterPosition, arraySerialization, requestWeight, additionalHeaders, gate).ConfigureAwait(false);
if (!request)
return new WebCallResult<T>(request.Error!);
var result = await GetResponseAsync<T>(request.Data, cancellationToken).ConfigureAwait(false);
var result = await GetResponseAsync<T>(request.Data, gate, cancellationToken).ConfigureAwait(false);
if (!result)
_logger.RestApiErrorReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), result.Error?.ToString());
else
_logger.RestApiResponseReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), OutputOriginalData ? result.OriginalData : "[Data only available when OutputOriginal = true]");
if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false))
if (await ShouldRetryRequestAsync(gate, result, currentTry).ConfigureAwait(false))
continue;
return result;
@ -229,7 +470,7 @@ namespace CryptoExchange.Net.Clients
/// <param name="arraySerialization">How array parameters should be serialized, overwrites the value set in the client</param>
/// <param name="requestWeight">Credits used for the request</param>
/// <param name="additionalHeaders">Additional headers to send with the request</param>
/// <param name="ignoreRatelimit">Ignore rate limits for this request</param>
/// <param name="gate">The rate limit gate to use</param>
/// <returns></returns>
protected virtual async Task<CallResult<IRequest>> PrepareRequestAsync(
Uri uri,
@ -242,12 +483,18 @@ namespace CryptoExchange.Net.Clients
ArrayParametersSerialization? arraySerialization = null,
int requestWeight = 1,
Dictionary<string, string>? additionalHeaders = null,
bool ignoreRatelimit = false)
IRateLimitGate? gate = null)
{
var requestId = ExchangeHelpers.NextId();
if (signed)
{
if (AuthenticationProvider == null)
{
_logger.RestApiNoApiCredentials(requestId, uri.AbsolutePath);
return new CallResult<IRequest>(new NoApiCredentialsError());
}
var syncTask = SyncTimeAsync();
var timeSyncInfo = GetTimeSyncInfo();
@ -262,23 +509,20 @@ namespace CryptoExchange.Net.Clients
}
}
}
if (!ignoreRatelimit)
if (requestWeight != 0)
{
foreach (var limiter in RateLimiters)
if (gate == null)
throw new Exception("Ratelimit gate not set when request weight is not 0");
if (ClientOptions.RatelimiterEnabled)
{
var limitResult = await limiter.LimitRequestAsync(_logger, uri.AbsolutePath, method, signed, ApiOptions.ApiCredentials?.Key ?? ClientOptions.ApiCredentials?.Key, ApiOptions.RateLimitingBehaviour, requestWeight, cancellationToken).ConfigureAwait(false);
if (!limitResult.Success)
var limitResult = await gate.ProcessAsync(_logger, requestId, RateLimitItemType.Request, new RequestDefinition(uri.AbsolutePath.TrimStart('/'), method) { Authenticated = signed }, uri.Host, ApiOptions.ApiCredentials?.Key ?? ClientOptions.ApiCredentials?.Key, requestWeight, ClientOptions.RateLimitingBehaviour, cancellationToken).ConfigureAwait(false);
if (!limitResult)
return new CallResult<IRequest>(limitResult.Error!);
}
}
if (signed && AuthenticationProvider == null)
{
_logger.RestApiNoApiCredentials(requestId, uri.AbsolutePath);
return new CallResult<IRequest>(new NoApiCredentialsError());
}
_logger.RestApiCreatingRequest(requestId, uri);
var paramsPosition = parameterPosition ?? ParameterPositions[method];
var request = ConstructRequest(uri, method, parameters?.OrderBy(p => p.Key).ToDictionary(p => p.Key, p => p.Value), signed, paramsPosition, arraySerialization ?? ArraySerialization, requestBodyFormat ?? RequestBodyFormat, requestId, additionalHeaders);
@ -300,10 +544,12 @@ namespace CryptoExchange.Net.Clients
/// Executes the request and returns the result deserialized into the type parameter class
/// </summary>
/// <param name="request">The request object to execute</param>
/// <param name="gate">The ratelimit gate used</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
protected virtual async Task<WebCallResult<T>> GetResponseAsync<T>(
IRequest request,
IRateLimitGate? gate,
CancellationToken cancellationToken)
{
var sw = Stopwatch.StartNew();
@ -328,7 +574,16 @@ namespace CryptoExchange.Net.Clients
Error error;
if (response.StatusCode == (HttpStatusCode)418 || response.StatusCode == (HttpStatusCode)429)
error = ParseRateLimitResponse((int)response.StatusCode, response.ResponseHeaders, accessor);
{
var rateError = ParseRateLimitResponse((int)response.StatusCode, response.ResponseHeaders, accessor);
if (rateError.RetryAfter != null && gate != null && ClientOptions.RatelimiterEnabled)
{
_logger.RestApiRateLimitPauseUntil(request.RequestId, rateError.RetryAfter.Value);
await gate.SetRetryAfterGuardAsync(rateError.RetryAfter.Value).ConfigureAwait(false);
}
error = rateError;
}
else
error = ParseErrorResponse((int)response.StatusCode, response.ResponseHeaders, accessor);
@ -400,10 +655,34 @@ namespace CryptoExchange.Net.Clients
/// Note that this is always called; even when the request might be successful
/// </summary>
/// <typeparam name="T">WebCallResult type parameter</typeparam>
/// <param name="gate">The rate limit gate the call used</param>
/// <param name="callResult">The result of the call</param>
/// <param name="tries">The current try number</param>
/// <returns>True if call should retry, false if the call should return</returns>
protected virtual Task<bool> ShouldRetryRequestAsync<T>(WebCallResult<T> callResult, int tries) => Task.FromResult(false);
protected virtual async Task<bool> ShouldRetryRequestAsync<T>(IRateLimitGate? gate, WebCallResult<T> callResult, int tries)
{
if (tries >= 2)
// Only retry once
return false;
if ((int?)callResult.ResponseStatusCode == 429
&& ClientOptions.RatelimiterEnabled
&& ClientOptions.RateLimitingBehaviour != RateLimitingBehaviour.Fail
&& gate != null)
{
var retryTime = await gate.GetRetryAfterTime().ConfigureAwait(false);
if (retryTime == null)
return false;
if (retryTime.Value - DateTime.UtcNow < TimeSpan.FromSeconds(60))
{
_logger.RestApiRateLimitRetry(callResult.RequestId!.Value, retryTime.Value);
return true;
}
}
return false;
}
/// <summary>
/// Creates a request object
@ -559,7 +838,7 @@ namespace CryptoExchange.Net.Clients
/// <param name="responseHeaders">The response headers</param>
/// <param name="accessor">Data accessor</param>
/// <returns></returns>
protected virtual Error ParseRateLimitResponse(int httpStatusCode, IEnumerable<KeyValuePair<string, IEnumerable<string>>> responseHeaders, IMessageAccessor accessor)
protected virtual ServerRateLimitError ParseRateLimitResponse(int httpStatusCode, IEnumerable<KeyValuePair<string, IEnumerable<string>>> responseHeaders, IMessageAccessor accessor)
{
var message = accessor.OriginalDataAvailable ? accessor.GetOriginalString() : "[Error response content only available when OutputOriginal = true in client options]";

View File

@ -4,6 +4,7 @@ using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.RateLimiting.Interfaces;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using System;
@ -59,7 +60,7 @@ namespace CryptoExchange.Net.Clients
/// <summary>
/// The rate limiters
/// </summary>
protected internal IEnumerable<IRateLimiter>? RateLimiters { get; set; }
protected internal IRateLimitGate? RateLimiter { get; set; }
/// <summary>
/// The max size a websocket message size can be
@ -67,7 +68,7 @@ namespace CryptoExchange.Net.Clients
protected internal int? MessageSendSizeLimit { get; set; }
/// <summary>
/// Periodic task regisrations
/// Periodic task registrations
/// </summary>
protected List<PeriodicTaskRegistration> PeriodicTaskRegistrations { get; set; } = new List<PeriodicTaskRegistration>();
@ -121,10 +122,6 @@ namespace CryptoExchange.Net.Clients
options,
apiOptions)
{
var rateLimiters = new List<IRateLimiter>();
foreach (var rateLimiter in apiOptions.RateLimiters)
rateLimiters.Add(rateLimiter);
RateLimiters = rateLimiters;
}
/// <summary>
@ -344,20 +341,20 @@ namespace CryptoExchange.Net.Clients
/// <param name="socket">The connection to check</param>
/// <param name="authenticated">Whether the socket should authenticated</param>
/// <returns></returns>
protected virtual async Task<CallResult<bool>> ConnectIfNeededAsync(SocketConnection socket, bool authenticated)
protected virtual async Task<CallResult> ConnectIfNeededAsync(SocketConnection socket, bool authenticated)
{
if (socket.Connected)
return new CallResult<bool>(true);
return new CallResult(null);
var connectResult = await ConnectSocketAsync(socket).ConfigureAwait(false);
if (!connectResult)
return new CallResult<bool>(connectResult.Error!);
return connectResult;
if (ClientOptions.DelayAfterConnect != TimeSpan.Zero)
await Task.Delay(ClientOptions.DelayAfterConnect).ConfigureAwait(false);
if (!authenticated || socket.Authenticated)
return new CallResult<bool>(true);
return new CallResult(null);
return await AuthenticateSocketAsync(socket).ConfigureAwait(false);
}
@ -367,10 +364,10 @@ namespace CryptoExchange.Net.Clients
/// </summary>
/// <param name="socket">Socket to authenticate</param>
/// <returns></returns>
public virtual async Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnection socket)
public virtual async Task<CallResult> AuthenticateSocketAsync(SocketConnection socket)
{
if (AuthenticationProvider == null)
return new CallResult<bool>(new NoApiCredentialsError());
return new CallResult(new NoApiCredentialsError());
_logger.AttemptingToAuthenticate(socket.SocketId);
var authRequest = GetAuthenticationRequest();
@ -385,13 +382,13 @@ namespace CryptoExchange.Net.Clients
await socket.CloseAsync().ConfigureAwait(false);
result.Error!.Message = "Authentication failed: " + result.Error.Message;
return new CallResult<bool>(result.Error)!;
return new CallResult(result.Error)!;
}
}
_logger.Authenticated(socket.SocketId);
socket.Authenticated = true;
return new CallResult<bool>(true);
return new CallResult(null);
}
/// <summary>
@ -499,16 +496,17 @@ namespace CryptoExchange.Net.Clients
/// </summary>
/// <param name="socketConnection">The socket to connect</param>
/// <returns></returns>
protected virtual async Task<CallResult<bool>> ConnectSocketAsync(SocketConnection socketConnection)
protected virtual async Task<CallResult> ConnectSocketAsync(SocketConnection socketConnection)
{
if (await socketConnection.ConnectAsync().ConfigureAwait(false))
var connectResult = await socketConnection.ConnectAsync().ConfigureAwait(false);
if (connectResult)
{
socketConnections.TryAdd(socketConnection.SocketId, socketConnection);
return new CallResult<bool>(true);
return connectResult;
}
socketConnection.Dispose();
return new CallResult<bool>(new CantConnectError());
return connectResult;
}
/// <summary>
@ -521,7 +519,8 @@ namespace CryptoExchange.Net.Clients
{
KeepAliveInterval = KeepAliveInterval,
ReconnectInterval = ClientOptions.ReconnectInterval,
RateLimiters = RateLimiters,
RateLimiter = ClientOptions.RatelimiterEnabled ? RateLimiter : null,
RateLimitingBehaviour = ClientOptions.RateLimitingBehaviour,
Proxy = ClientOptions.Proxy,
Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout
};

View File

@ -1,6 +1,4 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Converters
{

View File

@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Compression;
using System.IO;
using System.Linq;
@ -9,12 +8,7 @@ using System.Security;
using System.Text;
using System.Web;
using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging;
using System.Globalization;
using System.Collections;
using System.Net.Http;
using System.Data.Common;
using Newtonsoft.Json.Linq;
namespace CryptoExchange.Net
{

View File

@ -1,8 +1,6 @@
using CryptoExchange.Net.Interfaces.CommonClients;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Interfaces
{

View File

@ -1,8 +1,4 @@
using CryptoExchange.Net.Interfaces.CommonClients;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
using System;
namespace CryptoExchange.Net.Interfaces
{

View File

@ -3,7 +3,6 @@ using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces
{

View File

@ -1,5 +1,5 @@
using System;
using System.IO;
using CryptoExchange.Net.Objects;
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;
@ -23,6 +23,10 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
event Func<int, Task> OnRequestSent;
/// <summary>
/// Websocket query was ratelimited and couldn't be send
/// </summary>
event Func<int, Task>? OnRequestRateLimited;
/// <summary>
/// Websocket error event
/// </summary>
event Func<Exception, Task> OnError;
@ -67,7 +71,7 @@ namespace CryptoExchange.Net.Interfaces
/// Connect the socket
/// </summary>
/// <returns></returns>
Task<bool> ConnectAsync();
Task<CallResult> ConnectAsync();
/// <summary>
/// Send data
/// </summary>

View File

@ -20,7 +20,6 @@ namespace CryptoExchange.Net.Logging.Extensions
private static readonly Action<ILogger, int, Exception?> _closed;
private static readonly Action<ILogger, int, Exception?> _disposing;
private static readonly Action<ILogger, int, Exception?> _disposed;
private static readonly Action<ILogger, int, int, int, Exception?> _sendDelayedBecauseOfRateLimit;
private static readonly Action<ILogger, int, int, int, Exception?> _sentBytes;
private static readonly Action<ILogger, int, string, Exception?> _sendLoopStoppedWithException;
private static readonly Action<ILogger, int, Exception?> _sendLoopFinished;
@ -74,7 +73,7 @@ namespace CryptoExchange.Net.Logging.Extensions
_addingBytesToSendBuffer = LoggerMessage.Define<int, int, int>(
LogLevel.Trace,
new EventId(1007, "AddingBytesToSendBuffer"),
"[Sckt {SocketId}] msg {RequestId} - Adding {NumBytes} bytes to send buffer");
"[Sckt {SocketId}] [Req {RequestId}] adding {NumBytes} bytes to send buffer");
_reconnectRequested = LoggerMessage.Define<int>(
LogLevel.Debug,
@ -111,15 +110,10 @@ namespace CryptoExchange.Net.Logging.Extensions
new EventId(1014, "Disposed"),
"[Sckt {SocketId}] disposed");
_sendDelayedBecauseOfRateLimit = LoggerMessage.Define<int, int, int>(
LogLevel.Debug,
new EventId(1015, "SendDelayedBecauseOfRateLimit"),
"[Sckt {SocketId}] msg {RequestId} - send delayed {DelayMS}ms because of rate limit");
_sentBytes = LoggerMessage.Define<int, int, int>(
LogLevel.Trace,
new EventId(1016, "SentBytes"),
"[Sckt {SocketId}] msg {RequestId} - sent {NumBytes} bytes");
"[Sckt {SocketId}] [Req {RequestId}] sent {NumBytes} bytes");
_sendLoopStoppedWithException = LoggerMessage.Define<int, string>(
LogLevel.Warning,
@ -267,12 +261,6 @@ namespace CryptoExchange.Net.Logging.Extensions
_disposed(logger, socketId, null);
}
public static void SocketSendDelayedBecauseOfRateLimit(
this ILogger logger, int socketId, int requestId, int delay)
{
_sendDelayedBecauseOfRateLimit(logger, socketId, requestId, delay, null);
}
public static void SocketSentBytes(
this ILogger logger, int socketId, int requestId, int numBytes)
{

View File

@ -0,0 +1,78 @@
using Microsoft.Extensions.Logging;
using System;
namespace CryptoExchange.Net.Logging.Extensions
{
internal static class RateLimitGateLoggingExtensions
{
private static readonly Action<ILogger, int, string, string, string, Exception?> _rateLimitRequestFailed;
private static readonly Action<ILogger, int, string, string, Exception?> _rateLimitConnectionFailed;
private static readonly Action<ILogger, int, string, TimeSpan, string, string, Exception?> _rateLimitDelayingRequest;
private static readonly Action<ILogger, int, TimeSpan, string, string, Exception?> _rateLimitDelayingConnection;
private static readonly Action<ILogger, int, string, string, string, int, Exception?> _rateLimitAppliedRequest;
private static readonly Action<ILogger, int, string, string, int, Exception?> _rateLimitAppliedConnection;
static RateLimitGateLoggingExtensions()
{
_rateLimitRequestFailed = LoggerMessage.Define<int, string, string, string>(
LogLevel.Warning,
new EventId(6000, "RateLimitRequestFailed"),
"[Req {Id}] Call to {Path} failed because of ratelimit guard {Guard}; {Limit}");
_rateLimitConnectionFailed = LoggerMessage.Define<int, string, string>(
LogLevel.Warning,
new EventId(6001, "RateLimitConnectionFailed"),
"[Sckt {Id}] Connection failed because of ratelimit guard {Guard}; {Limit}");
_rateLimitDelayingRequest = LoggerMessage.Define<int, string, TimeSpan, string, string>(
LogLevel.Warning,
new EventId(6002, "RateLimitDelayingRequest"),
"[Req {Id}] Delaying call to {Path} by {Delay} because of ratelimit guard {Guard}; {Limit}");
_rateLimitDelayingConnection = LoggerMessage.Define<int, TimeSpan, string, string>(
LogLevel.Warning,
new EventId(6003, "RateLimitDelayingConnection"),
"[Sckt {Id}] Delaying connection by {Delay} because of ratelimit guard {Guard}; {Limit}");
_rateLimitAppliedConnection = LoggerMessage.Define<int, string, string, int>(
LogLevel.Trace,
new EventId(6004, "RateLimitDelayingConnection"),
"[Sckt {Id}] Connection passed ratelimit guard {Guard}; {Limit}, New count: {Current}");
_rateLimitAppliedRequest = LoggerMessage.Define<int, string, string, string, int>(
LogLevel.Trace,
new EventId(6005, "RateLimitAppliedRequest"),
"[Req {Id}] Call to {Path} passed ratelimit guard {Guard}; {Limit}, New count: {Current}");
}
public static void RateLimitRequestFailed(this ILogger logger, int requestId, string path, string guard, string limit)
{
_rateLimitRequestFailed(logger, requestId, path, guard, limit, null);
}
public static void RateLimitConnectionFailed(this ILogger logger, int connectionId, string guard, string limit)
{
_rateLimitConnectionFailed(logger, connectionId, guard, limit, null);
}
public static void RateLimitDelayingRequest(this ILogger logger, int requestId, string path, TimeSpan delay, string guard, string limit)
{
_rateLimitDelayingRequest(logger, requestId, path, delay, guard, limit, null);
}
public static void RateLimitDelayingConnection(this ILogger logger, int connectionId, TimeSpan delay, string guard, string limit)
{
_rateLimitDelayingConnection(logger, connectionId, delay, guard, limit, null);
}
public static void RateLimitAppliedConnection(this ILogger logger, int connectionId, string guard, string limit, int current)
{
_rateLimitAppliedConnection(logger, connectionId, guard, limit, current, null);
}
public static void RateLimitAppliedRequest(this ILogger logger, int requestIdId, string path, string guard, string limit, int current)
{
_rateLimitAppliedRequest(logger, requestIdId, path, guard, limit, current, null);
}
}
}

View File

@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging;
using System;
using System.Net;
using System.Net.Http;
@ -13,6 +14,9 @@ namespace CryptoExchange.Net.Logging.Extensions
private static readonly Action<ILogger, int, string, Exception?> _restApiNoApiCredentials;
private static readonly Action<ILogger, int, Uri, Exception?> _restApiCreatingRequest;
private static readonly Action<ILogger, int, HttpMethod, string, Uri, string, Exception?> _restApiSendingRequest;
private static readonly Action<ILogger, int, DateTime, Exception?> _restApiRateLimitRetry;
private static readonly Action<ILogger, int, DateTime, Exception?> _restApiRateLimitPauseUntil;
private static readonly Action<ILogger, int, RequestDefinition, string?, string, string, Exception?> _restApiSendRequest;
static RestApiClientLoggingExtensions()
@ -46,6 +50,21 @@ namespace CryptoExchange.Net.Logging.Extensions
LogLevel.Trace,
new EventId(4005, "RestApiSendingRequest"),
"[Req {RequestId}] Sending {Method} {Signed} request to {RestApiUri}{Query}");
_restApiRateLimitRetry = LoggerMessage.Define<int, DateTime>(
LogLevel.Warning,
new EventId(4006, "RestApiRateLimitRetry"),
"[Req {RequestId}] Received ratelimit error, retrying after {Timestamp}");
_restApiRateLimitPauseUntil = LoggerMessage.Define<int, DateTime>(
LogLevel.Warning,
new EventId(4007, "RestApiRateLimitPauseUntil"),
"[Req {RequestId}] Ratelimit error from server, pausing requests until {Until}");
_restApiSendRequest = LoggerMessage.Define<int, RequestDefinition, string?, string, string>(
LogLevel.Debug,
new EventId(4008, "RestApiSendRequest"),
"[Req {RequestId}] Sending {Definition} request with body {Body}, query parameters {Query} and headers {Headers}");
}
public static void RestApiErrorReceived(this ILogger logger, int? requestId, HttpStatusCode? responseStatusCode, long responseTime, string? error)
@ -77,5 +96,20 @@ namespace CryptoExchange.Net.Logging.Extensions
{
_restApiSendingRequest(logger, requestId, method, signed, uri, paramString, null);
}
public static void RestApiRateLimitRetry(this ILogger logger, int requestId, DateTime retryAfter)
{
_restApiRateLimitRetry(logger, requestId, retryAfter, null);
}
public static void RestApiRateLimitPauseUntil(this ILogger logger, int requestId, DateTime retryAfter)
{
_restApiRateLimitPauseUntil(logger, requestId, retryAfter, null);
}
public static void RestApiSendRequest(this ILogger logger, int requestId, RequestDefinition definition, string? body, string query, string headers)
{
_restApiSendRequest(logger, requestId, definition, body, query, headers, null);
}
}
}

View File

@ -1,6 +1,5 @@
using System;
using System.Net.WebSockets;
using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging;
namespace CryptoExchange.Net.Logging.Extensions
@ -73,7 +72,7 @@ namespace CryptoExchange.Net.Logging.Extensions
_messageSentNotPending = LoggerMessage.Define<int, int>(
LogLevel.Debug,
new EventId(2006, "MessageSentNotPending"),
"[Sckt {SocketId}] msg {RequestId} - message sent, but not pending");
"[Sckt {SocketId}] [Req {RequestId}] message sent, but not pending");
_receivedData = LoggerMessage.Define<int, string>(
LogLevel.Trace,
@ -178,12 +177,12 @@ namespace CryptoExchange.Net.Logging.Extensions
_periodicSendFailed = LoggerMessage.Define<int, string, string>(
LogLevel.Warning,
new EventId(2027, "PeriodicSendFailed"),
"[Sckt {SocketId}] Periodic send {Identifier} failed: {ErrorMessage}");
"[Sckt {SocketId}] periodic send {Identifier} failed: {ErrorMessage}");
_sendingData = LoggerMessage.Define<int, int, string>(
LogLevel.Trace,
new EventId(2028, "SendingData"),
"[Sckt {SocketId}] msg {RequestId} - sending messsage: {Data}");
"[Sckt {SocketId}] [Req {RequestId}] sending messsage: {Data}");
_receivedMessageNotMatchedToAnyListener = LoggerMessage.Define<int, string, string>(
LogLevel.Warning,

View File

@ -15,6 +15,29 @@
Wait
}
/// <summary>
/// What to do when a request would exceed the rate limit
/// </summary>
public enum RateLimitWindowType
{
/// <summary>
/// A sliding window
/// </summary>
Sliding,
/// <summary>
/// A fixed interval window
/// </summary>
Fixed,
/// <summary>
/// A fixed interval starting after the first request
/// </summary>
FixedAfterFirst,
/// <summary>
/// Decaying window
/// </summary>
Decay
}
/// <summary>
/// Where the parameters for a HttpMethod should be added in a request
/// </summary>

View File

@ -28,6 +28,15 @@ namespace CryptoExchange.Net.Objects.Options
/// </summary>
public ApiCredentials? ApiCredentials { get; set; }
/// <summary>
/// Whether or not client side rate limiting should be applied
/// </summary>
public bool RatelimiterEnabled { get; set; } = true;
/// <summary>
/// What should happen when a rate limit is reached
/// </summary>
public RateLimitingBehaviour RateLimitingBehaviour { get; set; } = RateLimitingBehaviour.Wait;
/// <inheritdoc />
public override string ToString()
{

View File

@ -1,7 +1,5 @@
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Interfaces;
using System;
using System.Collections.Generic;
namespace CryptoExchange.Net.Objects.Options
{
@ -10,16 +8,6 @@ namespace CryptoExchange.Net.Objects.Options
/// </summary>
public class RestApiOptions : ApiOptions
{
/// <summary>
/// List of rate limiters to use
/// </summary>
public List<IRateLimiter> RateLimiters { get; set; } = new List<IRateLimiter>();
/// <summary>
/// What to do when a call would exceed the rate limit
/// </summary>
public RateLimitingBehaviour RateLimitingBehaviour { get; set; } = RateLimitingBehaviour.Wait;
/// <summary>
/// Whether or not to automatically sync the local time with the server time
/// </summary>
@ -42,8 +30,6 @@ namespace CryptoExchange.Net.Objects.Options
ApiCredentials = ApiCredentials?.Copy(),
OutputOriginalData = OutputOriginalData,
AutoTimestamp = AutoTimestamp,
RateLimiters = RateLimiters,
RateLimitingBehaviour = RateLimitingBehaviour,
TimestampRecalculationInterval = TimestampRecalculationInterval
};
}

View File

@ -32,7 +32,9 @@ namespace CryptoExchange.Net.Objects.Options
TimestampRecalculationInterval = TimestampRecalculationInterval,
ApiCredentials = ApiCredentials?.Copy(),
Proxy = Proxy,
RequestTimeout = RequestTimeout
RequestTimeout = RequestTimeout,
RatelimiterEnabled = RatelimiterEnabled,
RateLimitingBehaviour = RateLimitingBehaviour
};
}
}

View File

@ -1,7 +1,5 @@
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Interfaces;
using System;
using System.Collections.Generic;
namespace CryptoExchange.Net.Objects.Options
{
@ -10,11 +8,6 @@ namespace CryptoExchange.Net.Objects.Options
/// </summary>
public class SocketApiOptions : ApiOptions
{
/// <summary>
/// List of rate limiters to use
/// </summary>
public List<IRateLimiter> RateLimiters { get; set; } = new List<IRateLimiter>();
/// <summary>
/// The max time of not receiving any data after which the connection is assumed to be dropped. This can only be used for socket connections where a steady flow of data is expected,
/// for example when the server sends intermittent ping requests
@ -37,7 +30,6 @@ namespace CryptoExchange.Net.Objects.Options
{
ApiCredentials = ApiCredentials?.Copy(),
OutputOriginalData = OutputOriginalData,
RateLimiters = RateLimiters,
SocketNoDataTimeout = SocketNoDataTimeout,
MaxSocketConnections = MaxSocketConnections,
};

View File

@ -65,7 +65,9 @@ namespace CryptoExchange.Net.Objects.Options
SocketSubscriptionsCombineTarget = SocketSubscriptionsCombineTarget,
MaxSocketConnections = MaxSocketConnections,
Proxy = Proxy,
RequestTimeout = RequestTimeout
RequestTimeout = RequestTimeout,
RateLimitingBehaviour = RateLimitingBehaviour,
RatelimiterEnabled = RatelimiterEnabled,
};
}
}

View File

@ -1,5 +1,4 @@
using CryptoExchange.Net.Attributes;
using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Converters.SystemTextJson;
using System;
using System.Collections.Generic;

View File

@ -1,443 +0,0 @@
using CryptoExchange.Net.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects
{
/// <summary>
/// Limits the amount of requests to a certain constraint
/// </summary>
public class RateLimiter : IRateLimiter
{
private readonly object _limiterLock = new object();
internal List<Limiter> _limiters = new List<Limiter>();
/// <summary>
/// Create a new RateLimiter. Configure the rate limiter by calling <see cref="AddTotalRateLimit"/>,
/// <see cref="AddEndpointLimit(string, int, TimeSpan, HttpMethod?, bool)"/>, <see cref="AddPartialEndpointLimit(string, int, TimeSpan, HttpMethod?, bool, bool)"/> or <see cref="AddApiKeyLimit"/>.
/// </summary>
public RateLimiter()
{
}
/// <summary>
/// Add a rate limit for the total amount of requests per time period
/// </summary>
/// <param name="limit">The limit per period. Note that this is weight, not single request, altough by default requests have a weight of 1</param>
/// <param name="perTimePeriod">The time period the limit is for</param>
public RateLimiter AddTotalRateLimit(int limit, TimeSpan perTimePeriod)
{
lock(_limiterLock)
_limiters.Add(new TotalRateLimiter(limit, perTimePeriod, null));
return this;
}
/// <summary>
/// Add a rate lmit for the amount of requests per time for an endpoint
/// </summary>
/// <param name="endpoint">The endpoint the limit is for</param>
/// <param name="limit">The limit per period. Note that this is weight, not single request, altough by default requests have a weight of 1</param>
/// <param name="perTimePeriod">The time period the limit is for</param>
/// <param name="method">The HttpMethod the limit is for, null for all</param>
/// <param name="excludeFromOtherRateLimits">If set to true it ignores other rate limits</param>
public RateLimiter AddEndpointLimit(string endpoint, int limit, TimeSpan perTimePeriod, HttpMethod? method = null, bool excludeFromOtherRateLimits = false)
{
lock(_limiterLock)
_limiters.Add(new EndpointRateLimiter(new[] { endpoint }, limit, perTimePeriod, method, excludeFromOtherRateLimits));
return this;
}
/// <summary>
/// Add a rate lmit for the amount of requests per time for an endpoint
/// </summary>
/// <param name="endpoints">The endpoints the limit is for</param>
/// <param name="limit">The limit per period. Note that this is weight, not single request, altough by default requests have a weight of 1</param>
/// <param name="perTimePeriod">The time period the limit is for</param>
/// <param name="method">The HttpMethod the limit is for, null for all</param>
/// <param name="excludeFromOtherRateLimits">If set to true it ignores other rate limits</param>
public RateLimiter AddEndpointLimit(IEnumerable<string> endpoints, int limit, TimeSpan perTimePeriod, HttpMethod? method = null, bool excludeFromOtherRateLimits = false)
{
lock(_limiterLock)
_limiters.Add(new EndpointRateLimiter(endpoints.ToArray(), limit, perTimePeriod, method, excludeFromOtherRateLimits));
return this;
}
/// <summary>
/// Add a rate lmit for the amount of requests per time for an endpoint
/// </summary>
/// <param name="endpoint">The endpoint the limit is for</param>
/// <param name="limit">The limit per period. Note that this is weight, not single request, altough by default requests have a weight of 1</param>
/// <param name="perTimePeriod">The time period the limit is for</param>
/// <param name="method">The HttpMethod the limit is for, null for all</param>
/// <param name="ignoreOtherRateLimits">If set to true it ignores other rate limits</param>
/// <param name="countPerEndpoint">Whether all requests for this partial endpoint are bound to the same limit or each individual endpoint has its own limit</param>
public RateLimiter AddPartialEndpointLimit(string endpoint, int limit, TimeSpan perTimePeriod, HttpMethod? method = null, bool countPerEndpoint = false, bool ignoreOtherRateLimits = false)
{
lock(_limiterLock)
_limiters.Add(new PartialEndpointRateLimiter(new[] { endpoint }, limit, perTimePeriod, method, ignoreOtherRateLimits, countPerEndpoint));
return this;
}
/// <summary>
/// Add a rate limit for the amount of requests per Api key
/// </summary>
/// <param name="limit">The limit per period. Note that this is weight, not single request, altough by default requests have a weight of 1</param>
/// <param name="perTimePeriod">The time period the limit is for</param>
/// <param name="onlyForSignedRequests">Only include calls that are signed in this limiter</param>
/// <param name="excludeFromTotalRateLimit">Exclude requests with API key from the total rate limiter</param>
public RateLimiter AddApiKeyLimit(int limit, TimeSpan perTimePeriod, bool onlyForSignedRequests, bool excludeFromTotalRateLimit)
{
lock(_limiterLock)
_limiters.Add(new ApiKeyRateLimiter(limit, perTimePeriod, null, onlyForSignedRequests, excludeFromTotalRateLimit));
return this;
}
/// <summary>
/// Add a rate limit for the amount of messages that can be send per connection
/// </summary>
/// <param name="endpoint">The endpoint that the limit is for</param>
/// <param name="limit">The limit per period. Note that this is weight, not single request, altough by default requests have a weight of 1</param>
/// <param name="perTimePeriod">The time period the limit is for</param>
public RateLimiter AddConnectionRateLimit(string endpoint, int limit, TimeSpan perTimePeriod)
{
lock (_limiterLock)
_limiters.Add(new ConnectionRateLimiter(new[] { endpoint }, limit, perTimePeriod));
return this;
}
/// <inheritdoc />
public async Task<CallResult<int>> LimitRequestAsync(ILogger logger, string endpoint, HttpMethod method, bool signed, SecureString? apiKey, RateLimitingBehaviour limitBehaviour, int requestWeight, CancellationToken ct)
{
int totalWaitTime = 0;
List<EndpointRateLimiter> endpointLimits;
lock (_limiterLock)
endpointLimits = _limiters.OfType<EndpointRateLimiter>().Where(h => h.Endpoints.Contains(endpoint) && (h.Method == null || h.Method == method)).ToList();
foreach (var endpointLimit in endpointLimits)
{
var waitResult = await ProcessTopic(logger, endpointLimit, endpoint, requestWeight, limitBehaviour, ct).ConfigureAwait(false);
if (!waitResult)
return waitResult;
totalWaitTime += waitResult.Data;
}
if (endpointLimits.Any(l => l.IgnoreOtherRateLimits))
return new CallResult<int>(totalWaitTime);
List<PartialEndpointRateLimiter> partialEndpointLimits;
lock (_limiterLock)
partialEndpointLimits = _limiters.OfType<PartialEndpointRateLimiter>().Where(h => h.PartialEndpoints.Any(h => endpoint.Contains(h)) && (h.Method == null || h.Method == method)).ToList();
foreach (var partialEndpointLimit in partialEndpointLimits)
{
if (partialEndpointLimit.CountPerEndpoint)
{
SingleTopicRateLimiter? thisEndpointLimit;
lock (_limiterLock)
{
thisEndpointLimit = _limiters.OfType<SingleTopicRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.PartialEndpoint && (string)h.Topic == endpoint);
if (thisEndpointLimit == null)
{
thisEndpointLimit = new SingleTopicRateLimiter(endpoint, partialEndpointLimit);
_limiters.Add(thisEndpointLimit);
}
}
var waitResult = await ProcessTopic(logger, thisEndpointLimit, endpoint, requestWeight, limitBehaviour, ct).ConfigureAwait(false);
if (!waitResult)
return waitResult;
totalWaitTime += waitResult.Data;
}
else
{
var waitResult = await ProcessTopic(logger, partialEndpointLimit, endpoint, requestWeight, limitBehaviour, ct).ConfigureAwait(false);
if (!waitResult)
return waitResult;
totalWaitTime += waitResult.Data;
}
}
if(partialEndpointLimits.Any(p => p.IgnoreOtherRateLimits))
return new CallResult<int>(totalWaitTime);
List<ApiKeyRateLimiter> apiLimits;
lock (_limiterLock)
apiLimits = _limiters.OfType<ApiKeyRateLimiter>().Where(h => h.Type == RateLimitType.ApiKey).ToList();
foreach (var apiLimit in apiLimits)
{
if(apiKey == null)
{
if (!apiLimit.OnlyForSignedRequests)
{
var waitResult = await ProcessTopic(logger, apiLimit, endpoint, requestWeight, limitBehaviour, ct).ConfigureAwait(false);
if (!waitResult)
return waitResult;
totalWaitTime += waitResult.Data;
}
}
else if (signed || !apiLimit.OnlyForSignedRequests)
{
SingleTopicRateLimiter? thisApiLimit;
lock (_limiterLock)
{
thisApiLimit = _limiters.OfType<SingleTopicRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.ApiKey && ((SecureString)h.Topic).IsEqualTo(apiKey));
if (thisApiLimit == null)
{
thisApiLimit = new SingleTopicRateLimiter(apiKey, apiLimit);
_limiters.Add(thisApiLimit);
}
}
var waitResult = await ProcessTopic(logger, thisApiLimit, endpoint, requestWeight, limitBehaviour, ct).ConfigureAwait(false);
if (!waitResult)
return waitResult;
totalWaitTime += waitResult.Data;
}
}
if ((signed || apiLimits.All(l => !l.OnlyForSignedRequests)) && apiLimits.Any(l => l.IgnoreTotalRateLimit))
return new CallResult<int>(totalWaitTime);
List<TotalRateLimiter> totalLimits;
lock (_limiterLock)
totalLimits = _limiters.OfType<TotalRateLimiter>().ToList();
foreach(var totalLimit in totalLimits)
{
var waitResult = await ProcessTopic(logger, totalLimit, endpoint, requestWeight, limitBehaviour, ct).ConfigureAwait(false);
if (!waitResult)
return waitResult;
totalWaitTime += waitResult.Data;
}
return new CallResult<int>(totalWaitTime);
}
private static async Task<CallResult<int>> ProcessTopic(ILogger logger, Limiter historyTopic, string endpoint, int requestWeight, RateLimitingBehaviour limitBehaviour, CancellationToken ct)
{
var sw = Stopwatch.StartNew();
try
{
await historyTopic.Semaphore.WaitAsync(ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return new CallResult<int>(new CancellationRequestedError());
}
sw.Stop();
try
{
int totalWaitTime = 0;
while (true)
{
// Remove requests no longer in time period from the history
var checkTime = DateTime.UtcNow;
for (var i = 0; i < historyTopic.Entries.Count; i++)
{
if (historyTopic.Entries[i].Timestamp < checkTime - historyTopic.Period)
{
historyTopic.Entries.Remove(historyTopic.Entries[i]);
i--;
}
else
break;
}
var currentWeight = !historyTopic.Entries.Any() ? 0 : historyTopic.Entries.Sum(h => h.Weight);
if (currentWeight + requestWeight > historyTopic.Limit)
{
if (currentWeight == 0)
throw new Exception("Request limit reached without any prior request. " +
$"This request can never execute with the current rate limiter. Request weight: {requestWeight}, Ratelimit: {historyTopic.Limit}");
// Wait until the next entry should be removed from the history
var thisWaitTime = (int)Math.Round(((historyTopic.Entries.First().Timestamp + historyTopic.Period) - checkTime).TotalMilliseconds);
if (thisWaitTime > 0)
{
if (limitBehaviour == RateLimitingBehaviour.Fail)
{
var msg = $"Request to {endpoint} failed because of rate limit `{historyTopic.Type}`. Current weight: {currentWeight}/{historyTopic.Limit}, request weight: {requestWeight}";
logger.Log(LogLevel.Warning, msg);
return new CallResult<int>(new ClientRateLimitError(msg) { RetryAfter = DateTime.UtcNow.AddSeconds(thisWaitTime) });
}
logger.Log(LogLevel.Information, $"Message to {endpoint} waiting {thisWaitTime}ms for rate limit `{historyTopic.Type}`. Current weight: {currentWeight}/{historyTopic.Limit}, request weight: {requestWeight}");
try
{
await Task.Delay(thisWaitTime, ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return new CallResult<int>(new CancellationRequestedError());
}
totalWaitTime += thisWaitTime;
}
}
else
{
break;
}
}
var newTime = DateTime.UtcNow;
historyTopic.Entries.Add(new LimitEntry(newTime, requestWeight));
return new CallResult<int>(totalWaitTime);
}
finally
{
historyTopic.Semaphore.Release();
}
}
internal struct LimitEntry
{
public DateTime Timestamp { get; set; }
public int Weight { get; set; }
public LimitEntry(DateTime timestamp, int weight)
{
Timestamp = timestamp;
Weight = weight;
}
}
internal class Limiter
{
public RateLimitType Type { get; set; }
public HttpMethod? Method { get; set; }
public SemaphoreSlim Semaphore { get; set; }
public int Limit { get; set; }
public TimeSpan Period { get; set; }
public List<LimitEntry> Entries { get; set; } = new List<LimitEntry>();
public Limiter(RateLimitType type, int limit, TimeSpan perPeriod, HttpMethod? method)
{
Semaphore = new SemaphoreSlim(1, 1);
Type = type;
Limit = limit;
Period = perPeriod;
Method = method;
}
}
internal class TotalRateLimiter : Limiter
{
public TotalRateLimiter(int limit, TimeSpan perPeriod, HttpMethod? method)
: base(RateLimitType.Total, limit, perPeriod, method)
{
}
public override string ToString()
{
return nameof(TotalRateLimiter);
}
}
internal class ConnectionRateLimiter : PartialEndpointRateLimiter
{
public ConnectionRateLimiter(int limit, TimeSpan perPeriod)
: base(new[] { "/" }, limit, perPeriod, null, true, true)
{
}
public ConnectionRateLimiter(string[] endpoints, int limit, TimeSpan perPeriod)
: base(endpoints, limit, perPeriod, null, true, true)
{
}
public override string ToString()
{
return nameof(ConnectionRateLimiter);
}
}
internal class EndpointRateLimiter: Limiter
{
public string[] Endpoints { get; set; }
public bool IgnoreOtherRateLimits { get; set; }
public EndpointRateLimiter(string[] endpoints, int limit, TimeSpan perPeriod, HttpMethod? method, bool ignoreOtherRateLimits)
:base(RateLimitType.Endpoint, limit, perPeriod, method)
{
Endpoints = endpoints;
IgnoreOtherRateLimits = ignoreOtherRateLimits;
}
public override string ToString()
{
return nameof(EndpointRateLimiter) + $": {string.Join(", ", Endpoints)}";
}
}
internal class PartialEndpointRateLimiter : Limiter
{
public string[] PartialEndpoints { get; set; }
public bool IgnoreOtherRateLimits { get; set; }
public bool CountPerEndpoint { get; set; }
public PartialEndpointRateLimiter(string[] partialEndpoints, int limit, TimeSpan perPeriod, HttpMethod? method, bool ignoreOtherRateLimits, bool countPerEndpoint)
: base(RateLimitType.PartialEndpoint, limit, perPeriod, method)
{
PartialEndpoints = partialEndpoints;
IgnoreOtherRateLimits = ignoreOtherRateLimits;
CountPerEndpoint = countPerEndpoint;
}
public override string ToString()
{
return nameof(PartialEndpointRateLimiter) + $": {string.Join(", ", PartialEndpoints)}";
}
}
internal class ApiKeyRateLimiter : Limiter
{
public bool OnlyForSignedRequests { get; set; }
public bool IgnoreTotalRateLimit { get; set; }
public ApiKeyRateLimiter(int limit, TimeSpan perPeriod, HttpMethod? method, bool onlyForSignedRequests, bool ignoreTotalRateLimit)
:base(RateLimitType.ApiKey, limit, perPeriod, method)
{
OnlyForSignedRequests = onlyForSignedRequests;
IgnoreTotalRateLimit = ignoreTotalRateLimit;
}
}
internal class SingleTopicRateLimiter: Limiter
{
public object Topic { get; set; }
public SingleTopicRateLimiter(object topic, Limiter limiter)
:base(limiter.Type, limiter.Limit, limiter.Period, limiter.Method)
{
Topic = topic;
}
public override string ToString()
{
return (Type == RateLimitType.ApiKey ? nameof(ApiKeyRateLimiter): nameof(EndpointRateLimiter)) + $": {Topic}";
}
}
internal enum RateLimitType
{
Total,
Endpoint,
PartialEndpoint,
ApiKey
}
}
}

View File

@ -0,0 +1,81 @@
using CryptoExchange.Net.RateLimiting.Interfaces;
using System;
using System.Net.Http;
namespace CryptoExchange.Net.Objects
{
/// <summary>
/// The definition of a rest request
/// </summary>
public class RequestDefinition
{
private string? _stringRep;
// Basics
/// <summary>
/// Path of the request
/// </summary>
public string Path { get; set; }
/// <summary>
/// Http method of the request
/// </summary>
public HttpMethod Method { get; set; }
/// <summary>
/// Is the request authenticated
/// </summary>
public bool Authenticated { get; set; }
// Formating
/// <summary>
/// The body format for this request
/// </summary>
public RequestBodyFormat? RequestBodyFormat { get; set; }
/// <summary>
/// The position of parameters for this request
/// </summary>
public HttpMethodParameterPosition? ParameterPosition { get; set; }
/// <summary>
/// The array serialization type for this request
/// </summary>
public ArrayParametersSerialization? ArraySerialization { get; set; }
// Rate limiting
/// <summary>
/// Request weight
/// </summary>
public int Weight { get; set; } = 1;
/// <summary>
/// Rate limit gate to use
/// </summary>
public IRateLimitGate? RateLimitGate { get; set; }
/// <summary>
/// Rate limit for this specific endpoint
/// </summary>
public int? EndpointLimitCount { get; set; }
/// <summary>
/// Rate limit period for this specific endpoint
/// </summary>
public TimeSpan? EndpointLimitPeriod { get; set; }
/// <summary>
/// ctor
/// </summary>
/// <param name="path"></param>
/// <param name="method"></param>
public RequestDefinition(string path, HttpMethod method)
{
Path = path;
Method = method;
}
/// <inheritdoc />
public override string ToString()
{
return _stringRep ??= $"{Method} {Path}{(Authenticated ? " authenticated" : "")}";
}
}
}

View File

@ -0,0 +1,83 @@
using CryptoExchange.Net.RateLimiting.Interfaces;
using System;
using System.Collections.Generic;
using System.Net.Http;
namespace CryptoExchange.Net.Objects
{
/// <summary>
/// Request definitions cache
/// </summary>
public class RequestDefinitionCache
{
private readonly Dictionary<string, RequestDefinition> _definitions = new();
/// <summary>
/// Get a definition if it is already in the cache or create a new definition and add it to the cache
/// </summary>
/// <param name="method">The HttpMethod</param>
/// <param name="path">Endpoint path</param>
/// <param name="authenticated">Endpoint is authenticated</param>
/// <returns></returns>
public RequestDefinition GetOrCreate(HttpMethod method, string path, bool authenticated = false)
=> GetOrCreate(method, path, null, 0, authenticated, null, null, null, null, null);
/// <summary>
/// Get a definition if it is already in the cache or create a new definition and add it to the cache
/// </summary>
/// <param name="method">The HttpMethod</param>
/// <param name="path">Endpoint path</param>
/// <param name="rateLimitGate">The rate limit gate</param>
/// <param name="weight">Request weight</param>
/// <param name="authenticated">Endpoint is authenticated</param>
/// <returns></returns>
public RequestDefinition GetOrCreate(HttpMethod method, string path, IRateLimitGate rateLimitGate, int weight = 1, bool authenticated = false)
=> GetOrCreate(method, path, rateLimitGate, weight, authenticated, null, null, null, null, null);
/// <summary>
/// Get a definition if it is already in the cache or create a new definition and add it to the cache
/// </summary>
/// <param name="method">The HttpMethod</param>
/// <param name="path">Endpoint path</param>
/// <param name="rateLimitGate">The rate limit gate</param>
/// <param name="endpointLimitCount">The limit count for this specific endpoint</param>
/// <param name="endpointLimitPeriod">The period for the limit for this specific endpoint</param>
/// <param name="weight">Request weight</param>
/// <param name="authenticated">Endpoint is authenticated</param>
/// <param name="requestBodyFormat">Request body format</param>
/// <param name="parameterPosition">Parameter position</param>
/// <param name="arraySerialization">Array serialization type</param>
/// <returns></returns>
public RequestDefinition GetOrCreate(
HttpMethod method,
string path,
IRateLimitGate? rateLimitGate,
int weight,
bool authenticated,
int? endpointLimitCount = null,
TimeSpan? endpointLimitPeriod = null,
RequestBodyFormat? requestBodyFormat = null,
HttpMethodParameterPosition? parameterPosition = null,
ArrayParametersSerialization? arraySerialization = null)
{
if (!_definitions.TryGetValue(method + path, out var def))
{
def = new RequestDefinition(path, method)
{
Authenticated = authenticated,
EndpointLimitCount = endpointLimitCount,
EndpointLimitPeriod = endpointLimitPeriod,
RateLimitGate = rateLimitGate,
Weight = weight,
ArraySerialization = arraySerialization,
RequestBodyFormat = requestBodyFormat,
ParameterPosition = parameterPosition,
};
_definitions.Add(method + path, def);
}
return def;
}
}
}

View File

@ -1,4 +1,4 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.RateLimiting.Interfaces;
using System;
using System.Collections.Generic;
using System.Text;
@ -51,9 +51,13 @@ namespace CryptoExchange.Net.Objects.Sockets
public TimeSpan? KeepAliveInterval { get; set; }
/// <summary>
/// The rate limiters for the socket connection
/// The rate limiter for the socket connection
/// </summary>
public IEnumerable<IRateLimiter>? RateLimiters { get; set; }
public IRateLimitGate? RateLimiter { get; set; }
/// <summary>
/// What to do when rate limit is reached
/// </summary>
public RateLimitingBehaviour RateLimitingBehaviour { get; set; }
/// <summary>
/// Encoding for sending/receiving data

View File

@ -0,0 +1,27 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Interfaces;
using System.Security;
namespace CryptoExchange.Net.RateLimiting.Filters
{
/// <summary>
/// Filter requests based on whether they're authenticated or not
/// </summary>
public class AuthenticatedEndpointFilter : IGuardFilter
{
private readonly bool _authenticated;
/// <summary>
/// ctor
/// </summary>
/// <param name="authenticated"></param>
public AuthenticatedEndpointFilter(bool authenticated)
{
_authenticated = authenticated;
}
/// <inheritdoc />
public bool Passes(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey)
=> definition.Authenticated == _authenticated;
}
}

View File

@ -0,0 +1,30 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Interfaces;
using System;
using System.Collections.Generic;
using System.Security;
using System.Text;
namespace CryptoExchange.Net.RateLimiting.Filters
{
/// <summary>
/// Filter requests based on whether the request path matches a specific path
/// </summary>
public class ExactPathFilter : IGuardFilter
{
private readonly string _path;
/// <summary>
/// ctor
/// </summary>
/// <param name="path"></param>
public ExactPathFilter(string path)
{
_path = path;
}
/// <inheritdoc />
public bool Passes(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey)
=> string.Equals(definition.Path, _path, StringComparison.OrdinalIgnoreCase);
}
}

View File

@ -0,0 +1,28 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Interfaces;
using System.Collections.Generic;
using System.Security;
namespace CryptoExchange.Net.RateLimiting.Filters
{
/// <summary>
/// Filter requests based on whether the request path matches any specific path in a list
/// </summary>
public class ExactPathsFilter : IGuardFilter
{
private readonly HashSet<string> _paths;
/// <summary>
/// ctor
/// </summary>
/// <param name="paths"></param>
public ExactPathsFilter(IEnumerable<string> paths)
{
_paths = new HashSet<string>(paths);
}
/// <inheritdoc />
public bool Passes(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey)
=> _paths.Contains(definition.Path);
}
}

View File

@ -0,0 +1,28 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Interfaces;
using System.Security;
namespace CryptoExchange.Net.RateLimiting.Filters
{
/// <summary>
/// Filter requests based on whether the host address matches a specific address
/// </summary>
public class HostFilter : IGuardFilter
{
private readonly string _host;
/// <summary>
/// ctor
/// </summary>
/// <param name="host"></param>
public HostFilter(string host)
{
_host = host;
}
/// <inheritdoc />
public bool Passes(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey)
=> host == _host;
}
}

View File

@ -0,0 +1,27 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Interfaces;
using System.Security;
namespace CryptoExchange.Net.RateLimiting.Filters
{
/// <summary>
/// Filter requests based on whether it's a connection or a request
/// </summary>
public class LimitItemTypeFilter : IGuardFilter
{
private readonly RateLimitItemType _type;
/// <summary>
/// ctor
/// </summary>
/// <param name="type"></param>
public LimitItemTypeFilter(RateLimitItemType type)
{
_type = type;
}
/// <inheritdoc />
public bool Passes(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey)
=> type == _type;
}
}

View File

@ -0,0 +1,28 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Interfaces;
using System;
using System.Security;
namespace CryptoExchange.Net.RateLimiting.Filters
{
/// <summary>
/// Filter requests based on whether the path starts with a specific string
/// </summary>
public class PathStartFilter : IGuardFilter
{
private readonly string _path;
/// <summary>
/// ctor
/// </summary>
/// <param name="path"></param>
public PathStartFilter(string path)
{
_path = path;
}
/// <inheritdoc />
public bool Passes(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey)
=> definition.Path.StartsWith(_path, StringComparison.OrdinalIgnoreCase);
}
}

View File

@ -0,0 +1,146 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Interfaces;
using CryptoExchange.Net.RateLimiting.Trackers;
using System;
using System.Collections.Generic;
using System.Security;
using System.Text;
namespace CryptoExchange.Net.RateLimiting.Guards
{
/// <inheritdoc />
public class RateLimitGuard : IRateLimitGuard
{
/// <summary>
/// Apply guard per host
/// </summary>
public static Func<RequestDefinition, string, SecureString?, string> PerHost { get; } = new Func<RequestDefinition, string, SecureString?, string>((def, host, key) => host);
/// <summary>
/// Apply guard per endpoint
/// </summary>
public static Func<RequestDefinition, string, SecureString?, string> PerEndpoint { get; } = new Func<RequestDefinition, string, SecureString?, string>((def, host, key) => def.Path + def.Method);
/// <summary>
/// Apply guard per API key
/// </summary>
public static Func<RequestDefinition, string, SecureString?, string> PerApiKey { get; } = new Func<RequestDefinition, string, SecureString?, string>((def, host, key) => key!.GetString());
/// <summary>
/// Apply guard per API key per endpoint
/// </summary>
public static Func<RequestDefinition, string, SecureString?, string> PerApiKeyPerEndpoint { get; } = new Func<RequestDefinition, string, SecureString?, string>((def, host, key) => key!.GetString() + def.Path + def.Method);
private readonly IEnumerable<IGuardFilter> _filters;
private readonly Dictionary<string, IWindowTracker> _trackers;
private RateLimitWindowType _windowType;
private double? _decayRate;
private int? _connectionWeight;
private readonly Func<RequestDefinition, string, SecureString?, string> _keySelector;
/// <inheritdoc />
public string Name => "RateLimitGuard";
/// <inheritdoc />
public string Description => _windowType == RateLimitWindowType.Decay ? $"Limit of {Limit} with a decay rate of {_decayRate}" : $"Limit of {Limit} per {TimeSpan}";
/// <summary>
/// The limit per period
/// </summary>
public int Limit { get; }
/// <summary>
/// The time period for the limit
/// </summary>
public TimeSpan TimeSpan { get; }
/// <summary>
/// ctor
/// </summary>
/// <param name="keySelector">The rate limit key selector</param>
/// <param name="filter">Filter for rate limit items. Only when the rate limit item passes the filter the guard will apply</param>
/// <param name="limit">Limit per period</param>
/// <param name="timeSpan">Timespan for the period</param>
/// <param name="windowType">Type of rate limit window</param>
/// <param name="decayPerTimeSpan">The decay per timespan if windowType is DecayWindowTracker</param>
/// <param name="connectionWeight">The weight of a new connection</param>
public RateLimitGuard(Func<RequestDefinition, string, SecureString?, string> keySelector, IGuardFilter filter, int limit, TimeSpan timeSpan, RateLimitWindowType windowType, double? decayPerTimeSpan = null, int? connectionWeight = null)
: this(keySelector, new[] { filter }, limit, timeSpan, windowType, decayPerTimeSpan, connectionWeight)
{
}
/// <summary>
/// ctor
/// </summary>
/// <param name="keySelector">The rate limit key selector</param>
/// <param name="filters">Filters for rate limit items. Only when the rate limit item passes all filters the guard will apply</param>
/// <param name="limit">Limit per period</param>
/// <param name="timeSpan">Timespan for the period</param>
/// <param name="windowType">Type of rate limit window</param>
/// <param name="decayPerTimeSpan">The decay per timespan if windowType is DecayWindowTracker</param>
/// <param name="connectionWeight">The weight of a new connection</param>
public RateLimitGuard(Func<RequestDefinition, string, SecureString?, string> keySelector, IEnumerable<IGuardFilter> filters, int limit, TimeSpan timeSpan, RateLimitWindowType windowType, double? decayPerTimeSpan = null, int? connectionWeight = null)
{
_filters = filters;
_trackers = new Dictionary<string, IWindowTracker>();
_windowType = windowType;
Limit = limit;
TimeSpan = timeSpan;
_keySelector = keySelector;
_decayRate = decayPerTimeSpan;
_connectionWeight = connectionWeight;
}
/// <inheritdoc />
public LimitCheck Check(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight)
{
foreach(var filter in _filters)
{
if (!filter.Passes(type, definition, host, apiKey))
return LimitCheck.NotApplicable;
}
if (type == RateLimitItemType.Connection)
requestWeight = _connectionWeight ?? requestWeight;
var key = _keySelector(definition, host, apiKey);
if (!_trackers.TryGetValue(key, out var tracker))
{
tracker = CreateTracker();
_trackers.Add(key, tracker);
}
var delay = tracker.GetWaitTime(requestWeight);
if (delay == default)
return LimitCheck.NotNeeded;
return LimitCheck.Needed(delay, Limit, TimeSpan, tracker.Current);
}
/// <inheritdoc />
public RateLimitState ApplyWeight(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight)
{
foreach (var filter in _filters)
{
if (!filter.Passes(type, definition, host, apiKey))
return RateLimitState.NotApplied;
}
if (type == RateLimitItemType.Connection)
requestWeight = _connectionWeight ?? requestWeight;
var key = _keySelector(definition, host, apiKey);
var tracker = _trackers[key];
tracker.ApplyWeight(requestWeight);
return RateLimitState.Applied(Limit, TimeSpan, tracker.Current);
}
/// <summary>
/// Create a new WindowTracker
/// </summary>
/// <returns></returns>
protected IWindowTracker CreateTracker()
{
return _windowType == RateLimitWindowType.Sliding ? new SlidingWindowTracker(Limit, TimeSpan)
: _windowType == RateLimitWindowType.Fixed ? new FixedWindowTracker(Limit, TimeSpan)
: _windowType == RateLimitWindowType.FixedAfterFirst ? new FixedAfterStartWindowTracker(Limit, TimeSpan) :
new DecayWindowTracker(Limit, TimeSpan, _decayRate ?? throw new InvalidOperationException("Decay rate not provided"));
}
}
}

View File

@ -0,0 +1,62 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Interfaces;
using System;
using System.Collections.Generic;
using System.Security;
using System.Text;
namespace CryptoExchange.Net.RateLimiting.Guards
{
/// <summary>
/// Retry after guard
/// </summary>
public class RetryAfterGuard : IRateLimitGuard
{
/// <summary>
/// Additional wait time to apply to account for time offset between server and client
/// </summary>
private static readonly TimeSpan _windowBuffer = TimeSpan.FromMilliseconds(1000);
/// <inheritdoc />
public string Name => "RetryAfterGuard";
/// <inheritdoc />
public string Description => $"Pause requests until after {After}";
/// <summary>
/// The timestamp after which requests are allowed again
/// </summary>
public DateTime After { get; private set; }
/// <summary>
/// ctor
/// </summary>
/// <param name="after"></param>
public RetryAfterGuard(DateTime after)
{
After = after;
}
/// <inheritdoc />
public LimitCheck Check(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight)
{
var dif = (After + _windowBuffer) - DateTime.UtcNow;
if (dif <= TimeSpan.Zero)
return LimitCheck.NotApplicable;
return LimitCheck.Needed(dif, default, default, default);
}
/// <inheritdoc />
public RateLimitState ApplyWeight(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight)
{
return RateLimitState.NotApplied;
}
/// <summary>
/// Update the 'after' time
/// </summary>
/// <param name="after"></param>
public void UpdateAfter(DateTime after) => After = after;
}
}

View File

@ -0,0 +1,72 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Interfaces;
using CryptoExchange.Net.RateLimiting.Trackers;
using System;
using System.Collections.Generic;
using System.Security;
namespace CryptoExchange.Net.RateLimiting.Guards
{
/// <summary>
/// Rate limit guard for a per endpoint limit
/// </summary>
public class SingleLimitGuard : IRateLimitGuard
{
private readonly Dictionary<string, IWindowTracker> _trackers;
private readonly RateLimitWindowType _windowType;
private readonly double? _decayRate;
/// <inheritdoc />
public string Name => "EndpointLimitGuard";
/// <inheritdoc />
public string Description => $"Limit requests to endpoint";
/// <summary>
/// ctor
/// </summary>
public SingleLimitGuard(RateLimitWindowType windowType, double? decayRate = null)
{
_windowType = windowType;
_decayRate = decayRate;
_trackers = new Dictionary<string, IWindowTracker>();
}
/// <inheritdoc />
public LimitCheck Check(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight)
{
var key = definition.Path + definition.Method;
if (!_trackers.TryGetValue(key, out var tracker))
{
tracker = CreateTracker(definition.EndpointLimitCount!.Value, definition.EndpointLimitPeriod!.Value);
_trackers.Add(key, tracker);
}
var delay = tracker.GetWaitTime(requestWeight);
if (delay == default)
return LimitCheck.NotNeeded;
return LimitCheck.Needed(delay, definition.EndpointLimitCount!.Value, definition.EndpointLimitPeriod!.Value, tracker.Current);
}
/// <inheritdoc />
public RateLimitState ApplyWeight(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight)
{
var key = definition.Path + definition.Method + definition;
var tracker = _trackers[key];
tracker.ApplyWeight(requestWeight);
return RateLimitState.Applied(definition.EndpointLimitCount!.Value, definition.EndpointLimitPeriod!.Value, tracker.Current);
}
/// <summary>
/// Create a new WindowTracker
/// </summary>
/// <returns></returns>
protected IWindowTracker CreateTracker(int limit, TimeSpan timeSpan)
{
return _windowType == RateLimitWindowType.Sliding ? new SlidingWindowTracker(limit, timeSpan)
: _windowType == RateLimitWindowType.Fixed ? new FixedWindowTracker(limit, timeSpan) :
new DecayWindowTracker(limit, timeSpan, _decayRate ?? throw new InvalidOperationException("Decay rate not provided"));
}
}
}

View File

@ -0,0 +1,21 @@
using CryptoExchange.Net.Objects;
using System.Security;
namespace CryptoExchange.Net.RateLimiting.Interfaces
{
/// <summary>
/// Filter requests based on specific condition
/// </summary>
public interface IGuardFilter
{
/// <summary>
/// Whether a request or connection passes this filter
/// </summary>
/// <param name="type">The type of item</param>
/// <param name="definition">The request definition</param>
/// <param name="host">The host address</param>
/// <param name="apiKey">The API key</param>
/// <returns>True if passed</returns>
bool Passes(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey);
}
}

View File

@ -0,0 +1,78 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Guards;
using Microsoft.Extensions.Logging;
using System;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.RateLimiting.Interfaces
{
/// <summary>
/// Rate limit gate
/// </summary>
public interface IRateLimitGate
{
/// <summary>
/// Event when the rate limit is triggered
/// </summary>
event Action<RateLimitEvent> RateLimitTriggered;
/// <summary>
/// Add a rate limit guard
/// </summary>
/// <param name="guard">Guard to add</param>
/// <returns></returns>
IRateLimitGate AddGuard(IRateLimitGuard guard);
/// <summary>
/// Set a RetryAfter guard, can be used when a server rate limit is hit and a RetryAfter header is specified
/// </summary>
/// <param name="retryAfter">The time after which requests can be send again</param>
/// <returns></returns>
Task SetRetryAfterGuardAsync(DateTime retryAfter);
/// <summary>
/// Set the SingleLimitGuard for handling individual endpoint rate limits
/// </summary>
/// <param name="guard"></param>
/// <returns></returns>
IRateLimitGate SetSingleLimitGuard(SingleLimitGuard guard);
/// <summary>
/// Returns the 'retry after' timestamp if set
/// </summary>
/// <returns></returns>
Task<DateTime?> GetRetryAfterTime();
/// <summary>
/// Process a request. Enforces the configured rate limits. When a rate limit is hit will wait for the rate limit to pass if RateLimitingBehaviour is Wait, or return an error if it is set to Fail
/// </summary>
/// <param name="logger">Logger</param>
/// <param name="itemId">Id of the item to check</param>
/// <param name="type">The rate limit item type</param>
/// <param name="definition">The request definition</param>
/// <param name="baseAddress">The host address</param>
/// <param name="apiKey">The API key</param>
/// <param name="requestWeight">Request weight</param>
/// <param name="behaviour">Behaviour when rate limit is hit</param>
/// <param name="ct">Cancelation token</param>
/// <returns>Error if RateLimitingBehaviour is Fail and rate limit is hit</returns>
Task<CallResult> ProcessAsync(ILogger logger, int itemId, RateLimitItemType type, RequestDefinition definition, string baseAddress, SecureString? apiKey, int requestWeight, RateLimitingBehaviour behaviour, CancellationToken ct);
/// <summary>
/// Enforces the rate limit as defined in the request definition. When a rate limit is hit will wait for the rate limit to pass if RateLimitingBehaviour is Wait, or return an error if it is set to Fail
/// </summary>
/// <param name="logger">Logger</param>
/// <param name="itemId">Id of the item to check</param>
/// <param name="type">The rate limit item type</param>
/// <param name="definition">The request definition</param>
/// <param name="baseAddress">The host address</param>
/// <param name="apiKey">The API key</param>
/// <param name="requestWeight">Request weight</param>
/// <param name="behaviour">Behaviour when rate limit is hit</param>
/// <param name="ct">Cancelation token</param>
/// <returns>Error if RateLimitingBehaviour is Fail and rate limit is hit</returns>
Task<CallResult> ProcessSingleAsync(ILogger logger, int itemId, RateLimitItemType type, RequestDefinition definition, string baseAddress, SecureString? apiKey, int requestWeight, RateLimitingBehaviour behaviour, CancellationToken ct);
}
}

View File

@ -0,0 +1,44 @@
using CryptoExchange.Net.Objects;
using System.Net.Http;
using System.Security;
namespace CryptoExchange.Net.RateLimiting.Interfaces
{
/// <summary>
/// Rate limit guard
/// </summary>
public interface IRateLimitGuard
{
/// <summary>
/// Name
/// </summary>
string Name { get; }
/// <summary>
/// Description
/// </summary>
string Description { get; }
/// <summary>
/// Check whether a request can pass this rate limit guard
/// </summary>
/// <param name="type">The rate limit item type</param>
/// <param name="definition">The request definition</param>
/// <param name="host">The host address</param>
/// <param name="apiKey">The API key</param>
/// <param name="requestWeight">The request weight</param>
/// <returns></returns>
LimitCheck Check(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight);
/// <summary>
/// Apply the request to this guard with the specified weight
/// </summary>
/// <param name="type">The rate limit item type</param>
/// <param name="definition">The request definition</param>
/// <param name="host">The host address</param>
/// <param name="apiKey">The API key</param>
/// <param name="requestWeight">The request weight</param>
/// <returns></returns>
RateLimitState ApplyWeight(RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight);
}
}

View File

@ -0,0 +1,34 @@
using System;
namespace CryptoExchange.Net.RateLimiting.Interfaces
{
/// <summary>
/// Rate limit window tracker
/// </summary>
public interface IWindowTracker
{
/// <summary>
/// Time period the limit is for
/// </summary>
TimeSpan TimePeriod { get; }
/// <summary>
/// The limit in the time period
/// </summary>
int Limit { get; }
/// <summary>
/// The current count within the time period
/// </summary>
int Current { get; }
/// <summary>
/// Get the time to wait to fit the weight
/// </summary>
/// <param name="weight"></param>
/// <returns></returns>
TimeSpan GetWaitTime(int weight);
/// <summary>
/// Register the weight in this window
/// </summary>
/// <param name="weight">Request weight</param>
void ApplyWeight(int weight);
}
}

View File

@ -0,0 +1,60 @@
using System;
namespace CryptoExchange.Net.RateLimiting
{
/// <summary>
/// Limit check
/// </summary>
public readonly struct LimitCheck
{
/// <summary>
/// Is guard applicable
/// </summary>
public bool Applicable { get; }
/// <summary>
/// Delay needed
/// </summary>
public TimeSpan Delay { get; }
/// <summary>
/// Current counter
/// </summary>
public int Current { get; }
/// <summary>
/// Limit
/// </summary>
public int? Limit { get; }
/// <summary>
/// Time period
/// </summary>
public TimeSpan? Period { get; }
private LimitCheck(bool applicable, TimeSpan delay, int limit, TimeSpan period, int current)
{
Applicable = applicable;
Delay = delay;
Limit = limit;
Period = period;
Current = current;
}
/// <summary>
/// Not applicable
/// </summary>
public static LimitCheck NotApplicable { get; } = new LimitCheck(false, default, default, default, default);
/// <summary>
/// No wait needed
/// </summary>
public static LimitCheck NotNeeded { get; } = new LimitCheck(true, default, default, default, default);
/// <summary>
/// Wait needed
/// </summary>
/// <param name="delay">The delay needed</param>
/// <param name="limit">Limit per period</param>
/// <param name="period">Period the limit is for</param>
/// <param name="current">Current counter</param>
/// <returns></returns>
public static LimitCheck Needed(TimeSpan delay, int limit, TimeSpan period, int current) => new(true, delay, limit, period, current);
}
}

View File

@ -0,0 +1,30 @@
using System;
namespace CryptoExchange.Net.RateLimiting
{
/// <summary>
/// A rate limit entry
/// </summary>
public struct LimitEntry
{
/// <summary>
/// Timestamp of the item
/// </summary>
public DateTime Timestamp { get; set; }
/// <summary>
/// Item weight
/// </summary>
public int Weight { get; set; }
/// <summary>
/// ctor
/// </summary>
/// <param name="timestamp"></param>
/// <param name="weight"></param>
public LimitEntry(DateTime timestamp, int weight)
{
Timestamp = timestamp;
Weight = weight;
}
}
}

View File

@ -0,0 +1,80 @@
using CryptoExchange.Net.Objects;
using System;
namespace CryptoExchange.Net.RateLimiting
{
/// <summary>
/// Rate limit event
/// </summary>
public record RateLimitEvent
{
/// <summary>
/// Name of the API limit that is reached
/// </summary>
public string ApiLimit { get; set; } = string.Empty;
/// <summary>
/// Description of the limit that is reached
/// </summary>
public string LimitDescription { get; set; } = string.Empty;
/// <summary>
/// The request definition
/// </summary>
public RequestDefinition RequestDefinition { get; set; }
/// <summary>
/// The host the request is for
/// </summary>
public string Host { get; set; } = default!;
/// <summary>
/// The current counter value
/// </summary>
public int Current { get; set; }
/// <summary>
/// The weight of the limited request
/// </summary>
public int RequestWeight { get; set; }
/// <summary>
/// The limit per time period
/// </summary>
public int? Limit { get; set; }
/// <summary>
/// The time period the limit is for
/// </summary>
public TimeSpan? TimePeriod { get; set; }
/// <summary>
/// The time the request will be delayed for if the Behaviour is RateLimitingBehaviour.Wait
/// </summary>
public TimeSpan? DelayTime { get; set; }
/// <summary>
/// The handling behaviour for the rquest
/// </summary>
public RateLimitingBehaviour Behaviour { get; set; }
/// <summary>
/// ctor
/// </summary>
/// <param name="apiLimit"></param>
/// <param name="limitDescription"></param>
/// <param name="definition"></param>
/// <param name="host"></param>
/// <param name="current"></param>
/// <param name="requestWeight"></param>
/// <param name="limit"></param>
/// <param name="timePeriod"></param>
/// <param name="delayTime"></param>
/// <param name="behaviour"></param>
public RateLimitEvent(string apiLimit, string limitDescription, RequestDefinition definition, string host, int current, int requestWeight, int? limit, TimeSpan? timePeriod, TimeSpan? delayTime, RateLimitingBehaviour behaviour)
{
ApiLimit = apiLimit;
LimitDescription = limitDescription;
RequestDefinition = definition;
Host = host;
Current = current;
RequestWeight = requestWeight;
Limit = limit;
TimePeriod = timePeriod;
DelayTime = delayTime;
Behaviour = behaviour;
}
}
}

View File

@ -0,0 +1,174 @@
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.RateLimiting.Guards;
using CryptoExchange.Net.RateLimiting.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.RateLimiting
{
/// <inheritdoc />
public class RateLimitGate : IRateLimitGate
{
private IRateLimitGuard _singleLimitGuard = new SingleLimitGuard(RateLimitWindowType.Sliding);
private readonly ConcurrentBag<IRateLimitGuard> _guards;
private readonly SemaphoreSlim _semaphore;
private readonly string _name;
private int _waitingCount;
/// <inheritdoc />
public event Action<RateLimitEvent>? RateLimitTriggered;
/// <summary>
/// ctor
/// </summary>
public RateLimitGate(string name)
{
_name = name;
_guards = new ConcurrentBag<IRateLimitGuard>();
_semaphore = new SemaphoreSlim(1);
}
/// <inheritdoc />
public async Task<CallResult> ProcessAsync(ILogger logger, int itemId, RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight, RateLimitingBehaviour rateLimitingBehaviour, CancellationToken ct)
{
await _semaphore.WaitAsync(ct).ConfigureAwait(false);
_waitingCount++;
try
{
return await CheckGuardsAsync(_guards, logger, itemId, type, definition, host, apiKey, requestWeight, rateLimitingBehaviour, ct).ConfigureAwait(false);
}
finally
{
_waitingCount--;
_semaphore.Release();
}
}
/// <inheritdoc />
public async Task<CallResult> ProcessSingleAsync(ILogger logger, int itemId, RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight, RateLimitingBehaviour rateLimitingBehaviour, CancellationToken ct)
{
await _semaphore.WaitAsync(ct).ConfigureAwait(false);
if (requestWeight == 0)
requestWeight = 1;
_waitingCount++;
try
{
return await CheckGuardsAsync(new IRateLimitGuard[] { _singleLimitGuard }, logger, itemId, type, definition, host, apiKey, requestWeight, rateLimitingBehaviour, ct).ConfigureAwait(false);
}
finally
{
_waitingCount--;
_semaphore.Release();
}
}
private async Task<CallResult> CheckGuardsAsync(IEnumerable<IRateLimitGuard> guards, ILogger logger, int itemId, RateLimitItemType type, RequestDefinition definition, string host, SecureString? apiKey, int requestWeight, RateLimitingBehaviour rateLimitingBehaviour, CancellationToken ct)
{
foreach (var guard in guards)
{
// Check if a wait is needed for this guard
var result = guard.Check(type, definition, host, apiKey, requestWeight);
if (result.Delay != TimeSpan.Zero && rateLimitingBehaviour == RateLimitingBehaviour.Fail)
{
// Delay is needed and limit behaviour is to fail the request
if (type == RateLimitItemType.Connection)
logger.RateLimitConnectionFailed(itemId, guard.Name, guard.Description);
else
logger.RateLimitRequestFailed(itemId, definition.Path, guard.Name, guard.Description);
RateLimitTriggered?.Invoke(new RateLimitEvent(_name, guard.Description, definition, host, result.Current, requestWeight, result.Limit, result.Period, result.Delay, rateLimitingBehaviour));
return new CallResult(new ClientRateLimitError($"Rate limit check failed on guard {guard.Name}; {guard.Description}"));
}
if (result.Delay != TimeSpan.Zero)
{
// Delay is needed and limit behaviour is to wait for the request to be under the limit
_semaphore.Release();
var description = result.Limit == null ? guard.Description : $"{guard.Description}, Request weight: {requestWeight}, Current: {result.Current}, Limit: {result.Limit}, requests now being limited: {_waitingCount}";
if (type == RateLimitItemType.Connection)
logger.RateLimitDelayingConnection(itemId, result.Delay, guard.Name, description);
else
logger.RateLimitDelayingRequest(itemId, definition.Path, result.Delay, guard.Name, description);
RateLimitTriggered?.Invoke(new RateLimitEvent(_name, guard.Description, definition, host, result.Current, requestWeight, result.Limit, result.Period, result.Delay, rateLimitingBehaviour));
await Task.Delay(result.Delay, ct).ConfigureAwait(false);
await _semaphore.WaitAsync(ct).ConfigureAwait(false);
return await CheckGuardsAsync(guards, logger, itemId, type, definition, host, apiKey, requestWeight, rateLimitingBehaviour, ct).ConfigureAwait(false);
}
}
// Apply the weight on each guard
foreach (var guard in guards)
{
var result = guard.ApplyWeight(type, definition, host, apiKey, requestWeight);
if (result.IsApplied)
{
if (type == RateLimitItemType.Connection)
logger.RateLimitAppliedConnection(itemId, guard.Name, guard.Description, result.Current);
else
logger.RateLimitAppliedRequest(itemId, definition.Path, guard.Name, guard.Description, result.Current);
}
}
return new CallResult(null);
}
/// <inheritdoc />
public IRateLimitGate AddGuard(IRateLimitGuard guard)
{
_guards.Add(guard);
return this;
}
/// <inheritdoc />
public IRateLimitGate SetSingleLimitGuard(SingleLimitGuard guard)
{
_singleLimitGuard = guard;
return this;
}
/// <inheritdoc />
public async Task SetRetryAfterGuardAsync(DateTime retryAfter)
{
await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
var retryAfterGuard = _guards.OfType<RetryAfterGuard>().SingleOrDefault();
if (retryAfterGuard == null)
_guards.Add(new RetryAfterGuard(retryAfter));
else
retryAfterGuard.UpdateAfter(retryAfter);
}
finally
{
_semaphore.Release();
}
}
/// <inheritdoc />
public async Task<DateTime?> GetRetryAfterTime()
{
await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
var retryAfterGuard = _guards.OfType<RetryAfterGuard>().SingleOrDefault();
return retryAfterGuard?.After;
}
finally
{
_semaphore.Release();
}
}
}
}

View File

@ -0,0 +1,20 @@
using System;
namespace CryptoExchange.Net.RateLimiting
{
/// <summary>
/// Rate limit item type
/// </summary>
[Flags]
public enum RateLimitItemType
{
/// <summary>
/// A connection attempt
/// </summary>
Connection = 1,
/// <summary>
/// A request
/// </summary>
Request = 2
}
}

View File

@ -0,0 +1,55 @@
using System;
namespace CryptoExchange.Net.RateLimiting
{
/// <summary>
/// Limit state
/// </summary>
public struct RateLimitState
{
/// <summary>
/// Limit
/// </summary>
public int Limit { get; }
/// <summary>
/// Period
/// </summary>
public TimeSpan Period { get; }
/// <summary>
/// Current count
/// </summary>
public int Current { get; }
/// <summary>
/// Whether the limit is applied
/// </summary>
public bool IsApplied { get; set; }
/// <summary>
/// ctor
/// </summary>
/// <param name="applied"></param>
/// <param name="limit"></param>
/// <param name="period"></param>
/// <param name="current"></param>
public RateLimitState(bool applied, int limit, TimeSpan period, int current)
{
IsApplied = applied;
Limit = limit;
Period = period;
Current = current;
}
/// <summary>
/// Not applied result
/// </summary>
public static RateLimitState NotApplied { get; } = new RateLimitState(false, default, default, default);
/// <summary>
/// Applied result
/// </summary>
/// <param name="limit"></param>
/// <param name="period"></param>
/// <param name="current"></param>
/// <returns></returns>
public static RateLimitState Applied(int limit, TimeSpan period, int current) => new RateLimitState(true, limit, period, current);
}
}

View File

@ -0,0 +1,86 @@
using System;
using CryptoExchange.Net.RateLimiting.Interfaces;
namespace CryptoExchange.Net.RateLimiting.Trackers
{
internal class DecayWindowTracker : IWindowTracker
{
/// <inheritdoc />
public TimeSpan TimePeriod { get; }
/// <summary>
/// Decrease rate per TimePeriod
/// </summary>
public double DecreaseRate { get; }
/// <inheritdoc />
public int Limit { get; }
/// <inheritdoc />
public int Current => _currentWeight;
private int _currentWeight = 0;
private DateTime _lastDecrease = DateTime.UtcNow;
public DecayWindowTracker(int limit, TimeSpan period, double decayRate)
{
Limit = limit;
TimePeriod = period;
DecreaseRate = decayRate;
}
/// <inheritdoc />
public TimeSpan GetWaitTime(int weight)
{
// Decrease the counter based on the last update time and decay rate
DecreaseCounter(DateTime.UtcNow);
if (Current + weight > Limit)
{
// The weight would cause the rate limit to be passed
if (Current == 0)
{
throw new Exception("Request limit reached without any prior request. " +
$"This request can never execute with the current rate limiter. Request weight: {weight}, Ratelimit: {Limit}");
}
// Determine the time to wait before this weight can be applied without going over the rate limit
return DetermineWaitTime(weight);
}
// Weight can fit without going over limit
return TimeSpan.Zero;
}
/// <inheritdoc />
public void ApplyWeight(int weight)
{
if (_currentWeight == 0)
_lastDecrease = DateTime.UtcNow;
_currentWeight += weight;
}
/// <summary>
/// Decrease the counter based on time passed since last update and the decay rate
/// </summary>
/// <param name="time"></param>
protected void DecreaseCounter(DateTime time)
{
var dif = (time - _lastDecrease).TotalMilliseconds / TimePeriod.TotalMilliseconds * DecreaseRate;
var decrease = (int)Math.Floor(dif);
if (decrease >= 1)
{
_currentWeight = Math.Max(0, _currentWeight - (int)Math.Floor(dif));
_lastDecrease = time;
}
}
/// <summary>
/// Determine the time to wait before the weight would fit
/// </summary>
/// <param name="requestWeight"></param>
/// <returns></returns>
private TimeSpan DetermineWaitTime(int requestWeight)
{
var weightToRemove = Math.Max(Current - (Limit - requestWeight), 0);
return TimeSpan.FromMilliseconds(Math.Ceiling(weightToRemove / DecreaseRate) * TimePeriod.TotalMilliseconds);
}
}
}

View File

@ -0,0 +1,103 @@
using System;
using System.Collections.Generic;
using CryptoExchange.Net.RateLimiting.Interfaces;
namespace CryptoExchange.Net.RateLimiting.Trackers
{
internal class FixedAfterStartWindowTracker : IWindowTracker
{
/// <inheritdoc />
public TimeSpan TimePeriod { get; }
/// <inheritdoc />
public int Limit { get; }
/// <inheritdoc />
public int Current => _currentWeight;
private readonly Queue<LimitEntry> _entries;
private int _currentWeight = 0;
private DateTime? _nextReset;
/// <summary>
/// Additional wait time to apply to account for time offset between server and client
/// </summary>
private static TimeSpan _fixedWindowBuffer = TimeSpan.FromMilliseconds(1000);
public FixedAfterStartWindowTracker(int limit, TimeSpan period)
{
Limit = limit;
TimePeriod = period;
_entries = new Queue<LimitEntry>();
}
public TimeSpan GetWaitTime(int weight)
{
// Remove requests no longer in time period from the history
var checkTime = DateTime.UtcNow;
if (_nextReset != null && checkTime > _nextReset)
RemoveBefore(_nextReset.Value);
if (Current == 0)
_nextReset = null;
if (Current + weight > Limit)
{
// The weight would cause the rate limit to be passed
if (Current == 0)
{
throw new Exception("Request limit reached without any prior request. " +
$"This request can never execute with the current rate limiter. Request weight: {weight}, Ratelimit: {Limit}");
}
// Determine the time to wait before this weight can be applied without going over the rate limit
return DetermineWaitTime();
}
// Weight can fit without going over limit
return TimeSpan.Zero;
}
/// <inheritdoc />
public void ApplyWeight(int weight)
{
if (_currentWeight == 0)
_nextReset = DateTime.UtcNow + TimePeriod;
_currentWeight += weight;
_entries.Enqueue(new LimitEntry(DateTime.UtcNow, weight));
}
/// <summary>
/// Remove items before a certain time
/// </summary>
/// <param name="time"></param>
protected void RemoveBefore(DateTime time)
{
while (true)
{
if (_entries.Count == 0)
break;
var firstItem = _entries.Peek();
if (firstItem.Timestamp < time)
{
_entries.Dequeue();
_currentWeight -= firstItem.Weight;
}
else
{
// Either no entries left, or the entry time is still within the window
break;
}
}
}
/// <summary>
/// Determine the time to wait before a new item would fit
/// </summary>
/// <returns></returns>
private TimeSpan DetermineWaitTime()
{
var checkTime = DateTime.UtcNow;
return (_nextReset!.Value - checkTime) + _fixedWindowBuffer;
}
}
}

View File

@ -0,0 +1,99 @@
using System;
using System.Collections.Generic;
using CryptoExchange.Net.RateLimiting.Interfaces;
namespace CryptoExchange.Net.RateLimiting.Trackers
{
internal class FixedWindowTracker : IWindowTracker
{
/// <inheritdoc />
public TimeSpan TimePeriod { get; }
/// <inheritdoc />
public int Limit { get; }
/// <inheritdoc />
public int Current => _currentWeight;
private readonly Queue<LimitEntry> _entries;
private int _currentWeight = 0;
/// <summary>
/// Additional wait time to apply to account for time offset between server and client
/// </summary>
private static readonly TimeSpan _fixedWindowBuffer = TimeSpan.FromMilliseconds(1000);
public FixedWindowTracker(int limit, TimeSpan period)
{
Limit = limit;
TimePeriod = period;
_entries = new Queue<LimitEntry>();
}
/// <inheritdoc />
public TimeSpan GetWaitTime(int weight)
{
// Remove requests no longer in time period from the history
var checkTime = DateTime.UtcNow;
RemoveBefore(checkTime.AddTicks(-(checkTime.Ticks % TimePeriod.Ticks)));
if (Current + weight > Limit)
{
// The weight would cause the rate limit to be passed
if (Current == 0)
{
throw new Exception("Request limit reached without any prior request. " +
$"This request can never execute with the current rate limiter. Request weight: {weight}, Ratelimit: {Limit}");
}
// Determine the time to wait before this weight can be applied without going over the rate limit
return DetermineWaitTime();
}
// Weight can fit without going over limit
return TimeSpan.Zero;
}
/// <inheritdoc />
public void ApplyWeight(int weight)
{
_currentWeight += weight;
_entries.Enqueue(new LimitEntry(DateTime.UtcNow, weight));
}
/// <summary>
/// Remove items before a certain time
/// </summary>
/// <param name="time"></param>
protected void RemoveBefore(DateTime time)
{
while (true)
{
if (_entries.Count == 0)
break;
var firstItem = _entries.Peek();
if (firstItem.Timestamp < time)
{
_entries.Dequeue();
_currentWeight -= firstItem.Weight;
}
else
{
// Either no entries left, or the entry time is still within the window
break;
}
}
}
/// <summary>
/// Determine the time to wait before a new item would fit
/// </summary>
/// <returns></returns>
private TimeSpan DetermineWaitTime()
{
var checkTime = DateTime.UtcNow;
var startCurrentWindow = checkTime.AddTicks(-(checkTime.Ticks % TimePeriod.Ticks));
var wait = startCurrentWindow.Add(TimePeriod) - checkTime;
return wait.Add(_fixedWindowBuffer);
}
}
}

View File

@ -0,0 +1,100 @@
using System;
using System.Collections.Generic;
using CryptoExchange.Net.RateLimiting.Interfaces;
namespace CryptoExchange.Net.RateLimiting.Trackers
{
internal class SlidingWindowTracker : IWindowTracker
{
/// <inheritdoc />
public TimeSpan TimePeriod { get; }
/// <inheritdoc />
public int Limit { get; }
/// <inheritdoc />
public int Current => _currentWeight;
private readonly List<LimitEntry> _entries;
private int _currentWeight = 0;
public SlidingWindowTracker(int limit, TimeSpan period)
{
Limit = limit;
TimePeriod = period;
_entries = new List<LimitEntry>();
}
/// <inheritdoc />
public TimeSpan GetWaitTime(int weight)
{
// Remove requests no longer in time period from the history
RemoveBefore(DateTime.UtcNow - TimePeriod);
if (Current + weight > Limit)
{
// The weight would cause the rate limit to be passed
if (Current == 0)
{
throw new Exception("Request limit reached without any prior request. " +
$"This request can never execute with the current rate limiter. Request weight: {weight}, Ratelimit: {Limit}");
}
// Determine the time to wait before this weight can be applied without going over the rate limit
return DetermineWaitTime(weight);
}
// Weight can fit without going over limit
return TimeSpan.Zero;
}
/// <inheritdoc />
public void ApplyWeight(int weight)
{
_currentWeight += weight;
_entries.Add(new LimitEntry(DateTime.UtcNow, weight));
}
/// <summary>
/// Remove items before a certain time
/// </summary>
/// <param name="time"></param>
protected void RemoveBefore(DateTime time)
{
for (var i = 0; i < _entries.Count; i++)
{
if (_entries[i].Timestamp < time)
{
var entry = _entries[i];
_entries.Remove(entry);
_currentWeight -= entry.Weight;
i--;
}
else
{
break;
}
}
}
/// <summary>
/// Determine the time to wait before the weight would fit
/// </summary>
/// <returns></returns>
private TimeSpan DetermineWaitTime(int requestWeight)
{
var weightToRemove = Math.Max(Current - (Limit - requestWeight), 0);
var removedWeight = 0;
for (var i = 0; i < _entries.Count; i++)
{
var entry = _entries[i];
removedWeight += entry.Weight;
if (removedWeight >= weightToRemove)
{
return entry.Timestamp + TimePeriod - DateTime.UtcNow;
}
}
throw new Exception("Request not possible to execute with current rate limit guard. " +
$" Request weight: {requestWeight}, Ratelimit: {Limit}");
}
}
}

View File

@ -2,6 +2,7 @@
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.RateLimiting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
@ -45,6 +46,7 @@ namespace CryptoExchange.Net.Sockets
private bool _disposed;
private ProcessState _processState;
private DateTime _lastReconnectTime;
private string _baseAddress;
private const int _receiveBufferSize = 1048576;
private const int _sendBufferSize = 4096;
@ -110,6 +112,9 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public event Func<int, Task>? OnRequestSent;
/// <inheritdoc />
public event Func<int, Task>? OnRequestRateLimited;
/// <inheritdoc />
public event Func<Exception, Task>? OnError;
@ -143,17 +148,19 @@ namespace CryptoExchange.Net.Sockets
_closeSem = new SemaphoreSlim(1, 1);
_socket = CreateSocket();
_baseAddress = $"{Uri.Scheme}://{Uri.Host}";
}
/// <inheritdoc />
public virtual async Task<bool> ConnectAsync()
public virtual async Task<CallResult> ConnectAsync()
{
if (!await ConnectInternalAsync().ConfigureAwait(false))
return false;
var connectResult = await ConnectInternalAsync().ConfigureAwait(false);
if (!connectResult)
return connectResult;
await (OnOpen?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
_processTask = ProcessAsync();
return true;
return connectResult;
}
/// <summary>
@ -188,11 +195,19 @@ namespace CryptoExchange.Net.Sockets
return socket;
}
private async Task<bool> ConnectInternalAsync()
private async Task<CallResult> ConnectInternalAsync()
{
_logger.SocketConnecting(Id);
try
{
if (Parameters.RateLimiter != null)
{
var definition = new RequestDefinition(Id.ToString(), HttpMethod.Get);
var limitResult = await Parameters.RateLimiter.ProcessAsync(_logger, Id, RateLimitItemType.Connection, definition, _baseAddress, null, 1, Parameters.RateLimitingBehaviour, _ctsSource.Token).ConfigureAwait(false);
if (!limitResult)
return new CallResult(new ClientRateLimitError("Connection limit reached"));
}
using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
using var linked = CancellationTokenSource.CreateLinkedTokenSource(tcs.Token, _ctsSource.Token);
await _socket.ConnectAsync(Uri, linked.Token).ConfigureAwait(false);
@ -204,11 +219,11 @@ namespace CryptoExchange.Net.Sockets
// if _ctsSource was canceled this was already logged
_logger.SocketConnectionFailed(Id, e.Message, e);
}
return false;
return new CallResult(new CantConnectError());
}
_logger.SocketConnected(Id, Uri);
return true;
return new CallResult(null);
}
/// <inheritdoc />
@ -407,9 +422,9 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
private async Task SendLoopAsync()
{
var requestDefinition = new RequestDefinition(Id.ToString(), HttpMethod.Get);
try
{
var limitKey = Uri.ToString() + "/" + Id.ToString();
while (true)
{
if (_ctsSource.IsCancellationRequested)
@ -422,16 +437,13 @@ namespace CryptoExchange.Net.Sockets
while (_sendBuffer.TryDequeue(out var data))
{
if (Parameters.RateLimiters != null)
if (Parameters.RateLimiter != null)
{
foreach(var ratelimiter in Parameters.RateLimiters)
var limitResult = await Parameters.RateLimiter.ProcessAsync(_logger, data.Id, RateLimitItemType.Request, requestDefinition, _baseAddress, null, data.Weight, Parameters.RateLimitingBehaviour, _ctsSource.Token).ConfigureAwait(false);
if (!limitResult)
{
var limitResult = await ratelimiter.LimitRequestAsync(_logger, limitKey, HttpMethod.Get, false, null, RateLimitingBehaviour.Wait, data.Weight, _ctsSource.Token).ConfigureAwait(false);
if (limitResult.Success)
{
if (limitResult.Data > 0)
_logger.SocketSendDelayedBecauseOfRateLimit(Id, data.Id, limitResult.Data);
}
await (OnRequestRateLimited?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
continue;
}
}

View File

@ -1,7 +1,5 @@
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Sockets
{

View File

@ -3,7 +3,6 @@ using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

View File

@ -216,6 +216,7 @@ namespace CryptoExchange.Net.Sockets
_socket = socket;
_socket.OnStreamMessage += HandleStreamMessage;
_socket.OnRequestSent += HandleRequestSentAsync;
_socket.OnRequestRateLimited += HandleRequestRateLimitedAsync;
_socket.OnOpen += HandleOpenAsync;
_socket.OnClose += HandleCloseAsync;
_socket.OnReconnecting += HandleReconnectingAsync;
@ -359,6 +360,26 @@ namespace CryptoExchange.Net.Sockets
return Task.CompletedTask;
}
/// <summary>
/// Handler for whenever a request is rate limited and rate limit behaviour is set to fail
/// </summary>
/// <param name="requestId"></param>
/// <returns></returns>
protected virtual Task HandleRequestRateLimitedAsync(int requestId)
{
Query query;
lock (_listenersLock)
{
query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId);
}
if (query == null)
return Task.CompletedTask;
query.Fail(new ClientRateLimitError("Connection rate limit reached"));
return Task.CompletedTask;
}
/// <summary>
/// Handler for whenever a request is sent over the websocket
/// </summary>
@ -500,7 +521,7 @@ namespace CryptoExchange.Net.Sockets
/// Connect the websocket
/// </summary>
/// <returns></returns>
public async Task<bool> ConnectAsync() => await _socket.ConnectAsync().ConfigureAwait(false);
public async Task<CallResult> ConnectAsync() => await _socket.ConnectAsync().ConfigureAwait(false);
/// <summary>
/// Retrieve the underlying socket
@ -750,13 +771,13 @@ namespace CryptoExchange.Net.Sockets
if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
{
var info = $"Message to send exceeds the max server message size ({ApiClient.MessageSendSizeLimit.Value} bytes). Split the request into batches to keep below this limit";
_logger.LogWarning("[Sckt {SocketId}] msg {RequestId} - {Info}", SocketId, requestId, info);
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
return new CallResult(new InvalidOperationError(info));
}
if (!_socket.IsOpen)
{
_logger.LogWarning("[Sckt {SocketId}] msg {RequestId} - Failed to send, socket no longer open", SocketId, requestId);
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
return new CallResult(new WebError("Failed to send message, socket no longer open"));
}
@ -775,7 +796,7 @@ namespace CryptoExchange.Net.Sockets
private async Task<CallResult> ProcessReconnectAsync()
{
if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected"));
return new CallResult(new WebError("Socket not connected"));
bool anySubscriptions;
lock (_listenersLock)
@ -785,7 +806,7 @@ namespace CryptoExchange.Net.Sockets
// No need to resubscribe anything
_logger.NothingToResubscribeCloseConnection(SocketId);
_ = _socket.CloseAsync();
return new CallResult<bool>(true);
return new CallResult(null);
}
bool anyAuthenticated;
@ -825,7 +846,7 @@ namespace CryptoExchange.Net.Sockets
for (var i = 0; i < subList.Count; i += ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)
{
if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected"));
return new CallResult(new WebError("Socket not connected"));
var taskList = new List<Task<CallResult>>();
foreach (var subscription in subList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket))
@ -852,10 +873,10 @@ namespace CryptoExchange.Net.Sockets
subscription.Confirmed = true;
if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected"));
return new CallResult(new WebError("Socket not connected"));
_logger.AllSubscriptionResubscribed(SocketId);
return new CallResult<bool>(true);
return new CallResult(null);
}
internal async Task UnsubscribeAsync(Subscription subscription)

View File

@ -5,7 +5,6 @@ using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{

View File

@ -3,7 +3,6 @@ using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{