From 35f7dbf9fb2fe9ca72b9d8c57cc7bb2b4e39e32c Mon Sep 17 00:00:00 2001 From: JKorf Date: Wed, 1 Nov 2023 21:55:18 +0100 Subject: [PATCH] wip --- CryptoExchange.Net/Clients/SocketApiClient.cs | 12 +++--- .../Objects/Sockets/SocketSubscription.cs | 2 +- .../Objects/Testing/TestWebsocket.cs | 39 +++++++++++++++++++ .../Objects/Testing/TestWebsocketFactory.cs | 24 ++++++++++++ .../Sockets/CryptoExchangeWebSocketClient.cs | 10 ++--- .../Sockets/WebsocketFactory.cs | 1 + 6 files changed, 76 insertions(+), 12 deletions(-) create mode 100644 CryptoExchange.Net/Objects/Testing/TestWebsocket.cs create mode 100644 CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 3c21cbc..9921c33 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -151,9 +151,9 @@ namespace CryptoExchange.Net /// The subscription /// Cancellation token for closing this subscription /// - protected virtual Task> SubscribeAsync(Subscription subscription, CancellationToken ct) + protected virtual Task> SubscribeAsync(Subscription subscription, CancellationToken ct) { - return SubscribeAsync(BaseAddress, subscription, ct); + return SubscribeAsync(BaseAddress, subscription, ct); } /// @@ -164,7 +164,7 @@ namespace CryptoExchange.Net /// The subscription /// Cancellation token for closing this subscription /// - protected virtual async Task> SubscribeAsync(string url, Subscription subscription, CancellationToken ct) + protected virtual async Task> SubscribeAsync(string url, Subscription subscription, CancellationToken ct) { if (_disposing) return new CallResult(new InvalidOperationError("Client disposed, can't subscribe")); @@ -195,7 +195,7 @@ namespace CryptoExchange.Net socketConnection = socketResult.Data; // Add a subscription on the socket connection - messageListener = AddSubscription(subscription, true, socketConnection); + messageListener = AddSubscription(subscription, true, socketConnection); if (messageListener == null) { _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); @@ -441,7 +441,7 @@ namespace CryptoExchange.Net /// Should return the request which can be used to authenticate a socket connection /// /// - protected internal abstract Query GetAuthenticationRequest(); + protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException(); /// /// Add a subscription to a connection @@ -451,7 +451,7 @@ namespace CryptoExchange.Net /// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket) /// The socket connection the handler is on /// - protected virtual MessageListener? AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection) + protected virtual MessageListener? AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection) { var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription); if (!connection.AddListener(messageListener)) diff --git a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs index 0037ef2..441aca4 100644 --- a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs +++ b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs @@ -88,7 +88,7 @@ namespace CryptoExchange.Net.Objects.Sockets public Task ProcessAsync(ParsedMessage message) { // TODO - var dataEvent = new DataEvent(message, null, null, DateTime.UtcNow, null); + var dataEvent = new DataEvent(message, null, message.OriginalData, DateTime.UtcNow, null); return Subscription.HandleEventAsync(dataEvent); } } diff --git a/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs b/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs new file mode 100644 index 0000000..48664f6 --- /dev/null +++ b/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs @@ -0,0 +1,39 @@ +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Sockets; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Objects.Testing +{ + public class TestWebsocket : CryptoExchangeWebSocketClient + { + public TestWebsocket(ILogger logger, WebSocketParameters websocketParameters) : base(logger, websocketParameters) + { + } + + public override bool IsClosed => false; + public override bool IsOpen => true; + + public override Task ConnectAsync() => Task.FromResult(true); + + public override Task CloseAsync() => Task.CompletedTask; + + public override Task ReconnectAsync() => Task.CompletedTask; + + public override void Send(int id, string data, int weight) { } + + public void Receive(string data) + { + var bytes = Encoding.UTF8.GetBytes(data); + var stream = new MemoryStream(bytes); + stream.Position = 0; + _ = ProcessData(stream); + } + } +} diff --git a/CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs b/CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs new file mode 100644 index 0000000..62767f1 --- /dev/null +++ b/CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs @@ -0,0 +1,24 @@ +using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects.Sockets; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Objects.Testing +{ + public class TestWebsocketFactory : IWebsocketFactory + { + private readonly Func _websocketFactory; + + public TestWebsocketFactory(Func websocketFactory) + { + _websocketFactory = websocketFactory; + } + + public IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters) + { + return _websocketFactory(logger, parameters); + } + } +} diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 52892e6..1409056 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -75,10 +75,10 @@ namespace CryptoExchange.Net.Sockets public Uri Uri => Parameters.Uri; /// - public bool IsClosed => _socket.State == WebSocketState.Closed; + public virtual bool IsClosed => _socket.State == WebSocketState.Closed; /// - public bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested; + public virtual bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested; /// public double IncomingKbps @@ -521,7 +521,7 @@ namespace CryptoExchange.Net.Sockets { // Received a complete message and it's not multi part _logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); - await ProcessData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count), receiveResult.MessageType).ConfigureAwait(false); + await ProcessData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); } else { @@ -555,7 +555,7 @@ namespace CryptoExchange.Net.Sockets { // Reassemble complete message from memory stream _logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); - await ProcessData(memoryStream, receiveResult.MessageType).ConfigureAwait(false); + await ProcessData(memoryStream).ConfigureAwait(false); memoryStream.Dispose(); } else @@ -580,7 +580,7 @@ namespace CryptoExchange.Net.Sockets } } - private async Task ProcessData(Stream stream, WebSocketMessageType messageType) + protected async Task ProcessData(Stream stream) { stream.Position = 0; if (Parameters.Interceptor != null) diff --git a/CryptoExchange.Net/Sockets/WebsocketFactory.cs b/CryptoExchange.Net/Sockets/WebsocketFactory.cs index 286d37d..6113de4 100644 --- a/CryptoExchange.Net/Sockets/WebsocketFactory.cs +++ b/CryptoExchange.Net/Sockets/WebsocketFactory.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects.Sockets; using Microsoft.Extensions.Logging; +using System; namespace CryptoExchange.Net.Sockets {