1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 09:51:50 +00:00
This commit is contained in:
Jkorf 2025-11-14 16:13:14 +01:00
parent e2ffad9c61
commit c945176049
9 changed files with 278 additions and 281 deletions

View File

@ -7,6 +7,7 @@ using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.RateLimiting;
using CryptoExchange.Net.RateLimiting.Interfaces;
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Sockets.HighPerf;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
@ -78,6 +79,10 @@ namespace CryptoExchange.Net.Clients
/// Periodic task registrations
/// </summary>
protected List<PeriodicTaskRegistration> PeriodicTaskRegistrations { get; set; } = new List<PeriodicTaskRegistration>();
/// <summary>
/// Periodic task registrations
/// </summary>
protected List<HighPerfPeriodicTaskRegistration> HighPerfPeriodicTaskRegistrations { get; set; } = new List<HighPerfPeriodicTaskRegistration>();
/// <summary>
/// List of address to keep an alive connection to
@ -389,11 +394,11 @@ namespace CryptoExchange.Net.Clients
semaphoreSlim.Release();
}
var subQuery = subscription.CreateSubscriptionQuery(socketConnection);
if (subQuery != null)
var subRequest = subscription.CreateSubscriptionQuery(socketConnection);
if (subRequest != null)
{
// Send the request and wait for answer
var sendResult = await socketConnection.SendAsync(subQuery.Id, subQuery.Request, subQuery.Weight).ConfigureAwait(false);
var sendResult = await socketConnection.SendAsync(subRequest).ConfigureAwait(false);
if (!sendResult)
{
// Needed?
@ -741,11 +746,8 @@ namespace CryptoExchange.Net.Clients
// Create new socket connection
var socketConnection = new HighPerfSocketConnection<TUpdateType>(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, JsonSerializerOptions, address);
foreach (var ptg in PeriodicTaskRegistrations)
socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.QueryDelegate, ptg.Callback);
//foreach (var systemSubscription in systemSubscriptions)
// socketConnection.AddSubscription(systemSubscription);
foreach (var ptg in HighPerfPeriodicTaskRegistrations)
socketConnection.QueryPeriodic(ptg.Identifier, ptg.Interval, ptg.GetRequestDelegate);
return new CallResult<HighPerfSocketConnection<TUpdateType>>(socketConnection);
}

View File

@ -48,17 +48,11 @@ namespace CryptoExchange.Net.Interfaces
/// <summary>
/// Send string data
/// </summary>
/// <param name="id"></param>
/// <param name="data"></param>
/// <param name="weight"></param>
ValueTask<bool> SendAsync(int id, string data, int weight);
ValueTask<bool> SendAsync(string data);
/// <summary>
/// Send byte data
/// </summary>
/// <param name="id"></param>
/// <param name="data"></param>
/// <param name="weight"></param>
ValueTask<bool> SendAsync(int id, byte[] data, int weight);
ValueTask<bool> SendAsync(byte[] data, WebSocketMessageType type = WebSocketMessageType.Binary);
/// <summary>
/// Close the connection
/// </summary>

View File

@ -16,6 +16,10 @@ namespace CryptoExchange.Net.Interfaces
/// </summary>
event Func<Task> OnClose;
/// <summary>
/// Websocket message received event
/// </summary>
event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task> OnStreamMessage;
/// <summary>
/// Websocket sent event, RequestId as parameter
/// </summary>
event Func<int, Task> OnRequestSent;

View File

@ -0,0 +1,24 @@
using CryptoExchange.Net.Objects;
using System;
namespace CryptoExchange.Net.Sockets.HighPerf
{
/// <summary>
/// Periodic task registration
/// </summary>
public class HighPerfPeriodicTaskRegistration
{
/// <summary>
/// Identifier
/// </summary>
public string Identifier { get; set; } = string.Empty;
/// <summary>
/// Interval of query
/// </summary>
public TimeSpan Interval { get; set; }
/// <summary>
/// Delegate for getting the query
/// </summary>
public Func<HighPerfSocketConnection, object> GetRequestDelegate { get; set; } = null!;
}
}

View File

@ -32,19 +32,12 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// The amount of subscriptions on this connection
/// </summary>
public int UserSubscriptionCount => _subscriptions.Count;
public int UserSubscriptionCount => _subscriptions.Length;
/// <summary>
/// Get a copy of the current message subscriptions
/// </summary>
public HighPerfSubscription[] Subscriptions
{
get
{
lock (_listenersLock)
return _subscriptions.ToArray();
}
}
public HighPerfSubscription[] Subscriptions => _subscriptions;
/// <summary>
/// If connection is made
@ -93,18 +86,15 @@ namespace CryptoExchange.Net.Sockets
}
}
public string Topic { get; set; }
private readonly object _listenersLock;
private readonly ILogger _logger;
private SocketStatus _status;
private readonly IMessageSerializer _serializer;
protected readonly JsonSerializerOptions _serializerOptions;
protected readonly Pipe _pipe;
private Task _processTask;
private Task? _processTask;
private CancellationTokenSource _cts = new CancellationTokenSource();
protected abstract List<HighPerfSubscription> _subscriptions { get; }
protected abstract HighPerfSubscription[] _subscriptions { get; }
public abstract Type UpdateType { get; }
@ -129,7 +119,10 @@ namespace CryptoExchange.Net.Sockets
public HighPerfSocketConnection(ILogger logger, IWebsocketFactory socketFactory, WebSocketParameters parameters, SocketApiClient apiClient, JsonSerializerOptions serializerOptions, string tag)
{
_logger = logger;
_pipe = new Pipe();
_pipe = new Pipe(new PipeOptions
{
//ReaderScheduler
});
_serializerOptions = serializerOptions;
ApiClient = apiClient;
Tag = tag;
@ -143,8 +136,6 @@ namespace CryptoExchange.Net.Sockets
_socket.OnError += HandleErrorAsync;
_listenersLock = new object();
_serializer = apiClient.CreateSerializer();
}
@ -162,19 +153,17 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handler for a socket closing without reconnect
/// </summary>
protected virtual Task HandleCloseAsync()
protected virtual async Task HandleCloseAsync()
{
Status = SocketStatus.Closed;
_cts.Cancel();
lock (_listenersLock)
{
foreach (var subscription in _subscriptions)
subscription.Reset();
}
if (ApiClient.highPerfSocketConnections.ContainsKey(SocketId))
ApiClient.highPerfSocketConnections.TryRemove(SocketId, out _);
await _processTask!.ConfigureAwait(false);
_ = Task.Run(() => ConnectionClosed?.Invoke());
return Task.CompletedTask;
}
/// <summary>
@ -222,13 +211,10 @@ namespace CryptoExchange.Net.Sockets
if (ApiClient.socketConnections.ContainsKey(SocketId))
ApiClient.socketConnections.TryRemove(SocketId, out _);
lock (_listenersLock)
foreach (var subscription in _subscriptions)
{
foreach (var subscription in _subscriptions)
{
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
}
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
}
await _socket.CloseAsync().ConfigureAwait(false);
@ -243,17 +229,13 @@ namespace CryptoExchange.Net.Sockets
public async Task CloseAsync(HighPerfSubscription subscription)
{
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
{
return;
}
_logger.ClosingSubscription(SocketId, subscription.Id);
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
bool anyOtherSubscriptions;
lock (_listenersLock)
anyOtherSubscriptions = _subscriptions.Any(x => x != subscription);
var anyOtherSubscriptions = _subscriptions.Any(x => x != subscription);
if (anyOtherSubscriptions)
await UnsubscribeAsync(subscription).ConfigureAwait(false);
@ -271,8 +253,6 @@ namespace CryptoExchange.Net.Sockets
await CloseAsync().ConfigureAwait(false);
}
lock (_listenersLock)
_subscriptions.Remove(subscription);
}
/// <summary>
@ -286,42 +266,24 @@ namespace CryptoExchange.Net.Sockets
_socket.Dispose();
}
/// <summary>
/// Whether or not a new subscription can be added to this connection
/// </summary>
/// <returns></returns>
public bool CanAddSubscription() => Status == SocketStatus.None || Status == SocketStatus.Connected;
/// <summary>
/// Get a subscription on this connection by id
/// </summary>
/// <param name="id"></param>
public HighPerfSubscription? GetSubscription(int id)
{
lock (_listenersLock)
return _subscriptions.SingleOrDefault(s => s.Id == id);
}
/// <summary>
/// Send data over the websocket connection
/// </summary>
/// <typeparam name="T">The type of the object to send</typeparam>
/// <param name="requestId">The request id</param>
/// <param name="obj">The object to send</param>
/// <param name="weight">The weight of the message</param>
public virtual ValueTask<CallResult> SendAsync<T>(int requestId, T obj, int weight)
public virtual ValueTask<CallResult> SendAsync<T>(T obj)
{
if (_serializer is IByteMessageSerializer byteSerializer)
{
return SendBytesAsync(requestId, byteSerializer.Serialize(obj), weight);
return SendBytesAsync(byteSerializer.Serialize(obj));
}
else if (_serializer is IStringMessageSerializer stringSerializer)
{
if (obj is string str)
return SendStringAsync(requestId, str, weight);
return SendStringAsync(str);
str = stringSerializer.Serialize(obj);
return SendStringAsync(requestId, str, weight);
return SendStringAsync(str);
}
throw new Exception("Unknown serializer when sending message");
@ -331,27 +293,25 @@ namespace CryptoExchange.Net.Sockets
/// Send byte data over the websocket connection
/// </summary>
/// <param name="data">The data to send</param>
/// <param name="weight">The weight of the message</param>
/// <param name="requestId">The id of the request</param>
public virtual async ValueTask<CallResult> SendBytesAsync(int requestId, byte[] data, int weight)
public virtual async ValueTask<CallResult> SendBytesAsync(byte[] data)
{
if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
{
var info = $"Message to send exceeds the max server message size ({data.Length} vs {ApiClient.MessageSendSizeLimit.Value} bytes). Split the request into batches to keep below this limit";
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
_logger.LogWarning("[Sckt {SocketId}] {Info}", SocketId, info);
return new CallResult(new InvalidOperationError(info));
}
if (!_socket.IsOpen)
{
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
_logger.LogWarning("[Sckt {SocketId}] Request failed to send, socket no longer open", SocketId);
return new CallResult(new WebError("Failed to send message, socket no longer open"));
}
_logger.SendingByteData(SocketId, requestId, data.Length);
_logger.SendingByteData(SocketId, 0, data.Length);
try
{
if (!await _socket.SendAsync(requestId, data, weight).ConfigureAwait(false))
if (!await _socket.SendAsync(data).ConfigureAwait(false))
return new CallResult(new WebError("Failed to send message, connection not open"));
return CallResult.SuccessResult;
@ -366,27 +326,25 @@ namespace CryptoExchange.Net.Sockets
/// Send string data over the websocket connection
/// </summary>
/// <param name="data">The data to send</param>
/// <param name="weight">The weight of the message</param>
/// <param name="requestId">The id of the request</param>
public virtual async ValueTask<CallResult> SendStringAsync(int requestId, string data, int weight)
public virtual async ValueTask<CallResult> SendStringAsync(string data)
{
if (ApiClient.MessageSendSizeLimit != null && data.Length > ApiClient.MessageSendSizeLimit.Value)
{
var info = $"Message to send exceeds the max server message size ({data.Length} vs {ApiClient.MessageSendSizeLimit.Value} bytes). Split the request into batches to keep below this limit";
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] {Info}", SocketId, requestId, info);
_logger.LogWarning("[Sckt {SocketId}] {Info}", SocketId, info);
return new CallResult(new InvalidOperationError(info));
}
if (!_socket.IsOpen)
{
_logger.LogWarning("[Sckt {SocketId}] [Req {RequestId}] failed to send, socket no longer open", SocketId, requestId);
_logger.LogWarning("[Sckt {SocketId}] Request failed to send, socket no longer open", SocketId);
return new CallResult(new WebError("Failed to send message, socket no longer open"));
}
_logger.SendingData(SocketId, requestId, data);
_logger.SendingData(SocketId, 0, data);
try
{
if (!await _socket.SendAsync(requestId, data, weight).ConfigureAwait(false))
if (!await _socket.SendAsync(data).ConfigureAwait(false))
return new CallResult(new WebError("Failed to send message, connection not open"));
return CallResult.SuccessResult;
@ -403,7 +361,7 @@ namespace CryptoExchange.Net.Sockets
if (unsubscribeRequest == null)
return;
await SendAsync(unsubscribeRequest.Id, unsubscribeRequest.Request, unsubscribeRequest.Weight).ConfigureAwait(false);
await SendAsync(unsubscribeRequest).ConfigureAwait(false);
_logger.SubscriptionUnsubscribed(SocketId, subscription.Id);
}
@ -413,8 +371,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="identifier">Identifier for the periodic send</param>
/// <param name="interval">How often</param>
/// <param name="queryDelegate">Method returning the query to send</param>
/// <param name="callback">The callback for processing the response</param>
public virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<HighPerfSocketConnection, Query> queryDelegate, Action<HighPerfSocketConnection, CallResult>? callback)
public virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<HighPerfSocketConnection, object> queryDelegate)
{
if (queryDelegate == null)
throw new ArgumentNullException(nameof(queryDelegate));
@ -445,8 +402,7 @@ namespace CryptoExchange.Net.Sockets
try
{
var result = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false);
callback?.Invoke(this, result);
var result = await SendAsync(query).ConfigureAwait(false);
}
catch (Exception ex)
{
@ -455,15 +411,20 @@ namespace CryptoExchange.Net.Sockets
}
});
}
public void QueryPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, Query> queryDelegate, Action<SocketConnection, CallResult>? callback) => throw new NotImplementedException();
}
public class HighPerfSocketConnection<T> : HighPerfSocketConnection
{
private readonly object _listenersLock = new object();
private List<HighPerfSubscription<T>> _typedSubscriptions;
protected override List<HighPerfSubscription> _subscriptions => _typedSubscriptions.Select(x => (HighPerfSubscription)x).ToList();
protected override HighPerfSubscription[] _subscriptions
{
get
{
lock (_listenersLock)
return _typedSubscriptions.Select(x => (HighPerfSubscription)x).ToArray();
}
}
public override Type UpdateType => typeof(T);
@ -488,11 +449,17 @@ namespace CryptoExchange.Net.Sockets
return true;
}
public void RemoveSubscription(HighPerfSubscription<T> subscription)
{
lock (_listenersLock)
_typedSubscriptions.Remove(subscription);
}
protected override async Task ProcessAsync(CancellationToken ct)
{
try
{
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, _serializerOptions, ct).ConfigureAwait(false))
await foreach (var update in JsonSerializer.DeserializeAsyncEnumerable<T>(_pipe.Reader, true, _serializerOptions, ct).ConfigureAwait(false))
{
var tasks = _typedSubscriptions.Select(sub => sub.HandleAsync(update!));
await LibraryHelpers.WhenAll(tasks).ConfigureAwait(false);

View File

@ -20,33 +20,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public int TotalInvocations { get; set; }
//private SubscriptionStatus _status;
///// <summary>
///// Current subscription status
///// </summary>
//public SubscriptionStatus Status
//{
// get => _status;
// set
// {
// if (_status == value)
// return;
// _status = value;
// Task.Run(() => StatusChanged?.Invoke(value));
// }
//}
///// <summary>
///// Whether the subscription is active
///// </summary>
//public bool Active => Status != SubscriptionStatus.Closing && Status != SubscriptionStatus.Closed;
///// <summary>
///// Whether the unsubscribing of this subscription lead to the closing of the connection
///// </summary>
//public bool IsClosingConnection { get; set; }
/// <summary>
/// Logger
/// </summary>
@ -61,20 +34,16 @@ namespace CryptoExchange.Net.Sockets
/// Exception event
/// </summary>
public event Action<Exception>? Exception;
///// <summary>
///// Listener unsubscribed event
///// </summary>
//public event Action<SubscriptionStatus>? StatusChanged;
/// <summary>
/// The subscribe query for this subscription
/// </summary>
public Query? SubscriptionQuery { get; private set; }
public object? SubscriptionQuery { get; private set; }
/// <summary>
/// The unsubscribe query for this subscription
/// </summary>
public Query? UnsubscriptionQuery { get; private set; }
public object? UnsubscriptionQuery { get; private set; }
/// <summary>
/// ctor
@ -88,7 +57,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Create a new subscription query
/// </summary>
public Query? CreateSubscriptionQuery(HighPerfSocketConnection connection)
public object? CreateSubscriptionQuery(HighPerfSocketConnection connection)
{
var query = GetSubQuery(connection);
SubscriptionQuery = query;
@ -99,12 +68,12 @@ namespace CryptoExchange.Net.Sockets
/// Get the subscribe query to send when subscribing
/// </summary>
/// <returns></returns>
protected abstract Query? GetSubQuery(HighPerfSocketConnection connection);
protected abstract object? GetSubQuery(HighPerfSocketConnection connection);
/// <summary>
/// Create a new unsubscription query
/// </summary>
public Query? CreateUnsubscriptionQuery(HighPerfSocketConnection connection)
public object? CreateUnsubscriptionQuery(HighPerfSocketConnection connection)
{
var query = GetUnsubQuery(connection);
UnsubscriptionQuery = query;
@ -115,20 +84,7 @@ namespace CryptoExchange.Net.Sockets
/// Get the unsubscribe query to send when unsubscribing
/// </summary>
/// <returns></returns>
protected abstract Query? GetUnsubQuery(HighPerfSocketConnection connection);
/// <summary>
/// Reset the subscription
/// </summary>
public void Reset()
{
DoHandleReset();
}
/// <summary>
/// Connection has been reset, do any logic for resetting the subscription
/// </summary>
public virtual void DoHandleReset() { }
protected abstract object? GetUnsubQuery(HighPerfSocketConnection connection);
/// <summary>
/// Invoke the exception event

View File

@ -6,6 +6,7 @@ using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net;
using System.Net.Http;
@ -22,17 +23,11 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public class HighPerfWebSocketClient : IHighPerfWebsocket
{
enum ProcessState
{
Idle,
Processing,
WaitingForClose,
Reconnecting
}
private ClientWebSocket? _socket;
#if NETSTANDARD2_0
private static readonly ArrayPool<byte> _receiveBufferPool = ArrayPool<byte>.Shared;
#endif
private readonly SemaphoreSlim _closeSem;
@ -41,16 +36,15 @@ namespace CryptoExchange.Net.Sockets
private Task? _closeTask;
private bool _stopRequested;
private bool _disposed;
private ProcessState _processState;
private DateTime _lastReconnectTime;
private readonly string _baseAddress;
private int _reconnectAttempt;
private bool _processing;
private readonly int _receiveBufferSize;
private readonly PipeWriter _pipeWriter;
private const int _defaultReceiveBufferSize = 1048576;
private const int _defaultReceiveBufferSize = 4096;
private const int _sendBufferSize = 4096;
private byte[] _commaBytes = new byte[] { 44 };
/// <summary>
/// Log
/// </summary>
@ -94,7 +88,6 @@ namespace CryptoExchange.Net.Sockets
_pipeWriter = pipeWriter;
_closeSem = new SemaphoreSlim(1, 1);
_baseAddress = $"{Uri.Scheme}://{Uri.Host}";
}
/// <inheritdoc />
@ -206,65 +199,39 @@ namespace CryptoExchange.Net.Sockets
private async Task ProcessAsync()
{
_logger.SocketStartingProcessing(Id);
SetProcessState(ProcessState.Processing);
_processing = true;
await ReceiveLoopAsync().ConfigureAwait(false);
_processing = false;
_logger.SocketFinishedProcessing(Id);
SetProcessState(ProcessState.WaitingForClose);
while (_closeTask == null)
await Task.Delay(50).ConfigureAwait(false);
await _closeTask.ConfigureAwait(false);
if (!_stopRequested)
_closeTask = null;
SetProcessState(ProcessState.Idle);
await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
_logger.SocketClosed(Id);
}
/// <inheritdoc />
public virtual async ValueTask<bool> SendAsync(int id, string data, int weight)
public virtual ValueTask<bool> SendAsync(string data)
{
if (_ctsSource.IsCancellationRequested || _processState != ProcessState.Processing)
return false;
var bytes = Parameters.Encoding.GetBytes(data);
_logger.SocketAddingBytesToSendBuffer(Id, id, bytes);
try
{
await _socket!.SendAsync(new ArraySegment<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
_logger.SocketSentBytes(Id, id, bytes.Length);
return true;
}
catch (OperationCanceledException)
{
// canceled
return false;
}
catch (Exception ioe)
{
// Connection closed unexpectedly, .NET framework
await (OnError?.Invoke(ioe) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
return false;
}
return SendAsync(bytes, WebSocketMessageType.Text);
}
/// <inheritdoc />
public virtual async ValueTask<bool> SendAsync(int id, byte[] data, int weight)
public virtual async ValueTask<bool> SendAsync(byte[] data, WebSocketMessageType type = WebSocketMessageType.Binary)
{
if (_ctsSource.IsCancellationRequested || _processState != ProcessState.Processing)
if (_ctsSource.IsCancellationRequested || !_processing)
return false;
_logger.SocketAddingBytesToSendBuffer(Id, id, data);
#warning todo logging overloads without id
_logger.SocketAddingBytesToSendBuffer(Id, 0, data);
try
{
await _socket!.SendAsync(new ArraySegment<byte>(data, 0, data.Length), WebSocketMessageType.Binary, true, _ctsSource.Token).ConfigureAwait(false);
_logger.SocketSentBytes(Id, id, data.Length);
await _socket!.SendAsync(new ArraySegment<byte>(data, 0, data.Length), type, true, _ctsSource.Token).ConfigureAwait(false);
_logger.SocketSentBytes(Id, 0, data.Length);
return true;
}
catch (OperationCanceledException)
@ -372,6 +339,112 @@ namespace CryptoExchange.Net.Sockets
_logger.SocketDisposed(Id);
}
#if NETSTANDARD2_1 || NET8_0_OR_GREATER
private async Task ReceiveLoopAsync()
{
try
{
Exception? exitException = null;
while (true)
{
if (_ctsSource.IsCancellationRequested)
break;
ValueWebSocketReceiveResult receiveResult = default;
try
{
receiveResult = await _socket!.ReceiveAsync(_pipeWriter.GetMemory(_receiveBufferSize), _ctsSource.Token).ConfigureAwait(false);
// Advance the writer to communicate which part the memory was written
_pipeWriter.Advance(receiveResult.Count);
}
catch (OperationCanceledException ex)
{
if (ex.InnerException?.InnerException?.Message.Contains("KeepAliveTimeout") == true)
{
// Specific case that the websocket connection got closed because of a ping frame timeout
// Unfortunately doesn't seem to be a nicer way to catch
_logger.SocketPingTimeout(Id);
}
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
// canceled
exitException = ex;
break;
}
catch (Exception wse)
{
if (!_ctsSource.Token.IsCancellationRequested && !_stopRequested)
// Connection closed unexpectedly
await (OnError?.Invoke(wse) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
exitException = wse;
break;
}
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed
if (_socket.State == WebSocketState.CloseReceived)
{
// Close received means it server initiated, we should send a confirmation and close the socket
//_logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
else
{
// Means the socket is now closed and we were the one initiating it
//_logger.SocketReceivedCloseConfirmation(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
}
break;
}
if (receiveResult.EndOfMessage)
{
// Write a comma to split the json data for the reader
// This will also flush the written bytes
var flushResult = await _pipeWriter.FlushAsync().ConfigureAwait(false);
if (flushResult.IsCompleted)
{
// Flush indicated that the reader is no longer listening
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
break;
}
}
}
await _pipeWriter.CompleteAsync(exitException).ConfigureAwait(false);
}
catch (Exception e)
{
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
// Make sure we at least let the owner know there was an error
_logger.SocketReceiveLoopStoppedWithException(Id, e);
await _pipeWriter.CompleteAsync(e).ConfigureAwait(false);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
finally
{
_logger.SocketReceiveLoopFinished(Id);
}
}
#else
/// <summary>
/// Loop for receiving and reassembling data
/// </summary>
@ -380,115 +453,95 @@ namespace CryptoExchange.Net.Sockets
{
byte[] rentedBuffer = _receiveBufferPool.Rent(_receiveBufferSize);
var buffer = new ArraySegment<byte>(rentedBuffer);
var first = true;
try
{
Exception? exitException = null;
while (true)
{
if (_ctsSource.IsCancellationRequested)
break;
WebSocketReceiveResult? receiveResult = null;
while (true)
try
{
try
receiveResult = await _socket!.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException ex)
{
if (ex.InnerException?.InnerException?.Message.Contains("KeepAliveTimeout") == true)
{
//_stream.Read
receiveResult = await _socket!.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
// Specific case that the websocket connection got closed because of a ping frame timeout
// Unfortunately doesn't seem to be a nicer way to catch
_logger.SocketPingTimeout(Id);
}
catch (OperationCanceledException ex)
{
if (ex.InnerException?.InnerException?.Message.Contains("KeepAliveTimeout") == true)
{
// Specific case that the websocket connection got closed because of a ping frame timeout
// Unfortunately doesn't seem to be a nicer way to catch
_logger.SocketPingTimeout(Id);
}
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
// canceled
exitException = ex;
break;
}
catch (Exception wse)
{
if (!_ctsSource.Token.IsCancellationRequested && !_stopRequested)
// Connection closed unexpectedly
await (OnError?.Invoke(wse) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
exitException = wse;
break;
}
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed
if (_socket.State == WebSocketState.CloseReceived)
{
// Close received means it server initiated, we should send a confirmation and close the socket
_logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
// canceled
break;
}
catch (Exception wse)
{
if (!_ctsSource.Token.IsCancellationRequested && !_stopRequested)
// Connection closed unexpectedly
await (OnError?.Invoke(wse) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
break;
}
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed
if (_socket.State == WebSocketState.CloseReceived)
{
// Close received means it server initiated, we should send a confirmation and close the socket
_logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
else
{
// Means the socket is now closed and we were the one initiating it
_logger.SocketReceivedCloseConfirmation(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
}
break;
}
if (!first)
{
// Write a comma to split the json data
if (receiveResult.EndOfMessage)
await _pipeWriter.WriteAsync(new byte[] { 44 }).ConfigureAwait(false);
}
else
{
// Write a opening bracket
await _pipeWriter.WriteAsync(new byte[] { 91 }).ConfigureAwait(false);
first = false;
// Means the socket is now closed and we were the one initiating it
_logger.SocketReceivedCloseConfirmation(Id, receiveResult.CloseStatus.ToString()!, receiveResult.CloseStatusDescription ?? string.Empty);
}
await _pipeWriter.WriteAsync(new ReadOnlyMemory<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
}
if (receiveResult?.MessageType == WebSocketMessageType.Close)
{
// Received close message
break;
}
if (receiveResult == null || _ctsSource.IsCancellationRequested)
{
// Error during receiving or cancellation requested, stop.
break;
}
await _pipeWriter.WriteAsync(buffer.AsMemory(0, receiveResult.Count)).ConfigureAwait(false);
}
await _pipeWriter.CompleteAsync(exitException).ConfigureAwait(false);
}
catch(Exception e)
catch (Exception e)
{
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
// Make sure we at least let the owner know there was an error
_logger.SocketReceiveLoopStoppedWithException(Id, e);
await _pipeWriter.CompleteAsync(e).ConfigureAwait(false);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
finally
{
// Not needed?
//await _pipeWriter.WriteAsync(Encoding.UTF8.GetBytes("]")).ConfigureAwait(false);
_receiveBufferPool.Return(rentedBuffer, true);
_logger.SocketReceiveLoopFinished(Id);
}
}
#endif
/// <summary>
/// Set proxy on socket
@ -511,14 +564,5 @@ namespace CryptoExchange.Net.Sockets
if (proxy.Login != null)
socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
}
private void SetProcessState(ProcessState state)
{
if (_processState == state)
return;
_logger.SocketProcessingStateChanged(Id, _processState.ToString(), state.ToString());
_processState = state;
}
}
}

View File

@ -23,8 +23,8 @@ namespace CryptoExchange.Net.Sockets
Task CloseAsync();
void Dispose();
ValueTask<CallResult> SendStringAsync(int requestId, string data, int weight);
ValueTask<CallResult> SendAsync<T>(int requestId, T obj, int weight);
ValueTask<CallResult> SendBytesAsync(int requestId, byte[] data, int weight);
//ValueTask<CallResult> SendStringAsync(int requestId, string data, int weight);
//ValueTask<CallResult> SendAsync<T>(int requestId, T obj, int weight);
//ValueTask<CallResult> SendBytesAsync(int requestId, byte[] data, int weight);
}
}

View File

@ -295,7 +295,7 @@ namespace CryptoExchange.Net.Sockets
_socket = socketFactory.CreateWebsocket(logger, parameters);
_logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString());
//_socket.OnStreamMessage += HandleStreamMessage;
_socket.OnStreamMessage += HandleStreamMessage;
_socket.OnRequestSent += HandleRequestSentAsync;
_socket.OnRequestRateLimited += HandleRequestRateLimitedAsync;
_socket.OnConnectRateLimited += HandleConnectRateLimitedAsync;
@ -330,10 +330,16 @@ namespace CryptoExchange.Net.Sockets
Status = SocketStatus.Closed;
Authenticated = false;
if (ApiClient.socketConnections.ContainsKey(SocketId))
ApiClient.socketConnections.TryRemove(SocketId, out _);
lock (_listenersLock)
{
foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription && !l.IsClosingConnection))
{
subscription.IsClosingConnection = true;
subscription.Reset();
}
foreach (var query in _listeners.OfType<Query>().ToList())
{