1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 01:33:26 +00:00
This commit is contained in:
JKorf 2025-11-15 17:46:22 +01:00 committed by Jkorf
parent c945176049
commit 68f772a13a
23 changed files with 256 additions and 154 deletions

View File

@ -1,17 +1,19 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Testing.Implementations;
using CryptoExchange.Net.UnitTests.TestImplementations;
using CryptoExchange.Net.UnitTests.TestImplementations.Sockets;
using Microsoft.Extensions.Logging;
using Moq;
using NUnit.Framework;
using NUnit.Framework.Legacy;
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.UnitTests
{
@ -44,7 +46,8 @@ namespace CryptoExchange.Net.UnitTests
socket.CanConnect = canConnect;
//act
var connectResult = client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), client.SubClient, socket, null));
var connectResult = client.SubClient.ConnectSocketSub(
new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""));
//assert
Assert.That(connectResult.Success == canConnect);
@ -59,7 +62,7 @@ namespace CryptoExchange.Net.UnitTests
});
var socket = client.CreateSocket();
socket.CanConnect = true;
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
var rstEvent = new ManualResetEvent(false);
Dictionary<string, string> result = null;
@ -92,7 +95,7 @@ namespace CryptoExchange.Net.UnitTests
});
var socket = client.CreateSocket();
socket.CanConnect = true;
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
var rstEvent = new ManualResetEvent(false);
string original = null;
@ -123,7 +126,7 @@ namespace CryptoExchange.Net.UnitTests
});
var socket = client.CreateSocket();
socket.CanConnect = true;
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
client.SubClient.ConnectSocketSub(sub);
var subscription = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { });
@ -146,8 +149,8 @@ namespace CryptoExchange.Net.UnitTests
var socket2 = client.CreateSocket();
socket1.CanConnect = true;
socket2.CanConnect = true;
var sub1 = new SocketConnection(new TraceLogger(), client.SubClient, socket1, null);
var sub2 = new SocketConnection(new TraceLogger(), client.SubClient, socket2, null);
var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket1), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
var sub2 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket2), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
client.SubClient.ConnectSocketSub(sub1);
client.SubClient.ConnectSocketSub(sub2);
var subscription1 = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { });
@ -173,7 +176,7 @@ namespace CryptoExchange.Net.UnitTests
var client = new TestSocketClient(options => { options.ReconnectInterval = TimeSpan.Zero; });
var socket = client.CreateSocket();
socket.CanConnect = false;
var sub1 = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
// act
var connectResult = client.SubClient.ConnectSocketSub(sub1);
@ -194,7 +197,7 @@ namespace CryptoExchange.Net.UnitTests
});
var socket = client.CreateSocket();
socket.CanConnect = true;
client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), client.SubClient, socket, "https://test.test"));
client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""));
// act
var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default);
@ -217,7 +220,7 @@ namespace CryptoExchange.Net.UnitTests
});
var socket = client.CreateSocket();
socket.CanConnect = true;
client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), client.SubClient, socket, "https://test.test"));
client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""));
// act
var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default);

View File

@ -18,6 +18,7 @@ using CryptoExchange.Net.SharedApis;
using Microsoft.Extensions.Options;
using CryptoExchange.Net.Converters.SystemTextJson;
using System.Net.WebSockets;
using System.Text.Json;
namespace CryptoExchange.Net.UnitTests.TestImplementations
{
@ -94,6 +95,8 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
public Subscription TestSubscription { get; private set; } = null;
public override JsonSerializerOptions JsonSerializerOptions => new JsonSerializerOptions();
public TestSubSocketClient(TestSocketOptions options, SocketApiOptions apiOptions) : base(new TraceLogger(), options.Environment.TestAddress, options, apiOptions)
{

View File

@ -1,13 +1,18 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
namespace CryptoExchange.Net.Caching
{
internal class MemoryCache
{
private readonly ConcurrentDictionary<string, CacheItem> _cache = new ConcurrentDictionary<string, CacheItem>();
#if NET9_0_OR_GREATER
private readonly Lock _lock = new Lock();
#else
private readonly object _lock = new object();
#endif
/// <summary>
/// Add a new cache entry. Will override an existing entry if it already exists

View File

@ -4,6 +4,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Threading;
namespace CryptoExchange.Net.Clients
{
@ -49,7 +50,11 @@ namespace CryptoExchange.Net.Clients
/// </summary>
protected internal ILogger _logger;
#if NET9_0_OR_GREATER
private readonly Lock _versionLock = new Lock();
#else
private readonly object _versionLock = new object();
#endif
private Version _exchangeVersion;
/// <summary>

View File

@ -20,7 +20,7 @@
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageReleaseNotes>https://github.com/JKorf/CryptoExchange.Net?tab=readme-ov-file#release-notes</PackageReleaseNotes>
<Nullable>enable</Nullable>
<LangVersion>12.0</LangVersion>
<LangVersion>latest</LangVersion>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
</PropertyGroup>
<ItemGroup>

View File

@ -381,13 +381,14 @@ namespace CryptoExchange.Net
}
/// <summary>
/// Queue updates received from a websocket subscriptions and process them async
/// Queue updates and process them async
/// </summary>
/// <typeparam name="T">The queued update type</typeparam>
/// <param name="subscribeCall">The subscribe call</param>
/// <param name="asyncHandler">The async update handler</param>
/// <param name="maxQueuedItems">The max number of updates to be queued up. When happens when the queue is full and a new write is attempted can be specified with <see>fullMode</see></param>
/// <param name="fullBehavior">What should happen if the queue contains <see>maxQueuedItems</see> pending updates. If no max is set this setting is ignored</param>
/// <param name="ct">Cancellation token to stop the processing</param>
public static async Task ProcessQueuedAsync<T>(
Func<Action<T>, Task> subscribeCall,
Func<T, Task> asyncHandler,

View File

@ -1,8 +1,6 @@
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces
{
@ -19,6 +17,9 @@ namespace CryptoExchange.Net.Interfaces
/// <returns></returns>
IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters);
/// <summary>
/// Create high performance websocket
/// </summary>
IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter);
}
}

View File

@ -158,6 +158,9 @@ namespace CryptoExchange.Net
#endif
}
/// <summary>
/// Waits for all of the ValueTasks to complete
/// </summary>
public static async ValueTask WhenAll(IReadOnlyList<ValueTask> tasks)
{
if (tasks.Count == 0)
@ -184,6 +187,9 @@ namespace CryptoExchange.Net
await Task.WhenAll(toAwait!).ConfigureAwait(false);
}
/// <summary>
/// Waits for all of the ValueTasks to complete
/// </summary>
public static ValueTask WhenAll(IEnumerable<ValueTask> tasks)
{
return WhenAll(tasks.ToList());

View File

@ -14,6 +14,11 @@ namespace CryptoExchange.Net.Objects
{
private static readonly Task<bool> _completed = Task.FromResult(true);
private Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
#if NET9_0_OR_GREATER
private readonly Lock _waitsLock = new Lock();
#else
private readonly object _waitsLock = new object();
#endif
private bool _signaled;
private readonly bool _reset;
@ -38,7 +43,7 @@ namespace CryptoExchange.Net.Objects
try
{
Task<bool> waiter = _completed;
lock (_waits)
lock (_waitsLock)
{
if (_signaled)
{
@ -57,7 +62,7 @@ namespace CryptoExchange.Net.Objects
registration = ct.Register(() =>
{
lock (_waits)
lock (_waitsLock)
{
tcs.TrySetResult(false);
@ -85,7 +90,7 @@ namespace CryptoExchange.Net.Objects
/// </summary>
public void Set()
{
lock (_waits)
lock (_waitsLock)
{
if (!_reset)
{
@ -106,7 +111,9 @@ namespace CryptoExchange.Net.Objects
toRelease.TrySetResult(true);
}
else if (!_signaled)
{
_signaled = true;
}
}
}
}

View File

@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects.Sockets
@ -14,14 +15,14 @@ namespace CryptoExchange.Net.Objects.Sockets
private readonly HighPerfSocketConnection _connection;
internal readonly HighPerfSubscription _subscription;
private object _eventLock = new object();
private bool _connectionEventsSubscribed = true;
private List<Action> _connectionClosedEventHandlers = new List<Action>();
#if NET9_0_OR_GREATER
private readonly Lock _eventLock = new Lock();
#else
private readonly object _eventLock = new object();
#endif
/// <summary>
/// Event when the status of the subscription changes
/// </summary>
public event Action<SubscriptionStatus>? SubscriptionStatusChanged;
private bool _connectionEventsSubscribed = true;
private readonly List<Action> _connectionClosedEventHandlers = new List<Action>();
/// <summary>
/// Event when the connection is closed and will not be reconnected

View File

@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects.Sockets
@ -14,7 +15,12 @@ namespace CryptoExchange.Net.Objects.Sockets
private readonly SocketConnection _connection;
internal readonly Subscription _subscription;
private object _eventLock = new object();
#if NET9_0_OR_GREATER
private readonly Lock _eventLock = new Lock();
#else
private readonly object _eventLock = new object();
#endif
private bool _connectionEventsSubscribed = true;
private List<Action> _connectionClosedEventHandlers = new List<Action>();
private List<Action> _connectionLostEventHandlers = new List<Action>();

View File

@ -75,8 +75,6 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary>
public int? ReceiveBufferSize { get; set; } = null;
public PipeWriter? PipeWriter { get; set; } = null;
/// <summary>
/// ctor
/// </summary>

View File

@ -22,7 +22,11 @@ namespace CryptoExchange.Net.OrderBook
/// </summary>
public abstract class SymbolOrderBook : ISymbolOrderBook, IDisposable
{
#if NET9_0_OR_GREATER
private readonly Lock _bookLock = new Lock();
#else
private readonly object _bookLock = new object();
#endif
private OrderBookStatus _status;
private UpdateSubscription? _subscription;

View File

@ -33,7 +33,11 @@ namespace CryptoExchange.Net.Sockets
}
internal static int _lastStreamId;
private static readonly object _streamIdLock = new();
#if NET9_0_OR_GREATER
private static readonly Lock _streamIdLock = new Lock();
#else
private static readonly object _streamIdLock = new object();
#endif
private static readonly ArrayPool<byte> _receiveBufferPool = ArrayPool<byte>.Shared;
private readonly AsyncResetEvent _sendEvent;
@ -64,7 +68,11 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Received messages lock
/// </summary>
protected readonly object _receivedMessagesLock;
#if NET9_0_OR_GREATER
private readonly Lock _receivedMessagesLock = new Lock();
#else
private readonly object _receivedMessagesLock = new object();
#endif
/// <summary>
/// Log
@ -152,7 +160,6 @@ namespace CryptoExchange.Net.Sockets
_sendEvent = new AsyncResetEvent();
_sendBuffer = new ConcurrentQueue<SendItem>();
_ctsSource = new CancellationTokenSource();
_receivedMessagesLock = new object();
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize;
_closeSem = new SemaphoreSlim(1, 1);
@ -460,11 +467,11 @@ namespace CryptoExchange.Net.Sockets
{
if (_socket.State == WebSocketState.CloseReceived)
{
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
else if (_socket.State == WebSocketState.Open)
{
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
var startWait = DateTime.UtcNow;
while (_socket.State != WebSocketState.Closed && _socket.State != WebSocketState.Aborted)
{
@ -482,7 +489,8 @@ namespace CryptoExchange.Net.Sockets
// So socket might go to aborted state, might still be open
}
_ctsSource.Cancel();
if (!_disposed)
_ctsSource.Cancel();
}
/// <summary>

View File

@ -1,5 +1,4 @@
using CryptoExchange.Net.Objects;
using System;
using System;
namespace CryptoExchange.Net.Sockets.HighPerf
{
@ -17,7 +16,7 @@ namespace CryptoExchange.Net.Sockets.HighPerf
/// </summary>
public TimeSpan Interval { get; set; }
/// <summary>
/// Delegate for getting the query
/// Delegate for getting the request which should be send
/// </summary>
public Func<HighPerfSocketConnection, object> GetRequestDelegate { get; set; } = null!;
}

View File

@ -7,7 +7,6 @@ using Microsoft.Extensions.Logging;
using CryptoExchange.Net.Objects;
using System.Net.WebSockets;
using CryptoExchange.Net.Objects.Sockets;
using System.Diagnostics;
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Logging.Extensions;
using System.Threading;
@ -32,12 +31,12 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// The amount of subscriptions on this connection
/// </summary>
public int UserSubscriptionCount => _subscriptions.Length;
public int UserSubscriptionCount => Subscriptions.Length;
/// <summary>
/// Get a copy of the current message subscriptions
/// </summary>
public HighPerfSubscription[] Subscriptions => _subscriptions;
public abstract HighPerfSubscription[] Subscriptions { get; }
/// <summary>
/// If connection is made
@ -86,16 +85,27 @@ namespace CryptoExchange.Net.Sockets
}
}
private readonly ILogger _logger;
private SocketStatus _status;
/// <summary>
/// Logger
/// </summary>
protected readonly ILogger _logger;
private readonly IMessageSerializer _serializer;
protected readonly JsonSerializerOptions _serializerOptions;
protected readonly Pipe _pipe;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private SocketStatus _status;
private Task? _processTask;
private CancellationTokenSource _cts = new CancellationTokenSource();
protected abstract HighPerfSubscription[] _subscriptions { get; }
/// <summary>
/// Serializer options
/// </summary>
protected readonly JsonSerializerOptions _serializerOptions;
/// <summary>
/// The pipe the websocket will write to
/// </summary>
protected readonly Pipe _pipe;
/// <summary>
/// Update type
/// </summary>
public abstract Type UpdateType { get; }
/// <summary>
@ -119,10 +129,7 @@ namespace CryptoExchange.Net.Sockets
public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag)
{
_logger = logger;
_pipe = new Pipe(new PipeOptions
{
//ReaderScheduler
});
_pipe = new Pipe();
_serializerOptions = serializerOptions;
ApiClient = apiClient;
Tag = tag;
@ -139,6 +146,9 @@ namespace CryptoExchange.Net.Sockets
_serializer = apiClient.CreateSerializer();
}
/// <summary>
/// Process message from the pipe
/// </summary>
protected abstract Task ProcessAsync(CancellationToken ct);
/// <summary>
@ -156,7 +166,7 @@ namespace CryptoExchange.Net.Sockets
protected virtual async Task HandleCloseAsync()
{
Status = SocketStatus.Closed;
_cts.Cancel();
_cts.CancelAfter(TimeSpan.FromSeconds(1)); // Cancel after 1 second to make sure we process pending messages from the pipe
if (ApiClient.highPerfSocketConnections.ContainsKey(SocketId))
ApiClient.highPerfSocketConnections.TryRemove(SocketId, out _);
@ -193,12 +203,6 @@ namespace CryptoExchange.Net.Sockets
return result;
}
/// <summary>
/// Retrieve the underlying socket
/// </summary>
/// <returns></returns>
public IHighPerfWebsocket GetSocket() => _socket;
/// <summary>
/// Close the connection
/// </summary>
@ -211,7 +215,7 @@ namespace CryptoExchange.Net.Sockets
if (ApiClient.socketConnections.ContainsKey(SocketId))
ApiClient.socketConnections.TryRemove(SocketId, out _);
foreach (var subscription in _subscriptions)
foreach (var subscription in Subscriptions)
{
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
@ -235,7 +239,7 @@ namespace CryptoExchange.Net.Sockets
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
var anyOtherSubscriptions = _subscriptions.Any(x => x != subscription);
var anyOtherSubscriptions = Subscriptions.Any(x => x != subscription);
if (anyOtherSubscriptions)
await UnsubscribeAsync(subscription).ConfigureAwait(false);
@ -413,11 +417,19 @@ namespace CryptoExchange.Net.Sockets
}
}
/// <inheritdoc />
public class HighPerfSocketConnection<T> : HighPerfSocketConnection
{
#if NET9_0_OR_GREATER
private readonly Lock _listenersLock = new Lock();
#else
private readonly object _listenersLock = new object();
private List<HighPerfSubscription<T>> _typedSubscriptions;
protected override HighPerfSubscription[] _subscriptions
#endif
private readonly List<HighPerfSubscription<T>> _typedSubscriptions;
/// <inheritdoc />
public override HighPerfSubscription[] Subscriptions
{
get
{
@ -426,42 +438,65 @@ namespace CryptoExchange.Net.Sockets
}
}
/// <inheritdoc />
public override Type UpdateType => typeof(T);
/// <summary>
/// ctor
/// </summary>
public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag) : base(logger, socketFactory, parameters, apiClient, serializerOptions, tag)
{
_typedSubscriptions = new List<HighPerfSubscription<T>>();
}
/// <summary>
/// Add a subscription to this connection
/// Add a new subscription
/// </summary>
/// <param name="subscription"></param>
public bool AddSubscription(HighPerfSubscription<T> subscription)
{
if (Status != SocketStatus.None && Status != SocketStatus.Connected)
return false;
//lock (_listenersLock)
_typedSubscriptions.Add(subscription);
//_logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount);
_logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount);
return true;
}
/// <summary>
/// Remove a subscription
/// </summary>
/// <param name="subscription"></param>
public void RemoveSubscription(HighPerfSubscription<T> subscription)
{
lock (_listenersLock)
_typedSubscriptions.Remove(subscription);
}
/// <inheritdoc />
protected override async Task ProcessAsync(CancellationToken ct)
{
try
{
#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, true, _serializerOptions, ct).ConfigureAwait(false))
#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
{
var tasks = _typedSubscriptions.Select(sub => sub.HandleAsync(update!));
var tasks = _typedSubscriptions.Select(sub =>
{
try
{
return sub.HandleAsync(update!);
}
catch (Exception ex)
{
sub.InvokeExceptionHandler(ex);
_logger.UserMessageProcessingFailed(SocketId, ex.Message, ex);
return new ValueTask();
}
});
await LibraryHelpers.WhenAll(tasks).ConfigureAwait(false);
}
}

View File

@ -1,5 +1,4 @@
using Microsoft.Extensions.Logging;
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
@ -20,11 +19,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public int TotalInvocations { get; set; }
/// <summary>
/// Logger
/// </summary>
protected readonly ILogger _logger;
/// <summary>
/// Cancellation token registration
/// </summary>
@ -48,9 +42,8 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// ctor
/// </summary>
public HighPerfSubscription(ILogger logger)
public HighPerfSubscription()
{
_logger = logger;
Id = ExchangeHelpers.NextId();
}
@ -94,7 +87,6 @@ namespace CryptoExchange.Net.Sockets
{
Exception?.Invoke(e);
}
}
/// <inheritdoc />
@ -105,13 +97,17 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// ctor
/// </summary>
protected HighPerfSubscription(ILogger logger, Func<TUpdateType, ValueTask> handler) : base(logger)
protected HighPerfSubscription(Func<TUpdateType, ValueTask> handler) : base()
{
_handler = handler;
}
/// <summary>
/// Handle an update
/// </summary>
public ValueTask HandleAsync(TUpdateType update)
{
TotalInvocations++;
return _handler.Invoke(update);
}
}

View File

@ -6,20 +6,16 @@ using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net;
using System.Net.Http;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// A wrapper around the ClientWebSocket
/// A high performance websocket client implementation
/// </summary>
public class HighPerfWebSocketClient : IHighPerfWebsocket
{
@ -43,8 +39,6 @@ namespace CryptoExchange.Net.Sockets
private const int _defaultReceiveBufferSize = 4096;
private const int _sendBufferSize = 4096;
private byte[] _commaBytes = new byte[] { 44 };
/// <summary>
/// Log
/// </summary>
@ -90,12 +84,6 @@ namespace CryptoExchange.Net.Sockets
_closeSem = new SemaphoreSlim(1, 1);
}
/// <inheritdoc />
public void UpdateProxy(ApiProxy? proxy)
{
Parameters.Proxy = proxy;
}
/// <inheritdoc />
public virtual async Task<CallResult> ConnectAsync(CancellationToken ct)
{
@ -169,7 +157,7 @@ namespace CryptoExchange.Net.Sockets
if (e is WebSocketException we)
{
#if (NET6_0_OR_GREATER)
if (_socket.HttpStatusCode == HttpStatusCode.TooManyRequests)
if (_socket!.HttpStatusCode == HttpStatusCode.TooManyRequests)
{
return new CallResult(new ServerRateLimitError(we.Message, we));
}
@ -296,19 +284,20 @@ namespace CryptoExchange.Net.Sockets
{
if (_socket!.State == WebSocketState.CloseReceived)
{
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
else if (_socket.State == WebSocketState.Open)
{
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
var startWait = DateTime.UtcNow;
while (_socket.State != WebSocketState.Closed && _socket.State != WebSocketState.Aborted)
while (_processing && _socket.State != WebSocketState.Closed && _socket.State != WebSocketState.Aborted)
{
// Wait until we receive close confirmation
await Task.Delay(10).ConfigureAwait(false);
if (DateTime.UtcNow - startWait > TimeSpan.FromSeconds(1))
break; // Wait for max 1 second, then just abort the connection
}
}
}
}
catch (Exception)
@ -318,7 +307,8 @@ namespace CryptoExchange.Net.Sockets
// So socket might go to aborted state, might still be open
}
_ctsSource.Cancel();
if (!_disposed)
_ctsSource.Cancel();
}
/// <summary>
@ -342,9 +332,9 @@ namespace CryptoExchange.Net.Sockets
#if NETSTANDARD2_1 || NET8_0_OR_GREATER
private async Task ReceiveLoopAsync()
{
Exception? exitException = null;
try
{
Exception? exitException = null;
while (true)
{
if (_ctsSource.IsCancellationRequested)
@ -356,7 +346,7 @@ namespace CryptoExchange.Net.Sockets
{
receiveResult = await _socket!.ReceiveAsync(_pipeWriter.GetMemory(_receiveBufferSize), _ctsSource.Token).ConfigureAwait(false);
// Advance the writer to communicate which part the memory was written
// Advance the writer to communicate which part of the memory was written
_pipeWriter.Advance(receiveResult.Count);
}
catch (OperationCanceledException ex)
@ -371,7 +361,6 @@ namespace CryptoExchange.Net.Sockets
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
// canceled
exitException = ex;
break;
}
@ -388,42 +377,39 @@ namespace CryptoExchange.Net.Sockets
break;
}
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed
if (_socket.State == WebSocketState.CloseReceived)
{
// Close received means it server initiated, we should send a confirmation and close the socket
//_logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
else
{
// Means the socket is now closed and we were the one initiating it
//_logger.SocketReceivedCloseConfirmation(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
}
break;
}
if (receiveResult.EndOfMessage)
{
// Write a comma to split the json data for the reader
// This will also flush the written bytes
// Flush the full message
var flushResult = await _pipeWriter.FlushAsync().ConfigureAwait(false);
if (flushResult.IsCompleted)
{
// Flush indicated that the reader is no longer listening
// Flush indicated that the reader is no longer listening, so we should stop writing
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
break;
}
}
}
await _pipeWriter.CompleteAsync(exitException).ConfigureAwait(false);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed
if (_socket.State == WebSocketState.CloseReceived)
{
// Close received means it's server initiated, we should send a confirmation and close the socket
_logger.SocketReceivedCloseMessage(Id, _socket.CloseStatus?.ToString() ?? string.Empty, _socket.CloseStatusDescription ?? string.Empty);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
else
{
// Means the socket is now closed and we were the one initiating it
_logger.SocketReceivedCloseConfirmation(Id, _socket.CloseStatus?.ToString() ?? string.Empty, _socket.CloseStatusDescription ?? string.Empty);
}
break;
}
}
}
catch (Exception e)
{
@ -432,32 +418,27 @@ namespace CryptoExchange.Net.Sockets
// Make sure we at least let the owner know there was an error
_logger.SocketReceiveLoopStoppedWithException(Id, e);
await _pipeWriter.CompleteAsync(e).ConfigureAwait(false);
exitException = e;
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
finally
{
await _pipeWriter.CompleteAsync(exitException).ConfigureAwait(false);
_logger.SocketReceiveLoopFinished(Id);
}
}
#else
/// <summary>
/// Loop for receiving and reassembling data
/// </summary>
/// <returns></returns>
private async Task ReceiveLoopAsync()
{
byte[] rentedBuffer = _receiveBufferPool.Rent(_receiveBufferSize);
var buffer = new ArraySegment<byte>(rentedBuffer);
Exception? exitException = null;
try
{
Exception? exitException = null;
while (true)
{
if (_ctsSource.IsCancellationRequested)
@ -480,7 +461,6 @@ namespace CryptoExchange.Net.Sockets
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
// canceled
exitException = ex;
break;
}
@ -497,6 +477,9 @@ namespace CryptoExchange.Net.Sockets
break;
}
if (receiveResult.Count > 0)
await _pipeWriter.WriteAsync(buffer.AsMemory(0, receiveResult.Count)).ConfigureAwait(false);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed
@ -515,12 +498,8 @@ namespace CryptoExchange.Net.Sockets
break;
}
await _pipeWriter.WriteAsync(buffer.AsMemory(0, receiveResult.Count)).ConfigureAwait(false);
}
await _pipeWriter.CompleteAsync(exitException).ConfigureAwait(false);
}
catch (Exception e)
{
@ -529,13 +508,14 @@ namespace CryptoExchange.Net.Sockets
// Make sure we at least let the owner know there was an error
_logger.SocketReceiveLoopStoppedWithException(Id, e);
await _pipeWriter.CompleteAsync(e).ConfigureAwait(false);
exitException = e;
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
finally
{
await _pipeWriter.CompleteAsync(exitException).ConfigureAwait(false);
_receiveBufferPool.Return(rentedBuffer, true);
_logger.SocketReceiveLoopFinished(Id);

View File

@ -1,30 +1,57 @@
using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Socket connection
/// </summary>
public interface ISocketConnection
{
/// <summary>
/// The API client the connection belongs to
/// </summary>
SocketApiClient ApiClient { get; set; }
/// <summary>
/// Whether the connection has been authenticated
/// </summary>
bool Authenticated { get; set; }
/// <summary>
/// Whether the connection is established
/// </summary>
bool Connected { get; }
/// <summary>
/// Connection URI
/// </summary>
Uri ConnectionUri { get; }
/// <summary>
/// Id
/// </summary>
int SocketId { get; }
/// <summary>
/// Tag
/// </summary>
string Tag { get; set; }
/// <summary>
/// Closed event
/// </summary>
event Action? ConnectionClosed;
/// <summary>
/// Connect the websocket
/// </summary>
Task<CallResult> ConnectAsync(CancellationToken ct);
/// <summary>
/// Close the connection
/// </summary>
/// <returns></returns>
Task CloseAsync();
/// <summary>
/// Dispose
/// </summary>
void Dispose();
//ValueTask<CallResult> SendStringAsync(int requestId, string data, int weight);
//ValueTask<CallResult> SendAsync<T>(int requestId, T obj, int weight);
//ValueTask<CallResult> SendBytesAsync(int requestId, byte[] data, int weight);
}
}

View File

@ -253,7 +253,11 @@ namespace CryptoExchange.Net.Sockets
}
private bool _pausedActivity;
private readonly object _listenersLock;
#if NET9_0_OR_GREATER
private readonly Lock _listenersLock = new Lock();
#else
private readonly object _listenersLock = new object();
#endif
private readonly List<IMessageProcessor> _listeners;
private readonly ILogger _logger;
private SocketStatus _status;
@ -306,7 +310,6 @@ namespace CryptoExchange.Net.Sockets
_socket.OnError += HandleErrorAsync;
_socket.GetReconnectionUrl = GetReconnectionUrlAsync;
_listenersLock = new object();
_listeners = new List<IMessageProcessor>();
_serializer = apiClient.CreateSerializer();

View File

@ -39,7 +39,11 @@ namespace CryptoExchange.Net.Testing.Implementations
public Func<Task<Uri?>>? GetReconnectionUrl { get; set; }
public static int lastId = 0;
public static object lastIdLock = new object();
#if NET9_0_OR_GREATER
public static readonly Lock lastIdLock = new Lock();
#else
public static readonly object lastIdLock = new object();
#endif
public TestSocket(string address)
{

View File

@ -8,6 +8,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.Klines
@ -31,7 +32,11 @@ namespace CryptoExchange.Net.Trackers.Klines
/// <summary>
/// Lock for accessing _data
/// </summary>
protected readonly object _lock = new object();
#if NET9_0_OR_GREATER
private readonly Lock _lock = new Lock();
#else
private readonly object _lock = new object();
#endif
/// <summary>
/// The last time the window was applied
/// </summary>

View File

@ -8,6 +8,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.Trades
@ -42,7 +43,11 @@ namespace CryptoExchange.Net.Trackers.Trades
/// <summary>
/// Lock for accessing _data
/// </summary>
protected readonly object _lock = new object();
#if NET9_0_OR_GREATER
private readonly Lock _lock = new Lock();
#else
private readonly object _lock = new object();
#endif
/// <summary>
/// Whether the snapshot has been set
/// </summary>