1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Ratelimiting for socket requests

This commit is contained in:
JKorf 2023-08-24 20:51:17 +02:00
parent 468cd5e48e
commit be25a68c9c
15 changed files with 291 additions and 188 deletions

View File

@ -140,6 +140,12 @@ namespace CryptoExchange.Net.UnitTests
var sub2 = new SocketConnection(new TraceLogger(), client.SubClient, socket2, null);
client.SubClient.ConnectSocketSub(sub1);
client.SubClient.ConnectSocketSub(sub2);
var us1 = SocketSubscription.CreateForIdentifier(10, "Test1", true, false, (e) => { });
var us2 = SocketSubscription.CreateForIdentifier(11, "Test2", true, false, (e) => { });
sub1.AddSubscription(us1);
sub2.AddSubscription(us2);
var ups1 = new UpdateSubscription(sub1, us1);
var ups2 = new UpdateSubscription(sub2, us2);
// act
client.UnsubscribeAllAsync().Wait();

View File

@ -182,9 +182,11 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
return await SendRequestAsync<T>(new Uri("http://www.test.com"), HttpMethod.Get, ct);
}
protected override Error ParseErrorResponse(JToken error)
protected override Error ParseErrorResponse(int httpStatusCode, IEnumerable<KeyValuePair<string, IEnumerable<string>>> responseHeaders, string data)
{
return new ServerError((int)error["errorCode"], (string)error["errorMessage"]);
var errorData = ValidateJson(data);
return new ServerError((int)errorData.Data["errorCode"], (string)errorData.Data["errorMessage"]);
}
public override TimeSpan? GetTimeOffset()

View File

@ -18,6 +18,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
public event Action OnReconnected;
public event Action OnReconnecting;
#pragma warning restore 0067
public event Action<int> OnRequestSent;
public event Action<string> OnMessage;
public event Action<Exception> OnError;
public event Action OnOpen;
@ -69,10 +70,11 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
return Task.FromResult(CanConnect);
}
public void Send(string data)
public void Send(int requestId, string data, int weight)
{
if(!Connected)
throw new Exception("Socket not connected");
OnRequestSent?.Invoke(requestId);
}
public void Reset()

View File

@ -77,15 +77,6 @@ namespace CryptoExchange.Net
/// </summary>
public bool OutputOriginalData { get; }
/// <summary>
/// The last used id, use NextId() to get the next id and up this
/// </summary>
protected static int _lastId;
/// <summary>
/// Lock for id generating
/// </summary>
protected static object _idLock = new();
/// <summary>
/// A default serializer
/// </summary>
@ -338,19 +329,6 @@ namespace CryptoExchange.Net
return await reader.ReadToEndAsync().ConfigureAwait(false);
}
/// <summary>
/// Generate a new unique id. The id is staticly stored so it is guarenteed to be unique across different client instances
/// </summary>
/// <returns></returns>
protected static int NextId()
{
lock (_idLock)
{
_lastId += 1;
return _lastId;
}
}
/// <summary>
/// Dispose
/// </summary>

View File

@ -15,6 +15,7 @@ using CryptoExchange.Net.Requests;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using static CryptoExchange.Net.Objects.RateLimiter;
namespace CryptoExchange.Net
{
@ -195,7 +196,7 @@ namespace CryptoExchange.Net
Dictionary<string, string>? additionalHeaders = null,
bool ignoreRatelimit = false)
{
var requestId = NextId();
var requestId = ExchangeHelpers.NextId();
if (signed)
{

View File

@ -12,6 +12,7 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static CryptoExchange.Net.Objects.RateLimiter;
namespace CryptoExchange.Net
{
@ -76,9 +77,9 @@ namespace CryptoExchange.Net
protected internal bool UnhandledMessageExpected { get; set; }
/// <summary>
/// The max amount of outgoing messages per socket per second
/// The rate limiters
/// </summary>
protected internal int? RateLimitPerSocketPerSecond { get; set; }
protected internal IEnumerable<IRateLimiter>? RateLimiters { get; set; }
/// <inheritdoc />
public double IncomingKbps
@ -130,6 +131,10 @@ namespace CryptoExchange.Net
options,
apiOptions)
{
var rateLimiters = new List<IRateLimiter>();
foreach (var rateLimiter in apiOptions.RateLimiters)
rateLimiters.Add(rateLimiter);
RateLimiters = rateLimiters;
}
/// <summary>
@ -275,7 +280,7 @@ namespace CryptoExchange.Net
protected internal virtual async Task<CallResult<bool>> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscription subscription)
{
CallResult<object>? callResult = null;
await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false);
await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, 1, data => HandleSubscriptionResponse(socketConnection, subscription, request, data, out callResult)).ConfigureAwait(false);
if (callResult?.Success == true)
{
@ -295,10 +300,11 @@ namespace CryptoExchange.Net
/// <typeparam name="T">Expected result type</typeparam>
/// <param name="request">The request to send, will be serialized to json</param>
/// <param name="authenticated">If the query is to an authenticated endpoint</param>
/// <param name="weight">Weight of the request</param>
/// <returns></returns>
protected virtual Task<CallResult<T>> QueryAsync<T>(object request, bool authenticated)
protected virtual Task<CallResult<T>> QueryAsync<T>(object request, bool authenticated, int weight = 1)
{
return QueryAsync<T>(BaseAddress, request, authenticated);
return QueryAsync<T>(BaseAddress, request, authenticated, weight);
}
/// <summary>
@ -308,8 +314,9 @@ namespace CryptoExchange.Net
/// <param name="url">The url for the request</param>
/// <param name="request">The request to send</param>
/// <param name="authenticated">Whether the socket should be authenticated</param>
/// <param name="weight">Weight of the request</param>
/// <returns></returns>
protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, object request, bool authenticated)
protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, object request, bool authenticated, int weight = 1)
{
if (_disposing)
return new CallResult<T>(new InvalidOperationError("Client disposed, can't query"));
@ -348,7 +355,7 @@ namespace CryptoExchange.Net
return new CallResult<T>(new ServerError("Socket is paused"));
}
return await QueryAndWaitAsync<T>(socketConnection, request).ConfigureAwait(false);
return await QueryAndWaitAsync<T>(socketConnection, request, weight).ConfigureAwait(false);
}
/// <summary>
@ -357,11 +364,12 @@ namespace CryptoExchange.Net
/// <typeparam name="T">The expected result type</typeparam>
/// <param name="socket">The connection to send and wait on</param>
/// <param name="request">The request to send</param>
/// <param name="weight">The weight of the query</param>
/// <returns></returns>
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, object request)
protected virtual async Task<CallResult<T>> QueryAndWaitAsync<T>(SocketConnection socket, object request, int weight)
{
var dataResult = new CallResult<T>(new ServerError("No response on query received"));
await socket.SendAndWaitAsync(request, ClientOptions.RequestTimeout, null, data =>
await socket.SendAndWaitAsync(request, ClientOptions.RequestTimeout, null, weight, data =>
{
if (!HandleQueryResponse<T>(socket, request, data, out var callResult))
return false;
@ -518,8 +526,8 @@ namespace CryptoExchange.Net
}
var subscription = request == null
? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, authenticated, InternalHandler)
: SocketSubscription.CreateForRequest(NextId(), request, userSubscription, authenticated, InternalHandler);
? SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), identifier!, userSubscription, authenticated, InternalHandler)
: SocketSubscription.CreateForRequest(ExchangeHelpers.NextId(), request, userSubscription, authenticated, InternalHandler);
if (!connection.AddSubscription(subscription))
return null;
return subscription;
@ -533,7 +541,7 @@ namespace CryptoExchange.Net
protected void AddGenericHandler(string identifier, Action<MessageEvent> action)
{
genericHandlers.Add(identifier, action);
var subscription = SocketSubscription.CreateForIdentifier(NextId(), identifier, false, false, action);
var subscription = SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), identifier, false, false, action);
foreach (var connection in socketConnections.Values)
connection.AddSubscription(subscription);
}
@ -607,7 +615,7 @@ namespace CryptoExchange.Net
socketConnection.UnhandledMessage += HandleUnhandledMessage;
foreach (var kvp in genericHandlers)
{
var handler = SocketSubscription.CreateForIdentifier(NextId(), kvp.Key, false, false, kvp.Value);
var handler = SocketSubscription.CreateForIdentifier(ExchangeHelpers.NextId(), kvp.Key, false, false, kvp.Value);
socketConnection.AddSubscription(handler);
}
@ -651,7 +659,7 @@ namespace CryptoExchange.Net
DataInterpreterString = dataInterpreterString,
KeepAliveInterval = KeepAliveInterval,
ReconnectInterval = ClientOptions.ReconnectInterval,
RatelimitPerSecond = RateLimitPerSocketPerSecond,
RateLimiters = RateLimiters,
Proxy = ClientOptions.Proxy,
Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout
};
@ -704,7 +712,7 @@ namespace CryptoExchange.Net
try
{
socketConnection.Send(obj);
socketConnection.Send(ExchangeHelpers.NextId(), obj, 1);
}
catch (Exception ex)
{

View File

@ -8,6 +8,15 @@ namespace CryptoExchange.Net
/// </summary>
public static class ExchangeHelpers
{
/// <summary>
/// The last used id, use NextId() to get the next id and up this
/// </summary>
private static int _lastId;
/// <summary>
/// Lock for id generating
/// </summary>
private static object _idLock = new();
/// <summary>
/// Clamp a value between a min and max
/// </summary>
@ -118,5 +127,19 @@ namespace CryptoExchange.Net
{
return value / 1.000000000000000000000000000000000m;
}
/// <summary>
/// Generate a new unique id. The id is staticly stored so it is guarenteed to be unique
/// </summary>
/// <returns></returns>
public static int NextId()
{
lock (_idLock)
{
_lastId += 1;
return _lastId;
}
}
}
}

View File

@ -8,7 +8,7 @@ using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces
{
/// <summary>
/// Webscoket connection interface
/// Websocket connection interface
/// </summary>
public interface IWebsocket: IDisposable
{
@ -21,6 +21,10 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
event Action<string> OnMessage;
/// <summary>
/// Websocket sent event, RequestId as parameter
/// </summary>
event Action<int> OnRequestSent;
/// <summary>
/// Websocket error event
/// </summary>
event Action<Exception> OnError;
@ -69,8 +73,10 @@ namespace CryptoExchange.Net.Interfaces
/// <summary>
/// Send data
/// </summary>
/// <param name="id"></param>
/// <param name="data"></param>
void Send(string data);
/// <param name="weight"></param>
void Send(int id, string data, int weight);
/// <summary>
/// Reconnect the socket
/// </summary>

View File

@ -41,7 +41,7 @@ namespace CryptoExchange.Net.Objects
/// <returns></returns>
public override string ToString()
{
return $"{Code}: {Message} {Data}";
return Code != null ? $"{Code}: {Message} {Data}" : $"{Message} {Data}";
}
}

View File

@ -1,5 +1,7 @@
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Interfaces;
using System;
using System.Collections.Generic;
namespace CryptoExchange.Net.Objects.Options
{
@ -8,6 +10,11 @@ namespace CryptoExchange.Net.Objects.Options
/// </summary>
public class SocketApiOptions : ApiOptions
{
/// <summary>
/// List of rate limiters to use
/// </summary>
public List<IRateLimiter> RateLimiters { get; set; } = new List<IRateLimiter>();
/// <summary>
/// The max time of not receiving any data after which the connection is assumed to be dropped. This can only be used for socket connections where a steady flow of data is expected,
/// for example when the server sends intermittent ping requests
@ -30,6 +37,7 @@ namespace CryptoExchange.Net.Objects.Options
{
ApiCredentials = ApiCredentials?.Copy(),
OutputOriginalData = OutputOriginalData,
RateLimiters = RateLimiters,
SocketNoDataTimeout = SocketNoDataTimeout,
MaxSocketConnections = MaxSocketConnections,
};

View File

@ -17,7 +17,7 @@ namespace CryptoExchange.Net.Objects
public class RateLimiter : IRateLimiter
{
private readonly object _limiterLock = new object();
internal List<Limiter> Limiters = new List<Limiter>();
internal List<Limiter> _limiters = new List<Limiter>();
/// <summary>
/// Create a new RateLimiter. Configure the rate limiter by calling <see cref="AddTotalRateLimit"/>,
@ -35,7 +35,7 @@ namespace CryptoExchange.Net.Objects
public RateLimiter AddTotalRateLimit(int limit, TimeSpan perTimePeriod)
{
lock(_limiterLock)
Limiters.Add(new TotalRateLimiter(limit, perTimePeriod, null));
_limiters.Add(new TotalRateLimiter(limit, perTimePeriod, null));
return this;
}
@ -50,7 +50,7 @@ namespace CryptoExchange.Net.Objects
public RateLimiter AddEndpointLimit(string endpoint, int limit, TimeSpan perTimePeriod, HttpMethod? method = null, bool excludeFromOtherRateLimits = false)
{
lock(_limiterLock)
Limiters.Add(new EndpointRateLimiter(new[] { endpoint }, limit, perTimePeriod, method, excludeFromOtherRateLimits));
_limiters.Add(new EndpointRateLimiter(new[] { endpoint }, limit, perTimePeriod, method, excludeFromOtherRateLimits));
return this;
}
@ -65,7 +65,7 @@ namespace CryptoExchange.Net.Objects
public RateLimiter AddEndpointLimit(IEnumerable<string> endpoints, int limit, TimeSpan perTimePeriod, HttpMethod? method = null, bool excludeFromOtherRateLimits = false)
{
lock(_limiterLock)
Limiters.Add(new EndpointRateLimiter(endpoints.ToArray(), limit, perTimePeriod, method, excludeFromOtherRateLimits));
_limiters.Add(new EndpointRateLimiter(endpoints.ToArray(), limit, perTimePeriod, method, excludeFromOtherRateLimits));
return this;
}
@ -81,7 +81,7 @@ namespace CryptoExchange.Net.Objects
public RateLimiter AddPartialEndpointLimit(string endpoint, int limit, TimeSpan perTimePeriod, HttpMethod? method = null, bool countPerEndpoint = false, bool ignoreOtherRateLimits = false)
{
lock(_limiterLock)
Limiters.Add(new PartialEndpointRateLimiter(new[] { endpoint }, limit, perTimePeriod, method, ignoreOtherRateLimits, countPerEndpoint));
_limiters.Add(new PartialEndpointRateLimiter(new[] { endpoint }, limit, perTimePeriod, method, ignoreOtherRateLimits, countPerEndpoint));
return this;
}
@ -95,7 +95,20 @@ namespace CryptoExchange.Net.Objects
public RateLimiter AddApiKeyLimit(int limit, TimeSpan perTimePeriod, bool onlyForSignedRequests, bool excludeFromTotalRateLimit)
{
lock(_limiterLock)
Limiters.Add(new ApiKeyRateLimiter(limit, perTimePeriod, null, onlyForSignedRequests, excludeFromTotalRateLimit));
_limiters.Add(new ApiKeyRateLimiter(limit, perTimePeriod, null, onlyForSignedRequests, excludeFromTotalRateLimit));
return this;
}
/// <summary>
/// Add a rate limit for the amount of messages that can be send per connection
/// </summary>
/// <param name="endpoint">The endpoint that the limit is for</param>
/// <param name="limit">The limit per period. Note that this is weight, not single request, altough by default requests have a weight of 1</param>
/// <param name="perTimePeriod">The time period the limit is for</param>
public RateLimiter AddConnectionRateLimit(string endpoint, int limit, TimeSpan perTimePeriod)
{
lock (_limiterLock)
_limiters.Add(new ConnectionRateLimiter(new[] { endpoint }, limit, perTimePeriod));
return this;
}
@ -106,7 +119,7 @@ namespace CryptoExchange.Net.Objects
EndpointRateLimiter? endpointLimit;
lock (_limiterLock)
endpointLimit = Limiters.OfType<EndpointRateLimiter>().SingleOrDefault(h => h.Endpoints.Contains(endpoint) && (h.Method == null || h.Method == method));
endpointLimit = _limiters.OfType<EndpointRateLimiter>().SingleOrDefault(h => h.Endpoints.Contains(endpoint) && (h.Method == null || h.Method == method));
if(endpointLimit != null)
{
var waitResult = await ProcessTopic(logger, endpointLimit, endpoint, requestWeight, limitBehaviour, ct).ConfigureAwait(false);
@ -121,7 +134,7 @@ namespace CryptoExchange.Net.Objects
List<PartialEndpointRateLimiter> partialEndpointLimits;
lock (_limiterLock)
partialEndpointLimits = Limiters.OfType<PartialEndpointRateLimiter>().Where(h => h.PartialEndpoints.Any(h => endpoint.Contains(h)) && (h.Method == null || h.Method == method)).ToList();
partialEndpointLimits = _limiters.OfType<PartialEndpointRateLimiter>().Where(h => h.PartialEndpoints.Any(h => endpoint.Contains(h)) && (h.Method == null || h.Method == method)).ToList();
foreach (var partialEndpointLimit in partialEndpointLimits)
{
if (partialEndpointLimit.CountPerEndpoint)
@ -129,11 +142,11 @@ namespace CryptoExchange.Net.Objects
SingleTopicRateLimiter? thisEndpointLimit;
lock (_limiterLock)
{
thisEndpointLimit = Limiters.OfType<SingleTopicRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.PartialEndpoint && (string)h.Topic == endpoint);
thisEndpointLimit = _limiters.OfType<SingleTopicRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.PartialEndpoint && (string)h.Topic == endpoint);
if (thisEndpointLimit == null)
{
thisEndpointLimit = new SingleTopicRateLimiter(endpoint, partialEndpointLimit);
Limiters.Add(thisEndpointLimit);
_limiters.Add(thisEndpointLimit);
}
}
@ -158,7 +171,7 @@ namespace CryptoExchange.Net.Objects
ApiKeyRateLimiter? apiLimit;
lock (_limiterLock)
apiLimit = Limiters.OfType<ApiKeyRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.ApiKey);
apiLimit = _limiters.OfType<ApiKeyRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.ApiKey);
if (apiLimit != null)
{
if(apiKey == null)
@ -177,11 +190,11 @@ namespace CryptoExchange.Net.Objects
SingleTopicRateLimiter? thisApiLimit;
lock (_limiterLock)
{
thisApiLimit = Limiters.OfType<SingleTopicRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.ApiKey && ((SecureString)h.Topic).IsEqualTo(apiKey));
thisApiLimit = _limiters.OfType<SingleTopicRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.ApiKey && ((SecureString)h.Topic).IsEqualTo(apiKey));
if (thisApiLimit == null)
{
thisApiLimit = new SingleTopicRateLimiter(apiKey, apiLimit);
Limiters.Add(thisApiLimit);
_limiters.Add(thisApiLimit);
}
}
@ -198,7 +211,7 @@ namespace CryptoExchange.Net.Objects
TotalRateLimiter? totalLimit;
lock (_limiterLock)
totalLimit = Limiters.OfType<TotalRateLimiter>().SingleOrDefault();
totalLimit = _limiters.OfType<TotalRateLimiter>().SingleOrDefault();
if (totalLimit != null)
{
var waitResult = await ProcessTopic(logger, totalLimit, endpoint, requestWeight, limitBehaviour, ct).ConfigureAwait(false);
@ -224,63 +237,68 @@ namespace CryptoExchange.Net.Objects
}
sw.Stop();
int totalWaitTime = 0;
while (true)
try
{
// Remove requests no longer in time period from the history
var checkTime = DateTime.UtcNow;
for (var i = 0; i < historyTopic.Entries.Count; i++)
int totalWaitTime = 0;
while (true)
{
if (historyTopic.Entries[i].Timestamp < checkTime - historyTopic.Period)
// Remove requests no longer in time period from the history
var checkTime = DateTime.UtcNow;
for (var i = 0; i < historyTopic.Entries.Count; i++)
{
historyTopic.Entries.Remove(historyTopic.Entries[i]);
i--;
if (historyTopic.Entries[i].Timestamp < checkTime - historyTopic.Period)
{
historyTopic.Entries.Remove(historyTopic.Entries[i]);
i--;
}
else
break;
}
var currentWeight = !historyTopic.Entries.Any() ? 0 : historyTopic.Entries.Sum(h => h.Weight);
if (currentWeight + requestWeight > historyTopic.Limit)
{
if (currentWeight == 0)
throw new Exception("Request limit reached without any prior request. " +
$"This request can never execute with the current rate limiter. Request weight: {requestWeight}, Ratelimit: {historyTopic.Limit}");
// Wait until the next entry should be removed from the history
var thisWaitTime = (int)Math.Round(((historyTopic.Entries.First().Timestamp + historyTopic.Period) - checkTime).TotalMilliseconds);
if (thisWaitTime > 0)
{
if (limitBehaviour == RateLimitingBehaviour.Fail)
{
var msg = $"Request to {endpoint} failed because of rate limit `{historyTopic.Type}`. Current weight: {currentWeight}/{historyTopic.Limit}, request weight: {requestWeight}";
logger.Log(LogLevel.Warning, msg);
return new CallResult<int>(new ClientRateLimitError(msg) { RetryAfter = DateTime.UtcNow.AddSeconds(thisWaitTime) });
}
logger.Log(LogLevel.Information, $"Message to {endpoint} waiting {thisWaitTime}ms for rate limit `{historyTopic.Type}`. Current weight: {currentWeight}/{historyTopic.Limit}, request weight: {requestWeight}");
try
{
await Task.Delay(thisWaitTime, ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return new CallResult<int>(new CancellationRequestedError());
}
totalWaitTime += thisWaitTime;
}
}
else
break;
}
var currentWeight = !historyTopic.Entries.Any() ? 0: historyTopic.Entries.Sum(h => h.Weight);
if (currentWeight + requestWeight > historyTopic.Limit)
{
if (currentWeight == 0)
throw new Exception("Request limit reached without any prior request. " +
$"This request can never execute with the current rate limiter. Request weight: {requestWeight}, Ratelimit: {historyTopic.Limit}");
// Wait until the next entry should be removed from the history
var thisWaitTime = (int)Math.Round((historyTopic.Entries.First().Timestamp - (checkTime - historyTopic.Period)).TotalMilliseconds);
if (thisWaitTime > 0)
{
if (limitBehaviour == RateLimitingBehaviour.Fail)
{
historyTopic.Semaphore.Release();
var msg = $"Request to {endpoint} failed because of rate limit `{historyTopic.Type}`. Current weight: {currentWeight}/{historyTopic.Limit}, request weight: {requestWeight}";
logger.Log(LogLevel.Warning, msg);
return new CallResult<int>(new ClientRateLimitError(msg) { RetryAfter = DateTime.UtcNow.AddSeconds(thisWaitTime) });
}
logger.Log(LogLevel.Information, $"Request to {endpoint} waiting {thisWaitTime}ms for rate limit `{historyTopic.Type}`. Current weight: {currentWeight}/{historyTopic.Limit}, request weight: {requestWeight}");
try
{
await Task.Delay(thisWaitTime, ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return new CallResult<int>(new CancellationRequestedError());
}
totalWaitTime += thisWaitTime;
break;
}
}
else
{
break;
}
}
var newTime = DateTime.UtcNow;
historyTopic.Entries.Add(new LimitEntry(newTime, requestWeight));
historyTopic.Semaphore.Release();
return new CallResult<int>(totalWaitTime);
var newTime = DateTime.UtcNow;
historyTopic.Entries.Add(new LimitEntry(newTime, requestWeight));
return new CallResult<int>(totalWaitTime);
}
finally
{
historyTopic.Semaphore.Release();
}
}
internal struct LimitEntry
@ -329,6 +347,24 @@ namespace CryptoExchange.Net.Objects
}
}
internal class ConnectionRateLimiter : PartialEndpointRateLimiter
{
public ConnectionRateLimiter(int limit, TimeSpan perPeriod)
: base(new[] { "/" }, limit, perPeriod, null, true, true)
{
}
public ConnectionRateLimiter(string[] endpoints, int limit, TimeSpan perPeriod)
: base(endpoints, limit, perPeriod, null, true, true)
{
}
public override string ToString()
{
return nameof(ConnectionRateLimiter);
}
}
internal class EndpointRateLimiter: Limiter
{
public string[] Endpoints { get; set; }

View File

@ -7,6 +7,7 @@ using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
@ -30,9 +31,8 @@ namespace CryptoExchange.Net.Sockets
private static readonly object _streamIdLock = new();
private readonly AsyncResetEvent _sendEvent;
private readonly ConcurrentQueue<byte[]> _sendBuffer;
private readonly ConcurrentQueue<SendItem> _sendBuffer;
private readonly SemaphoreSlim _closeSem;
private readonly List<DateTime> _outgoingMessages;
private ClientWebSocket _socket;
private CancellationTokenSource _ctsSource;
@ -103,6 +103,9 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public event Action<string>? OnMessage;
/// <inheritdoc />
public event Action<int>? OnRequestSent;
/// <inheritdoc />
public event Action<Exception>? OnError;
@ -128,10 +131,9 @@ namespace CryptoExchange.Net.Sockets
_logger = logger;
Parameters = websocketParameters;
_outgoingMessages = new List<DateTime>();
_receivedMessages = new List<ReceiveItem>();
_sendEvent = new AsyncResetEvent();
_sendBuffer = new ConcurrentQueue<byte[]>();
_sendBuffer = new ConcurrentQueue<SendItem>();
_ctsSource = new CancellationTokenSource();
_receivedMessagesLock = new object();
@ -270,14 +272,14 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public virtual void Send(string data)
public virtual void Send(int id, string data, int weight)
{
if (_ctsSource.IsCancellationRequested)
return;
var bytes = Parameters.Encoding.GetBytes(data);
_logger.Log(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer");
_sendBuffer.Enqueue(bytes);
_logger.Log(LogLevel.Trace, $"Socket {Id} - msg {id} - Adding {bytes.Length} to send buffer");
_sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes });
_sendEvent.Set();
}
@ -392,6 +394,7 @@ namespace CryptoExchange.Net.Sockets
{
try
{
var limitKey = Uri.ToString() + "/" + Id.ToString();
while (true)
{
if (_ctsSource.IsCancellationRequested)
@ -404,25 +407,24 @@ namespace CryptoExchange.Net.Sockets
while (_sendBuffer.TryDequeue(out var data))
{
if (Parameters.RatelimitPerSecond != null)
if (Parameters.RateLimiters != null)
{
// Wait for rate limit
DateTime? start = null;
while (MessagesSentLastSecond() >= Parameters.RatelimitPerSecond)
foreach(var ratelimiter in Parameters.RateLimiters)
{
start ??= DateTime.UtcNow;
await Task.Delay(50).ConfigureAwait(false);
var limitResult = await ratelimiter.LimitRequestAsync(_logger, limitKey, HttpMethod.Get, false, null, RateLimitingBehaviour.Wait, data.Weight, _ctsSource.Token).ConfigureAwait(false);
if (limitResult.Success)
{
if (limitResult.Data > 0)
_logger.Log(LogLevel.Debug, $"Socket {Id} - msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit");
}
}
if (start != null)
_logger.Log(LogLevel.Debug, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit");
}
try
{
await _socket.SendAsync(new ArraySegment<byte>(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
_outgoingMessages.Add(DateTime.UtcNow);
_logger.Log(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes");
await _socket.SendAsync(new ArraySegment<byte>(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
OnRequestSent?.Invoke(data.Id);
_logger.Log(LogLevel.Trace, $"Socket {Id} - msg {data.Id} - sent {data.Bytes.Length} bytes");
}
catch (OperationCanceledException)
{
@ -630,42 +632,6 @@ namespace CryptoExchange.Net.Sockets
}
}
/// <summary>
/// Trigger the OnMessage event
/// </summary>
/// <param name="data"></param>
protected void TriggerOnMessage(string data)
{
LastActionTime = DateTime.UtcNow;
OnMessage?.Invoke(data);
}
/// <summary>
/// Trigger the OnError event
/// </summary>
/// <param name="ex"></param>
protected void TriggerOnError(Exception ex) => OnError?.Invoke(ex);
/// <summary>
/// Trigger the OnError event
/// </summary>
protected void TriggerOnOpen() => OnOpen?.Invoke();
/// <summary>
/// Trigger the OnError event
/// </summary>
protected void TriggerOnClose() => OnClose?.Invoke();
/// <summary>
/// Trigger the OnReconnecting event
/// </summary>
protected void TriggerOnReconnecting() => OnReconnecting?.Invoke();
/// <summary>
/// Trigger the OnReconnected event
/// </summary>
protected void TriggerOnReconnected() => OnReconnected?.Invoke();
/// <summary>
/// Checks if there is no data received for a period longer than the specified timeout
/// </summary>
@ -721,13 +687,6 @@ namespace CryptoExchange.Net.Sockets
}
}
private int MessagesSentLastSecond()
{
var testTime = DateTime.UtcNow;
_outgoingMessages.RemoveAll(r => testTime - r > TimeSpan.FromSeconds(1));
return _outgoingMessages.Count;
}
/// <summary>
/// Update the received messages list, removing messages received longer than 3s ago
/// </summary>
@ -769,6 +728,32 @@ namespace CryptoExchange.Net.Sockets
}
}
/// <summary>
/// Message info
/// </summary>
public struct SendItem
{
/// <summary>
/// The request id
/// </summary>
public int Id { get; set; }
/// <summary>
/// The request id
/// </summary>
public int Weight { get; set; }
/// <summary>
/// Timestamp the request was sent
/// </summary>
public DateTime SendTime { get; set; }
/// <summary>
/// The bytes to send
/// </summary>
public byte[] Bytes { get; set; }
}
/// <summary>
/// Received message info
/// </summary>

View File

@ -7,6 +7,7 @@ namespace CryptoExchange.Net.Sockets
{
internal class PendingRequest
{
public int Id { get; set; }
public Func<JToken, bool> Handler { get; }
public JToken? Result { get; private set; }
public bool Completed { get; private set; }
@ -15,17 +16,22 @@ namespace CryptoExchange.Net.Sockets
public TimeSpan Timeout { get; }
public SocketSubscription? Subscription { get; }
private CancellationTokenSource _cts;
private CancellationTokenSource? _cts;
public PendingRequest(Func<JToken, bool> handler, TimeSpan timeout, SocketSubscription? subscription)
public PendingRequest(int id, Func<JToken, bool> handler, TimeSpan timeout, SocketSubscription? subscription)
{
Id = id;
Handler = handler;
Event = new AsyncResetEvent(false, false);
Timeout = timeout;
RequestTimestamp = DateTime.UtcNow;
Subscription = subscription;
}
_cts = new CancellationTokenSource(timeout);
public void IsSend()
{
// Start timeout countdown
_cts = new CancellationTokenSource(Timeout);
_cts.Token.Register(Fail, false);
}

View File

@ -182,6 +182,7 @@ namespace CryptoExchange.Net.Sockets
_socket = socket;
_socket.OnMessage += HandleMessage;
_socket.OnRequestSent += HandleRequestSent;
_socket.OnOpen += HandleOpen;
_socket.OnClose += HandleClose;
_socket.OnReconnecting += HandleReconnecting;
@ -284,6 +285,22 @@ namespace CryptoExchange.Net.Sockets
_logger.Log(LogLevel.Warning, $"Socket {SocketId} error: " + e.ToLogString());
}
/// <summary>
/// Handler for whenever a request is sent over the websocket
/// </summary>
/// <param name="requestId">Id of the request sent</param>
protected virtual void HandleRequestSent(int requestId)
{
var pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId);
if (pendingRequest == null)
{
_logger.Log(LogLevel.Debug, $"Socket {SocketId} - msg {requestId} - message sent, but not pending");
return;
}
pendingRequest.IsSend();
}
/// <summary>
/// Process a message received by the socket
/// </summary>
@ -318,7 +335,6 @@ namespace CryptoExchange.Net.Sockets
// Check if this message is an answer on any pending requests
foreach (var pendingRequest in requests)
{
if (pendingRequest.CheckData(tokenData))
{
lock (_pendingRequests)
@ -329,12 +345,13 @@ namespace CryptoExchange.Net.Sockets
// Answer to a timed out request, unsub if it is a subscription request
if (pendingRequest.Subscription != null)
{
_logger.Log(LogLevel.Warning, "Received subscription info after request timed out; unsubscribing. Consider increasing the SocketResponseTimout");
_logger.Log(LogLevel.Warning, "Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
_ = ApiClient.UnsubscribeAsync(this, pendingRequest.Subscription).ConfigureAwait(false);
}
}
else
{
_logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request");
pendingRequest.Succeed(tokenData);
}
@ -570,45 +587,69 @@ namespace CryptoExchange.Net.Sockets
/// <param name="timeout">The timeout for response</param>
/// <param name="subscription">Subscription if this is a subscribe request</param>
/// <param name="handler">The response handler, should return true if the received JToken was the response to the request</param>
/// <param name="weight">The weight of the message</param>
/// <returns></returns>
public virtual Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, SocketSubscription? subscription, Func<JToken, bool> handler)
public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, SocketSubscription? subscription, int weight, Func<JToken, bool> handler)
{
var pending = new PendingRequest(handler, timeout, subscription);
var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, subscription);
lock (_pendingRequests)
{
_pendingRequests.Add(pending);
}
var sendOk = Send(obj);
if(!sendOk)
pending.Fail();
return pending.Event.WaitAsync(timeout);
var sendOk = Send(pending.Id, obj, weight);
if (!sendOk)
{
pending.Fail();
return;
}
while (true)
{
if(!_socket.IsOpen)
{
pending.Fail();
return;
}
if (pending.Completed)
return;
await pending.Event.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
if (pending.Completed)
return;
}
}
/// <summary>
/// Send data over the websocket connection
/// </summary>
/// <typeparam name="T">The type of the object to send</typeparam>
/// <param name="requestId">The request id</param>
/// <param name="obj">The object to send</param>
/// <param name="nullValueHandling">How null values should be serialized</param>
public virtual bool Send<T>(T obj, NullValueHandling nullValueHandling = NullValueHandling.Ignore)
/// <param name="weight">The weight of the message</param>
public virtual bool Send<T>(int requestId, T obj, int weight, NullValueHandling nullValueHandling = NullValueHandling.Ignore)
{
if(obj is string str)
return Send(str);
return Send(requestId, str, weight);
else
return Send(JsonConvert.SerializeObject(obj, Formatting.None, new JsonSerializerSettings { NullValueHandling = nullValueHandling }));
return Send(requestId, JsonConvert.SerializeObject(obj, Formatting.None, new JsonSerializerSettings { NullValueHandling = nullValueHandling }), weight);
}
/// <summary>
/// Send string data over the websocket connection
/// </summary>
/// <param name="data">The data to send</param>
public virtual bool Send(string data)
/// <param name="weight">The weight of the message</param>
/// <param name="requestId">The id of the request</param>
public virtual bool Send(int requestId, string data, int weight)
{
_logger.Log(LogLevel.Trace, $"Socket {SocketId} sending data: {data}");
_logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {requestId} - sending messsage: {data}");
try
{
_socket.Send(data);
_socket.Send(requestId, data, weight);
return true;
}
catch(Exception)

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Text;
@ -52,9 +53,9 @@ namespace CryptoExchange.Net.Sockets
public TimeSpan? KeepAliveInterval { get; set; }
/// <summary>
/// The max amount of messages to send per second
/// The rate limiters for the socket connection
/// </summary>
public int? RatelimitPerSecond { get; set; }
public IEnumerable<IRateLimiter>? RateLimiters { get; set; }
/// <summary>
/// Origin header value to send in the connection handshake