diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs
index 3c21cbc..9921c33 100644
--- a/CryptoExchange.Net/Clients/SocketApiClient.cs
+++ b/CryptoExchange.Net/Clients/SocketApiClient.cs
@@ -151,9 +151,9 @@ namespace CryptoExchange.Net
/// The subscription
/// Cancellation token for closing this subscription
///
- protected virtual Task> SubscribeAsync(Subscription subscription, CancellationToken ct)
+ protected virtual Task> SubscribeAsync(Subscription subscription, CancellationToken ct)
{
- return SubscribeAsync(BaseAddress, subscription, ct);
+ return SubscribeAsync(BaseAddress, subscription, ct);
}
///
@@ -164,7 +164,7 @@ namespace CryptoExchange.Net
/// The subscription
/// Cancellation token for closing this subscription
///
- protected virtual async Task> SubscribeAsync(string url, Subscription subscription, CancellationToken ct)
+ protected virtual async Task> SubscribeAsync(string url, Subscription subscription, CancellationToken ct)
{
if (_disposing)
return new CallResult(new InvalidOperationError("Client disposed, can't subscribe"));
@@ -195,7 +195,7 @@ namespace CryptoExchange.Net
socketConnection = socketResult.Data;
// Add a subscription on the socket connection
- messageListener = AddSubscription(subscription, true, socketConnection);
+ messageListener = AddSubscription(subscription, true, socketConnection);
if (messageListener == null)
{
_logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
@@ -441,7 +441,7 @@ namespace CryptoExchange.Net
/// Should return the request which can be used to authenticate a socket connection
///
///
- protected internal abstract Query GetAuthenticationRequest();
+ protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException();
///
/// Add a subscription to a connection
@@ -451,7 +451,7 @@ namespace CryptoExchange.Net
/// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)
/// The socket connection the handler is on
///
- protected virtual MessageListener? AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection)
+ protected virtual MessageListener? AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection)
{
var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription);
if (!connection.AddListener(messageListener))
diff --git a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs
index 0037ef2..441aca4 100644
--- a/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs
+++ b/CryptoExchange.Net/Objects/Sockets/SocketSubscription.cs
@@ -88,7 +88,7 @@ namespace CryptoExchange.Net.Objects.Sockets
public Task ProcessAsync(ParsedMessage message)
{
// TODO
- var dataEvent = new DataEvent(message, null, null, DateTime.UtcNow, null);
+ var dataEvent = new DataEvent(message, null, message.OriginalData, DateTime.UtcNow, null);
return Subscription.HandleEventAsync(dataEvent);
}
}
diff --git a/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs b/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs
new file mode 100644
index 0000000..48664f6
--- /dev/null
+++ b/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs
@@ -0,0 +1,39 @@
+using CryptoExchange.Net.Interfaces;
+using CryptoExchange.Net.Objects.Sockets;
+using CryptoExchange.Net.Sockets;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace CryptoExchange.Net.Objects.Testing
+{
+ public class TestWebsocket : CryptoExchangeWebSocketClient
+ {
+ public TestWebsocket(ILogger logger, WebSocketParameters websocketParameters) : base(logger, websocketParameters)
+ {
+ }
+
+ public override bool IsClosed => false;
+ public override bool IsOpen => true;
+
+ public override Task ConnectAsync() => Task.FromResult(true);
+
+ public override Task CloseAsync() => Task.CompletedTask;
+
+ public override Task ReconnectAsync() => Task.CompletedTask;
+
+ public override void Send(int id, string data, int weight) { }
+
+ public void Receive(string data)
+ {
+ var bytes = Encoding.UTF8.GetBytes(data);
+ var stream = new MemoryStream(bytes);
+ stream.Position = 0;
+ _ = ProcessData(stream);
+ }
+ }
+}
diff --git a/CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs b/CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs
new file mode 100644
index 0000000..62767f1
--- /dev/null
+++ b/CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs
@@ -0,0 +1,24 @@
+using CryptoExchange.Net.Interfaces;
+using CryptoExchange.Net.Objects.Sockets;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace CryptoExchange.Net.Objects.Testing
+{
+ public class TestWebsocketFactory : IWebsocketFactory
+ {
+ private readonly Func _websocketFactory;
+
+ public TestWebsocketFactory(Func websocketFactory)
+ {
+ _websocketFactory = websocketFactory;
+ }
+
+ public IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters)
+ {
+ return _websocketFactory(logger, parameters);
+ }
+ }
+}
diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
index 52892e6..1409056 100644
--- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
+++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
@@ -75,10 +75,10 @@ namespace CryptoExchange.Net.Sockets
public Uri Uri => Parameters.Uri;
///
- public bool IsClosed => _socket.State == WebSocketState.Closed;
+ public virtual bool IsClosed => _socket.State == WebSocketState.Closed;
///
- public bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested;
+ public virtual bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested;
///
public double IncomingKbps
@@ -521,7 +521,7 @@ namespace CryptoExchange.Net.Sockets
{
// Received a complete message and it's not multi part
_logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
- await ProcessData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count), receiveResult.MessageType).ConfigureAwait(false);
+ await ProcessData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
}
else
{
@@ -555,7 +555,7 @@ namespace CryptoExchange.Net.Sockets
{
// Reassemble complete message from memory stream
_logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
- await ProcessData(memoryStream, receiveResult.MessageType).ConfigureAwait(false);
+ await ProcessData(memoryStream).ConfigureAwait(false);
memoryStream.Dispose();
}
else
@@ -580,7 +580,7 @@ namespace CryptoExchange.Net.Sockets
}
}
- private async Task ProcessData(Stream stream, WebSocketMessageType messageType)
+ protected async Task ProcessData(Stream stream)
{
stream.Position = 0;
if (Parameters.Interceptor != null)
diff --git a/CryptoExchange.Net/Sockets/WebsocketFactory.cs b/CryptoExchange.Net/Sockets/WebsocketFactory.cs
index 286d37d..6113de4 100644
--- a/CryptoExchange.Net/Sockets/WebsocketFactory.cs
+++ b/CryptoExchange.Net/Sockets/WebsocketFactory.cs
@@ -1,6 +1,7 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
+using System;
namespace CryptoExchange.Net.Sockets
{