diff --git a/CryptoExchange.Net/Clients/BaseApiClient.cs b/CryptoExchange.Net/Clients/BaseApiClient.cs index 55072c7..eb03c3d 100644 --- a/CryptoExchange.Net/Clients/BaseApiClient.cs +++ b/CryptoExchange.Net/Clients/BaseApiClient.cs @@ -1,8 +1,6 @@ using System; using System.Collections.Generic; -using System.Globalization; using System.IO; -using System.Net; using System.Net.Http; using System.Text; using System.Threading.Tasks; diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 699a3dc..950c513 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -4,17 +4,14 @@ using CryptoExchange.Net.Objects.Options; using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; using Microsoft.Extensions.Logging; -using Newtonsoft.Json.Linq; using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; +using System.IO; using System.Linq; -using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; -using static CryptoExchange.Net.Objects.RateLimiter; namespace CryptoExchange.Net { @@ -43,19 +40,14 @@ namespace CryptoExchange.Net protected TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(10); /// - /// Delegate used for processing byte data received from socket connections before it is processed by handlers + /// Delegate used for manipulating data received from socket connections before it is processed by listeners /// - protected Func? dataInterpreterBytes; - - /// - /// Delegate used for processing string data received from socket connections before it is processed by handlers - /// - protected Func? dataInterpreterString; + protected Func? interceptor; /// /// Handlers for data from the socket which doesn't need to be forwarded to the caller. Ping or welcome messages for example. /// - protected List genericHandlers = new(); + protected List systemSubscriptions = new(); /// /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similair. Not necesarry. @@ -106,7 +98,7 @@ namespace CryptoExchange.Net if (!socketConnections.Any()) return 0; - return socketConnections.Sum(s => s.Value.SubscriptionCount); + return socketConnections.Sum(s => s.Value.UserListenerCount); } } @@ -140,27 +132,24 @@ namespace CryptoExchange.Net } /// - /// Set a delegate to be used for processing data received from socket connections before it is processed by handlers + /// Set a delegate which can manipulate the message stream before it is processed by listeners /// - /// Handler for byte data - /// Handler for string data - protected void SetDataInterpreter(Func? byteHandler, Func? stringHandler) + /// Interceptor + protected void SetInterceptor(Func interceptor) { - // TODO - dataInterpreterBytes = byteHandler; - dataInterpreterString = stringHandler; + this.interceptor = interceptor; } /// /// Connect to an url and listen for data on the BaseAddress /// /// The type of the expected data - /// The subscription + /// The subscription /// Cancellation token for closing this subscription /// - protected virtual Task> SubscribeAsync(SubscriptionActor subscriptionObject, CancellationToken ct) + protected virtual Task> SubscribeAsync(Subscription subscription, CancellationToken ct) { - return SubscribeAsync(BaseAddress, subscriptionObject, ct); + return SubscribeAsync(BaseAddress, subscription, ct); } /// @@ -168,16 +157,16 @@ namespace CryptoExchange.Net /// /// The type of the expected data /// The URL to connect to - /// The subscription + /// The subscription /// Cancellation token for closing this subscription /// - protected virtual async Task> SubscribeAsync(string url, SubscriptionActor subscriptionObject, CancellationToken ct) + protected virtual async Task> SubscribeAsync(string url, Subscription subscription, CancellationToken ct) { if (_disposing) return new CallResult(new InvalidOperationError("Client disposed, can't subscribe")); SocketConnection socketConnection; - SocketSubscriptionListener? subscription; + MessageListener? messageListener; var released = false; // Wait for a semaphore here, so we only connect 1 socket at a time. // This is necessary for being able to see if connections can be combined @@ -195,15 +184,15 @@ namespace CryptoExchange.Net while (true) { // Get a new or existing socket connection - var socketResult = await GetSocketConnection(url, subscriptionObject.Authenticated).ConfigureAwait(false); + var socketResult = await GetSocketConnection(url, subscription.Authenticated).ConfigureAwait(false); if (!socketResult) return socketResult.As(null); socketConnection = socketResult.Data; // Add a subscription on the socket connection - subscription = AddSubscription(subscriptionObject, true, socketConnection); - if (subscription == null) + messageListener = AddSubscription(subscription, true, socketConnection); + if (messageListener == null) { _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); continue; @@ -218,7 +207,7 @@ namespace CryptoExchange.Net var needsConnecting = !socketConnection.Connected; - var connectResult = await ConnectIfNeededAsync(socketConnection, subscriptionObject.Authenticated).ConfigureAwait(false); + var connectResult = await ConnectIfNeededAsync(socketConnection, subscription.Authenticated).ConfigureAwait(false); if (!connectResult) return new CallResult(connectResult.Error!); @@ -237,35 +226,35 @@ namespace CryptoExchange.Net return new CallResult(new ServerError("Socket is paused")); } - var request = subscriptionObject.GetSubscribeRequest(); + var request = subscription.GetSubRequest(); if (request != null) { // Send the request and wait for answer - var subResult = await SubscribeAndWaitAsync(socketConnection, request, subscription).ConfigureAwait(false); + var subResult = await SubscribeAndWaitAsync(socketConnection, request, messageListener).ConfigureAwait(false); if (!subResult) { _logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}"); - await socketConnection.CloseAsync(subscription).ConfigureAwait(false); + await socketConnection.CloseAsync(messageListener).ConfigureAwait(false); return new CallResult(subResult.Error!); } } else { // No request to be sent, so just mark the subscription as comfirmed - subscription.Confirmed = true; + messageListener.Confirmed = true; } if (ct != default) { - subscription.CancellationTokenRegistration = ct.Register(async () => + messageListener.CancellationTokenRegistration = ct.Register(async () => { - _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription"); - await socketConnection.CloseAsync(subscription).ConfigureAwait(false); + _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription {messageListener.Id}"); + await socketConnection.CloseAsync(messageListener).ConfigureAwait(false); }, false); } - _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully"); - return new CallResult(new UpdateSubscription(socketConnection, subscription)); + _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {messageListener.Id} completed successfully"); + return new CallResult(new UpdateSubscription(socketConnection, messageListener)); } /// @@ -273,14 +262,14 @@ namespace CryptoExchange.Net /// /// The connection to send the request on /// The request to send, will be serialized to json - /// The subscription the request is for + /// The message listener for the subscription /// - protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, SocketSubscriptionListener subscription) + protected internal virtual async Task> SubscribeAndWaitAsync(SocketConnection socketConnection, object request, MessageListener listener) { CallResult? callResult = null; - await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, subscription, 1, x => + await socketConnection.SendAndWaitAsync(request, ClientOptions.RequestTimeout, listener, 1, x => { - var (matches, result) = subscription.Subscription!.MessageMatchesSubscribeRequest(x); + var (matches, result) = listener.Subscription!.MessageMatchesSubRequest(x); if (matches) callResult = result; return matches; @@ -288,7 +277,7 @@ namespace CryptoExchange.Net if (callResult?.Success == true) { - subscription.Confirmed = true; + listener.Confirmed = true; return new CallResult(true); } @@ -303,11 +292,10 @@ namespace CryptoExchange.Net /// /// Expected result type /// The query - /// Weight of the request /// - protected virtual Task> QueryAsync(QueryActor query, int weight = 1) + protected virtual Task> QueryAsync(Query query) { - return QueryAsync(BaseAddress, query, weight); + return QueryAsync(BaseAddress, query); } /// @@ -315,10 +303,9 @@ namespace CryptoExchange.Net /// /// The expected result type /// The url for the request - /// The request to send - /// Weight of the request + /// The query /// - protected virtual async Task> QueryAsync(string url, QueryActor request, int weight = 1) + protected virtual async Task> QueryAsync(string url, Query query) { if (_disposing) return new CallResult(new InvalidOperationError("Client disposed, can't query")); @@ -328,7 +315,7 @@ namespace CryptoExchange.Net await semaphoreSlim.WaitAsync().ConfigureAwait(false); try { - var socketResult = await GetSocketConnection(url, request.Authenticated).ConfigureAwait(false); + var socketResult = await GetSocketConnection(url, query.Authenticated).ConfigureAwait(false); if (!socketResult) return socketResult.As(default); @@ -341,7 +328,7 @@ namespace CryptoExchange.Net released = true; } - var connectResult = await ConnectIfNeededAsync(socketConnection, request.Authenticated).ConfigureAwait(false); + var connectResult = await ConnectIfNeededAsync(socketConnection, query.Authenticated).ConfigureAwait(false); if (!connectResult) return new CallResult(connectResult.Error!); } @@ -357,7 +344,7 @@ namespace CryptoExchange.Net return new CallResult(new ServerError("Socket is paused")); } - return await QueryAndWaitAsync(socketConnection, request, weight).ConfigureAwait(false); + return await QueryAndWaitAsync(socketConnection, query).ConfigureAwait(false); } /// @@ -365,18 +352,17 @@ namespace CryptoExchange.Net /// /// The expected result type /// The connection to send and wait on - /// The request to send - /// The weight of the query + /// The query /// - protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, QueryActor request, int weight) + protected virtual async Task> QueryAndWaitAsync(SocketConnection socket, Query query) { var dataResult = new CallResult(new ServerError("No response on query received")); - await socket.SendAndWaitAsync(request.Query, ClientOptions.RequestTimeout, null, weight, x => + await socket.SendAndWaitAsync(query.Request, ClientOptions.RequestTimeout, null, query.Weight, x => { - var matches = request.MessageMatchesQuery(x); + var matches = query.MessageMatchesQuery(x); if (matches) { - request.HandleResponse(x); + query.HandleResponse(x); return true; } @@ -420,7 +406,7 @@ namespace CryptoExchange.Net _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate"); var authRequest = GetAuthenticationRequest(); var authResult = new CallResult(new ServerError("No response from server")); - await socket.SendAndWaitAsync(authRequest.Query, ClientOptions.RequestTimeout, null, 1, x => + await socket.SendAndWaitAsync(authRequest.Request, ClientOptions.RequestTimeout, null, 1, x => { var matches = authRequest.MessageMatchesQuery(x); if (matches) @@ -451,33 +437,23 @@ namespace CryptoExchange.Net /// Should return the request which can be used to authenticate a socket connection /// /// - protected internal abstract QueryActor GetAuthenticationRequest(); - - /// - /// Optional handler to interpolate data before sending it to the handlers - /// - /// - /// - //protected internal virtual JToken ProcessTokenData(JToken message) - //{ - // return message; - //} + protected internal abstract Query GetAuthenticationRequest(); /// /// Add a subscription to a connection /// /// The type of data the subscription expects - /// The subscription + /// The subscription /// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket) /// The socket connection the handler is on /// - protected virtual SocketSubscriptionListener? AddSubscription(SubscriptionActor subscriptionObject, bool userSubscription, SocketConnection connection) + protected virtual MessageListener? AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection) { - var subscription = new SocketSubscriptionListener(ExchangeHelpers.NextId(), subscriptionObject, userSubscription); - if (!connection.AddSubscription(subscription)) + var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription); + if (!connection.AddListener(messageListener)) return null; - return subscription; + return messageListener; } /// @@ -486,10 +462,10 @@ namespace CryptoExchange.Net /// The subscription protected void AddSystemSubscription(SystemSubscription systemSubscription) { - genericHandlers.Add(systemSubscription); - var subscription = new SocketSubscriptionListener(ExchangeHelpers.NextId(), systemSubscription, false); + systemSubscriptions.Add(systemSubscription); + var subscription = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false); foreach (var connection in socketConnections.Values) - connection.AddSubscription(subscription); + connection.AddListener(subscription); } /// @@ -534,11 +510,11 @@ namespace CryptoExchange.Net var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected) && s.Value.Tag.TrimEnd('/') == address.TrimEnd('/') && (s.Value.ApiClient.GetType() == GetType()) - && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault(); + && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.UserListenerCount).FirstOrDefault(); var result = socketResult.Equals(default(KeyValuePair)) ? null : socketResult.Value; if (result != null) { - if (result.SubscriptionCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.SubscriptionCount >= ClientOptions.SocketSubscriptionsCombineTarget))) + if (result.UserListenerCount < ClientOptions.SocketSubscriptionsCombineTarget || (socketConnections.Count >= (ApiOptions.MaxSocketConnections ?? ClientOptions.MaxSocketConnections) && socketConnections.All(s => s.Value.UserListenerCount >= ClientOptions.SocketSubscriptionsCombineTarget))) { // Use existing socket if it has less than target connections OR it has the least connections and we can't make new return new CallResult(result); @@ -560,10 +536,10 @@ namespace CryptoExchange.Net var socketConnection = new SocketConnection(_logger, this, socket, address); socketConnection.UnhandledMessage += HandleUnhandledMessage; - foreach (var systemHandler in genericHandlers) + foreach (var systemSubscription in systemSubscriptions) { - var handler = new SocketSubscriptionListener(ExchangeHelpers.NextId(), systemHandler, false); - socketConnection.AddSubscription(handler); + var handler = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false); + socketConnection.AddListener(handler); } return new CallResult(socketConnection); @@ -602,8 +578,7 @@ namespace CryptoExchange.Net protected virtual WebSocketParameters GetWebSocketParameters(string address) => new(new Uri(address), ClientOptions.AutoReconnect) { - DataInterpreterBytes = dataInterpreterBytes, - DataInterpreterString = dataInterpreterString, + Interceptor = interceptor, KeepAliveInterval = KeepAliveInterval, ReconnectInterval = ClientOptions.ReconnectInterval, RateLimiters = RateLimiters, @@ -677,11 +652,11 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAsync(int subscriptionId) { - SocketSubscriptionListener? subscription = null; + MessageListener? subscription = null; SocketConnection? connection = null; foreach (var socket in socketConnections.Values.ToList()) { - subscription = socket.GetSubscription(subscriptionId); + subscription = socket.GetListener(subscriptionId); if (subscription != null) { connection = socket; @@ -717,11 +692,11 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAllAsync() { - var sum = socketConnections.Sum(s => s.Value.SubscriptionCount); + var sum = socketConnections.Sum(s => s.Value.UserListenerCount); if (sum == 0) return; - _logger.Log(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions"); + _logger.Log(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.UserListenerCount)} subscriptions"); var tasks = new List(); { var socketList = socketConnections.Values; @@ -758,8 +733,8 @@ namespace CryptoExchange.Net sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}"); foreach (var connection in socketConnections) { - sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); - foreach (var subscription in connection.Value.Subscriptions) + sb.AppendLine($" Connection {connection.Key}: {connection.Value.UserListenerCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}"); + foreach (var subscription in connection.Value.MessageListeners) sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}"); } return sb.ToString(); @@ -773,7 +748,7 @@ namespace CryptoExchange.Net _disposing = true; periodicEvent?.Set(); periodicEvent?.Dispose(); - if (socketConnections.Sum(s => s.Value.SubscriptionCount) > 0) + if (socketConnections.Sum(s => s.Value.UserListenerCount) > 0) { _logger.Log(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); _ = UnsubscribeAllAsync(); diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 2d7145a..2ab78a8 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -20,7 +20,7 @@ namespace CryptoExchange.Net.Interfaces /// /// Websocket message received event /// - event Func OnStreamMessage; + event Func OnStreamMessage; /// /// Websocket sent event, RequestId as parameter /// diff --git a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs index 9e26257..5ef161c 100644 --- a/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs +++ b/CryptoExchange.Net/Objects/Sockets/PendingRequest.cs @@ -1,5 +1,4 @@ using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Objects; using System; using System.Threading; using System.Threading.Tasks; @@ -14,20 +13,20 @@ namespace CryptoExchange.Net.Objects.Sockets public AsyncResetEvent Event { get; } public DateTime RequestTimestamp { get; set; } public TimeSpan Timeout { get; } - public SocketSubscriptionListener? Subscription { get; } + public MessageListener? MessageListener { get; } private CancellationTokenSource? _cts; public int Priority => 100; - public PendingRequest(int id, Func messageMatchesHandler, TimeSpan timeout, SocketSubscriptionListener? subscription) + public PendingRequest(int id, Func messageMatchesHandler, TimeSpan timeout, MessageListener? subscription) { Id = id; MessageMatchesHandler = messageMatchesHandler; Event = new AsyncResetEvent(false, false); Timeout = timeout; RequestTimestamp = DateTime.UtcNow; - Subscription = subscription; + MessageListener = subscription; } public void IsSend() diff --git a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs index d662738..fb3bd62 100644 --- a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs @@ -7,12 +7,12 @@ using System.Threading.Tasks; namespace CryptoExchange.Net.Objects.Sockets { /// - /// Socket subscription + /// Socket listener /// - public class SocketSubscriptionListener : IStreamMessageListener + public class MessageListener : IStreamMessageListener { /// - /// Unique subscription id + /// Unique listener id /// public int Id { get; } @@ -24,12 +24,12 @@ namespace CryptoExchange.Net.Objects.Sockets /// /// The request object send when subscribing on the server. Either this or the `Identifier` property should be set /// - public SubscriptionActor Subscription { get; set; } + public Subscription Subscription { get; set; } /// /// Whether this is a user subscription or an internal listener /// - public bool UserSubscription { get; set; } + public bool UserListener { get; set; } /// /// If the subscription has been confirmed to be subscribed by the server @@ -58,10 +58,10 @@ namespace CryptoExchange.Net.Objects.Sockets /// /// /// - public SocketSubscriptionListener(int id, SubscriptionActor request, bool userSubscription) + public MessageListener(int id, Subscription request, bool userSubscription) { Id = id; - UserSubscription = userSubscription; + UserListener = userSubscription; Subscription = request; } @@ -84,7 +84,7 @@ namespace CryptoExchange.Net.Objects.Sockets /// /// /// - public bool MessageMatches(StreamMessage message) => Subscription.MessageMatchesSubscription(message); + public bool MessageMatches(StreamMessage message) => Subscription.MessageMatchesEvent(message); /// /// Process the message diff --git a/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs b/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs index 3930918..b4d94e4 100644 --- a/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs +++ b/CryptoExchange.Net/Objects/Sockets/StreamMessage.cs @@ -20,7 +20,7 @@ namespace CryptoExchange.Net.Objects.Sockets /// /// The data stream /// - public MemoryStream Stream { get; } + public Stream Stream { get; } /// /// Receive timestamp /// @@ -45,6 +45,9 @@ namespace CryptoExchange.Net.Objects.Sockets return result; } + /// + /// Dispose + /// public void Dispose() { Stream.Dispose(); @@ -56,7 +59,7 @@ namespace CryptoExchange.Net.Objects.Sockets /// /// /// - public StreamMessage(SocketConnection connection, MemoryStream stream, DateTime timestamp) + public StreamMessage(SocketConnection connection, Stream stream, DateTime timestamp) { Connection = connection; Stream = stream; diff --git a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs index ea23216..e4fdede 100644 --- a/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/UpdateSubscription.cs @@ -11,7 +11,7 @@ namespace CryptoExchange.Net.Objects.Sockets public class UpdateSubscription { private readonly SocketConnection _connection; - private readonly SocketSubscriptionListener _subscription; + private readonly MessageListener _listener; /// /// Event when the connection is lost. The socket will automatically reconnect when possible. @@ -65,8 +65,8 @@ namespace CryptoExchange.Net.Objects.Sockets /// public event Action Exception { - add => _subscription.Exception += value; - remove => _subscription.Exception -= value; + add => _listener.Exception += value; + remove => _listener.Exception -= value; } /// @@ -77,17 +77,17 @@ namespace CryptoExchange.Net.Objects.Sockets /// /// The id of the subscription /// - public int Id => _subscription.Id; + public int Id => _listener.Id; /// /// ctor /// /// The socket connection the subscription is on /// The subscription - public UpdateSubscription(SocketConnection connection, SocketSubscriptionListener subscription) + public UpdateSubscription(SocketConnection connection, MessageListener subscription) { _connection = connection; - _subscription = subscription; + _listener = subscription; } /// @@ -96,7 +96,7 @@ namespace CryptoExchange.Net.Objects.Sockets /// public Task CloseAsync() { - return _connection.CloseAsync(_subscription); + return _connection.CloseAsync(_listener); } /// @@ -114,7 +114,7 @@ namespace CryptoExchange.Net.Objects.Sockets /// internal async Task UnsubscribeAsync() { - await _connection.UnsubscribeAsync(_subscription).ConfigureAwait(false); + await _connection.UnsubscribeAsync(_listener).ConfigureAwait(false); } /// @@ -123,7 +123,7 @@ namespace CryptoExchange.Net.Objects.Sockets /// internal async Task> ResubscribeAsync() { - return await _connection.ResubscribeAsync(_subscription).ConfigureAwait(false); + return await _connection.ResubscribeAsync(_listener).ConfigureAwait(false); } } } diff --git a/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs index e65481f..c4dde81 100644 --- a/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs +++ b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs @@ -2,6 +2,7 @@ using CryptoExchange.Net.Objects; using System; using System.Collections.Generic; +using System.IO; using System.Text; using System.Threading.Tasks; @@ -63,9 +64,9 @@ namespace CryptoExchange.Net.Objects.Sockets public string? Origin { get; set; } /// - /// Delegate used for processing byte data received from socket connections before it is processed by handlers + /// Delegate used for manipulating data received from socket connections before it is processed by listeners /// - public Func? DataInterpreterBytes { get; set; } + public Func? Interceptor { get; set; } /// /// Delegate used for processing string data received from socket connections before it is processed by handlers diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 0c73aaf..edbdbce 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -101,7 +101,7 @@ namespace CryptoExchange.Net.Sockets public event Action? OnClose; /// - public event Func? OnStreamMessage; + public event Func? OnStreamMessage; /// public event Action? OnRequestSent; @@ -521,7 +521,7 @@ namespace CryptoExchange.Net.Sockets { // Received a complete message and it's not multi part _logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); - await ProcessByteData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count), receiveResult.MessageType).ConfigureAwait(false); + await ProcessData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count), receiveResult.MessageType).ConfigureAwait(false); } else { @@ -555,7 +555,7 @@ namespace CryptoExchange.Net.Sockets { // Reassemble complete message from memory stream _logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); - await ProcessByteData(memoryStream, receiveResult.MessageType).ConfigureAwait(false); + await ProcessData(memoryStream, receiveResult.MessageType).ConfigureAwait(false); memoryStream.Dispose(); } else @@ -580,10 +580,12 @@ namespace CryptoExchange.Net.Sockets } } - private async Task ProcessByteData(MemoryStream memoryStream, WebSocketMessageType messageType) + private async Task ProcessData(Stream stream, WebSocketMessageType messageType) { + if (Parameters.Interceptor != null) + stream = Parameters.Interceptor.Invoke(stream); if (OnStreamMessage != null) - await OnStreamMessage.Invoke(memoryStream).ConfigureAwait(false); + await OnStreamMessage.Invoke(stream).ConfigureAwait(false); } /// diff --git a/CryptoExchange.Net/Sockets/QueryActor.cs b/CryptoExchange.Net/Sockets/Query.cs similarity index 72% rename from CryptoExchange.Net/Sockets/QueryActor.cs rename to CryptoExchange.Net/Sockets/Query.cs index bae03a4..f08edf2 100644 --- a/CryptoExchange.Net/Sockets/QueryActor.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -6,18 +6,23 @@ namespace CryptoExchange.Net.Sockets /// /// Query /// - public abstract class QueryActor + public abstract class Query { /// /// The query request /// - public object Query { get; set; } + public object Request { get; set; } /// /// If this is a private request /// public bool Authenticated { get; } + /// + /// Weight of the query + /// + public int Weight { get; } + /// /// Check if the message is the response to the query /// @@ -34,12 +39,14 @@ namespace CryptoExchange.Net.Sockets /// /// ctor /// - /// + /// /// - public QueryActor(object query, bool authenticated) + /// + public Query(object request, bool authenticated, int weight = 1) { Authenticated = authenticated; - Query = query; + Request = request; + Weight = weight; } } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 74df18d..90bda3c 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -5,7 +5,6 @@ using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Newtonsoft.Json; -using Newtonsoft.Json.Linq; using Microsoft.Extensions.Logging; using CryptoExchange.Net.Objects; using System.Net.WebSockets; @@ -50,23 +49,23 @@ namespace CryptoExchange.Net.Sockets public event Action? UnhandledMessage; /// - /// The amount of subscriptions on this connection + /// The amount of listeners on this connection /// - public int SubscriptionCount + public int UserListenerCount { - get { lock (_subscriptionLock) - return _subscriptions.Count(h => h.UserSubscription); } + get { lock (_listenerLock) + return _listeners.Count(h => h.UserListener); } } /// - /// Get a copy of the current subscriptions + /// Get a copy of the current message listeners /// - public SocketSubscriptionListener[] Subscriptions + public MessageListener[] MessageListeners { get { - lock (_subscriptionLock) - return _subscriptions.Where(h => h.UserSubscription).ToArray(); + lock (_listenerLock) + return _listeners.Where(h => h.UserListener).ToArray(); } } @@ -151,10 +150,10 @@ namespace CryptoExchange.Net.Sockets } private bool _pausedActivity; - private readonly List _subscriptions; - private readonly List _messageListeners; + private readonly List _listeners; + private readonly List _messageListeners; // ? - private readonly object _subscriptionLock = new(); + private readonly object _listenerLock = new(); private readonly ILogger _logger; private SocketStatus _status; @@ -179,7 +178,7 @@ namespace CryptoExchange.Net.Sockets Properties = new Dictionary(); _messageListeners = new List(); - _subscriptions = new List(); + _listeners = new List(); _socket = socket; _socket.OnStreamMessage += HandleStreamMessage; @@ -208,10 +207,10 @@ namespace CryptoExchange.Net.Sockets { Status = SocketStatus.Closed; Authenticated = false; - lock(_subscriptionLock) + lock(_listenerLock) { - foreach (var sub in _subscriptions) - sub.Confirmed = false; + foreach (var listener in _listeners) + listener.Confirmed = false; } Task.Run(() => ConnectionClosed?.Invoke()); } @@ -224,10 +223,10 @@ namespace CryptoExchange.Net.Sockets Status = SocketStatus.Reconnecting; DisconnectTime = DateTime.UtcNow; Authenticated = false; - lock (_subscriptionLock) + lock (_listenerLock) { - foreach (var sub in _subscriptions) - sub.Confirmed = false; + foreach (var listener in _listeners) + listener.Confirmed = false; } _ = Task.Run(() => ConnectionLost?.Invoke()); @@ -310,14 +309,19 @@ namespace CryptoExchange.Net.Sockets /// /// /// - protected virtual async Task HandleStreamMessage(MemoryStream stream) + protected virtual async Task HandleStreamMessage(Stream stream) { var timestamp = DateTime.UtcNow; var streamMessage = new StreamMessage(this, stream, timestamp); var handledResponse = false; - SocketSubscriptionListener? currentSubscription = null; + MessageListener? currentSubscription = null; TimeSpan userCodeDuration = TimeSpan.Zero; - foreach (var listener in _messageListeners.OrderByDescending(x => x.Priority).ToList()) // LOCK + + List listeners; + lock (_listenerLock) + listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList(); + + foreach (var listener in listeners) { if (listener.MessageMatches(streamMessage)) { @@ -329,10 +333,10 @@ namespace CryptoExchange.Net.Sockets if (pendingRequest.Completed) { // Answer to a timed out request, unsub if it is a subscription request - if (pendingRequest.Subscription != null) + if (pendingRequest.MessageListener != null) { _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout"); - _ = UnsubscribeAsync(pendingRequest.Subscription).ConfigureAwait(false); + _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false); } } else @@ -347,7 +351,7 @@ namespace CryptoExchange.Net.Sockets handledResponse = true; break; } - else if (listener is SocketSubscriptionListener subscription) + else if (listener is MessageListener subscription) { currentSubscription = subscription; handledResponse = true; @@ -398,12 +402,12 @@ namespace CryptoExchange.Net.Sockets if (ApiClient.socketConnections.ContainsKey(SocketId)) ApiClient.socketConnections.TryRemove(SocketId, out _); - lock (_subscriptionLock) + lock (_listenerLock) { - foreach (var subscription in _subscriptions) + foreach (var listener in _listeners) { - if (subscription.CancellationTokenRegistration.HasValue) - subscription.CancellationTokenRegistration.Value.Dispose(); + if (listener.CancellationTokenRegistration.HasValue) + listener.CancellationTokenRegistration.Value.Dispose(); } } @@ -412,32 +416,32 @@ namespace CryptoExchange.Net.Sockets } /// - /// Close a subscription on this connection. If all subscriptions on this connection are closed the connection gets closed as well + /// Close a listener on this connection. If all listener on this connection are closed the connection gets closed as well /// - /// Subscription to close + /// Listener to close /// - public async Task CloseAsync(SocketSubscriptionListener subscription) + public async Task CloseAsync(MessageListener listener) { - lock (_subscriptionLock) + lock (_listenerLock) { - if (!_subscriptions.Contains(subscription)) + if (!_listeners.Contains(listener)) return; - subscription.Closed = true; + listener.Closed = true; } if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) return; - _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing subscription {subscription.Id}"); - if (subscription.CancellationTokenRegistration.HasValue) - subscription.CancellationTokenRegistration.Value.Dispose(); + _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing listener {listener.Id}"); + if (listener.CancellationTokenRegistration.HasValue) + listener.CancellationTokenRegistration.Value.Dispose(); - if (subscription.Confirmed && _socket.IsOpen) - await UnsubscribeAsync(subscription).ConfigureAwait(false); + if (listener.Confirmed && _socket.IsOpen) + await UnsubscribeAsync(listener).ConfigureAwait(false); bool shouldCloseConnection; - lock (_subscriptionLock) + lock (_listenerLock) { if (Status == SocketStatus.Closing) { @@ -445,21 +449,21 @@ namespace CryptoExchange.Net.Sockets return; } - shouldCloseConnection = _subscriptions.All(r => !r.UserSubscription || r.Closed); + shouldCloseConnection = _listeners.All(r => !r.UserListener || r.Closed); if (shouldCloseConnection) Status = SocketStatus.Closing; } if (shouldCloseConnection) { - _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more subscriptions"); + _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more listeners"); await CloseAsync().ConfigureAwait(false); } - lock (_subscriptionLock) + lock (_listenerLock) { - _messageListeners.Remove(subscription); - _subscriptions.Remove(subscription); + _messageListeners.Remove(listener); + _listeners.Remove(listener); } } @@ -473,44 +477,44 @@ namespace CryptoExchange.Net.Sockets } /// - /// Add a subscription to this connection + /// Add a listener to this connection /// - /// - public bool AddSubscription(SocketSubscriptionListener subscription) + /// + public bool AddListener(MessageListener listener) { - lock (_subscriptionLock) + lock (_listenerLock) { if (Status != SocketStatus.None && Status != SocketStatus.Connected) return false; - _subscriptions.Add(subscription); - _messageListeners.Add(subscription); + _listeners.Add(listener); + _messageListeners.Add(listener); - if (subscription.UserSubscription) - _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}"); + if (listener.UserListener) + _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {listener.Id}, total listeners on connection: {_listeners.Count(s => s.UserListener)}"); return true; } } /// - /// Get a subscription on this connection by id + /// Get a listener on this connection by id /// /// - public SocketSubscriptionListener? GetSubscription(int id) + public MessageListener? GetListener(int id) { - lock (_subscriptionLock) - return _subscriptions.SingleOrDefault(s => s.Id == id); + lock (_listenerLock) + return _listeners.SingleOrDefault(s => s.Id == id); } /// - /// Get a subscription on this connection by its subscribe request + /// Get a listener on this connection by its subscribe request /// /// Filter for a request /// - public SocketSubscriptionListener? GetSubscriptionByRequest(Func predicate) + public MessageListener? GetListenerByRequest(Func predicate) { - lock(_subscriptionLock) - return _subscriptions.SingleOrDefault(s => predicate(s.Subscription)); + lock(_listenerLock) + return _listeners.SingleOrDefault(s => predicate(s.Subscription)); } /// @@ -519,13 +523,13 @@ namespace CryptoExchange.Net.Sockets /// The data type expected in response /// The object to send /// The timeout for response - /// Subscription if this is a subscribe request + /// Listener if this is a subscribe request /// The response handler /// The weight of the message /// - public virtual async Task SendAndWaitAsync(T obj, TimeSpan timeout, SocketSubscriptionListener? subscription, int weight, Func handler) + public virtual async Task SendAndWaitAsync(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func handler) { - var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, subscription); + var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener); lock (_messageListeners) { _messageListeners.Add(pending); @@ -598,8 +602,8 @@ namespace CryptoExchange.Net.Sockets return new CallResult(new WebError("Socket not connected")); bool anySubscriptions = false; - lock (_subscriptionLock) - anySubscriptions = _subscriptions.Any(s => s.UserSubscription); + lock (_listenerLock) + anySubscriptions = _listeners.Any(s => s.UserListener); if (!anySubscriptions) { @@ -610,8 +614,8 @@ namespace CryptoExchange.Net.Sockets } bool anyAuthenticated = false; - lock (_subscriptionLock) - anyAuthenticated = _subscriptions.Any(s => s.Authenticated); + lock (_listenerLock) + anyAuthenticated = _listeners.Any(s => s.Authenticated); if (anyAuthenticated) { @@ -628,21 +632,21 @@ namespace CryptoExchange.Net.Sockets } // Get a list of all subscriptions on the socket - List subscriptionList = new List(); - lock (_subscriptionLock) + List listenerList = new List(); + lock (_listenerLock) { - foreach (var subscription in _subscriptions) + foreach (var listener in _listeners) { - if (subscription.Subscription != null) - subscriptionList.Add(subscription); + if (listener.Subscription != null) + listenerList.Add(listener); else - subscription.Confirmed = true; + listener.Confirmed = true; } } - foreach(var subscription in subscriptionList.Where(s => s.Subscription != null)) + foreach(var listener in listenerList.Where(s => s.Subscription != null)) { - var result = await ApiClient.RevitalizeRequestAsync(subscription.Subscription!).ConfigureAwait(false); + var result = await ApiClient.RevitalizeRequestAsync(listener.Subscription!).ConfigureAwait(false); if (!result) { _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error); @@ -651,22 +655,22 @@ namespace CryptoExchange.Net.Sockets } // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe - for (var i = 0; i < subscriptionList.Count; i += ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket) + for (var i = 0; i < listenerList.Count; i += ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket) { if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); var taskList = new List>>(); - foreach (var subscription in subscriptionList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) - taskList.Add(ApiClient.SubscribeAndWaitAsync(this, subscription.Subscription!, subscription)); + foreach (var listener in listenerList.Skip(i).Take(ApiClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)) + taskList.Add(ApiClient.SubscribeAndWaitAsync(this, listener.Subscription!, listener)); await Task.WhenAll(taskList).ConfigureAwait(false); if (taskList.Any(t => !t.Result.Success)) return taskList.First(t => !t.Result.Success).Result; } - foreach (var subscription in subscriptionList) - subscription.Confirmed = true; + foreach (var listener in listenerList) + listener.Confirmed = true; if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); @@ -675,26 +679,26 @@ namespace CryptoExchange.Net.Sockets return new CallResult(true); } - internal async Task UnsubscribeAsync(SocketSubscriptionListener socketSubscription) + internal async Task UnsubscribeAsync(MessageListener listener) { - var unsubscribeRequest = socketSubscription.Subscription?.GetUnsubscribeRequest(); + var unsubscribeRequest = listener.Subscription?.GetUnsubRequest(); if (unsubscribeRequest != null) { - await SendAndWaitAsync(unsubscribeRequest, TimeSpan.FromSeconds(10), socketSubscription, 0, x => + await SendAndWaitAsync(unsubscribeRequest, TimeSpan.FromSeconds(10), listener, 0, x => { - var (matches, result) = socketSubscription.Subscription!.MessageMatchesUnsubscribeRequest(x); + var (matches, result) = listener.Subscription!.MessageMatchesUnsubRequest(x); // TODO check result? return matches; }).ConfigureAwait(false); } } - internal async Task> ResubscribeAsync(SocketSubscriptionListener socketSubscription) + internal async Task> ResubscribeAsync(MessageListener listener) { if (!_socket.IsOpen) return new CallResult(new UnknownError("Socket is not connected")); - return await ApiClient.SubscribeAndWaitAsync(this, socketSubscription.Subscription!, socketSubscription).ConfigureAwait(false); + return await ApiClient.SubscribeAndWaitAsync(this, listener.Subscription!, listener).ConfigureAwait(false); } /// diff --git a/CryptoExchange.Net/Sockets/SubscriptionActor.cs b/CryptoExchange.Net/Sockets/Subscription.cs similarity index 81% rename from CryptoExchange.Net/Sockets/SubscriptionActor.cs rename to CryptoExchange.Net/Sockets/Subscription.cs index 12967ac..324b5fb 100644 --- a/CryptoExchange.Net/Sockets/SubscriptionActor.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -12,7 +12,7 @@ namespace CryptoExchange.Net.Sockets /// /// Subscription base /// - public abstract class SubscriptionActor + public abstract class Subscription { private bool _outputOriginalData; @@ -32,7 +32,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public SubscriptionActor(ILogger logger, ISocketApiClient apiClient, bool authenticated) + public Subscription(ILogger logger, ISocketApiClient apiClient, bool authenticated) { _logger = logger; _outputOriginalData = apiClient.ApiOptions.OutputOriginalData ?? apiClient.ClientOptions.OutputOriginalData; @@ -43,32 +43,32 @@ namespace CryptoExchange.Net.Sockets /// Get the subscribe object to send when subscribing /// /// - public abstract object? GetSubscribeRequest(); + public abstract object? GetSubRequest(); /// /// Check if the message is the response to the subscribe request /// /// /// - public abstract (bool, CallResult?) MessageMatchesSubscribeRequest(StreamMessage message); + public abstract (bool, CallResult?) MessageMatchesSubRequest(StreamMessage message); /// /// Get the unsubscribe object to send when unsubscribing /// /// - public abstract object? GetUnsubscribeRequest(); + public abstract object? GetUnsubRequest(); /// /// Check if the message is the response to the unsubscribe request /// /// /// - public abstract (bool, CallResult?) MessageMatchesUnsubscribeRequest(StreamMessage message); + public abstract (bool, CallResult?) MessageMatchesUnsubRequest(StreamMessage message); /// /// Check if the message is an update for this subscription /// /// /// - public abstract bool MessageMatchesSubscription(StreamMessage message); + public abstract bool MessageMatchesEvent(StreamMessage message); /// /// Handle the update message /// @@ -85,7 +85,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - protected DataEvent CreateDataEvent(T obj, StreamMessage message, string? topic = null, SocketUpdateType? type = null) + protected virtual DataEvent CreateDataEvent(T obj, StreamMessage message, string? topic = null, SocketUpdateType? type = null) { string? originalData = null; if (_outputOriginalData) @@ -101,7 +101,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - protected Task> DeserializeAsync(StreamMessage message, JsonSerializerSettings settings) + protected virtual Task> DeserializeAsync(StreamMessage message, JsonSerializerSettings settings) { var serializer = JsonSerializer.Create(settings); using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true); diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index ddcc142..9473e29 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -9,25 +9,26 @@ namespace CryptoExchange.Net.Sockets /// /// A system subscription /// - public abstract class SystemSubscription : SubscriptionActor + public abstract class SystemSubscription : Subscription { /// /// ctor /// /// + /// /// public SystemSubscription(ILogger logger, ISocketApiClient socketApiClient, bool authenticated = false) : base(logger, socketApiClient, authenticated) { } /// - public override object? GetSubscribeRequest() => null; + public override object? GetSubRequest() => null; /// - public override (bool, CallResult?) MessageMatchesSubscribeRequest(StreamMessage message) => throw new NotImplementedException(); + public override (bool, CallResult?) MessageMatchesSubRequest(StreamMessage message) => throw new NotImplementedException(); /// - public override object? GetUnsubscribeRequest() => null; + public override object? GetUnsubRequest() => null; /// - public override (bool, CallResult?) MessageMatchesUnsubscribeRequest(StreamMessage message) => throw new NotImplementedException(); + public override (bool, CallResult?) MessageMatchesUnsubRequest(StreamMessage message) => throw new NotImplementedException(); } }