1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-12-14 18:00:26 +00:00
This commit is contained in:
JKorf 2025-11-16 21:41:03 +01:00 committed by Jkorf
parent 68f772a13a
commit 9c43f58e6c
18 changed files with 683 additions and 471 deletions

View File

@ -1,234 +1,234 @@
using CryptoExchange.Net.Objects; //using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets; //using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets; //using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.Testing.Implementations; //using CryptoExchange.Net.Testing.Implementations;
using CryptoExchange.Net.UnitTests.TestImplementations; //using CryptoExchange.Net.UnitTests.TestImplementations;
using CryptoExchange.Net.UnitTests.TestImplementations.Sockets; //using CryptoExchange.Net.UnitTests.TestImplementations.Sockets;
using Microsoft.Extensions.Logging; //using Microsoft.Extensions.Logging;
using Moq; //using Moq;
using NUnit.Framework; //using NUnit.Framework;
using NUnit.Framework.Legacy; //using NUnit.Framework.Legacy;
using System; //using System;
using System.Collections.Generic; //using System.Collections.Generic;
using System.Net.Sockets; //using System.Net.Sockets;
using System.Text.Json; //using System.Text.Json;
using System.Threading; //using System.Threading;
using System.Threading.Tasks; //using System.Threading.Tasks;
namespace CryptoExchange.Net.UnitTests //namespace CryptoExchange.Net.UnitTests
{ //{
[TestFixture] // [TestFixture]
public class SocketClientTests // public class SocketClientTests
{ // {
[TestCase] // [TestCase]
public void SettingOptions_Should_ResultInOptionsSet() // public void SettingOptions_Should_ResultInOptionsSet()
{ // {
//arrange // //arrange
//act // //act
var client = new TestSocketClient(options => // var client = new TestSocketClient(options =>
{ // {
options.SubOptions.ApiCredentials = new Authentication.ApiCredentials("1", "2"); // options.SubOptions.ApiCredentials = new Authentication.ApiCredentials("1", "2");
options.SubOptions.MaxSocketConnections = 1; // options.SubOptions.MaxSocketConnections = 1;
}); // });
//assert // //assert
ClassicAssert.NotNull(client.SubClient.ApiOptions.ApiCredentials); // ClassicAssert.NotNull(client.SubClient.ApiOptions.ApiCredentials);
Assert.That(1 == client.SubClient.ApiOptions.MaxSocketConnections); // Assert.That(1 == client.SubClient.ApiOptions.MaxSocketConnections);
} // }
[TestCase(true)] // [TestCase(true)]
[TestCase(false)] // [TestCase(false)]
public void ConnectSocket_Should_ReturnConnectionResult(bool canConnect) // public void ConnectSocket_Should_ReturnConnectionResult(bool canConnect)
{ // {
//arrange // //arrange
var client = new TestSocketClient(); // var client = new TestSocketClient();
var socket = client.CreateSocket(); // var socket = client.CreateSocket();
socket.CanConnect = canConnect; // socket.CanConnect = canConnect;
//act // //act
var connectResult = client.SubClient.ConnectSocketSub( // var connectResult = client.SubClient.ConnectSocketSub(
new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); // new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""));
//assert // //assert
Assert.That(connectResult.Success == canConnect); // Assert.That(connectResult.Success == canConnect);
} // }
[TestCase] // [TestCase]
public void SocketMessages_Should_BeProcessedInDataHandlers() // public void SocketMessages_Should_BeProcessedInDataHandlers()
{ // {
// arrange // // arrange
var client = new TestSocketClient(options => { // var client = new TestSocketClient(options => {
options.ReconnectInterval = TimeSpan.Zero; // options.ReconnectInterval = TimeSpan.Zero;
}); // });
var socket = client.CreateSocket(); // var socket = client.CreateSocket();
socket.CanConnect = true; // socket.CanConnect = true;
var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); // var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
var rstEvent = new ManualResetEvent(false); // var rstEvent = new ManualResetEvent(false);
Dictionary<string, string> result = null; // Dictionary<string, string> result = null;
client.SubClient.ConnectSocketSub(sub); // client.SubClient.ConnectSocketSub(sub);
var subObj = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => // var subObj = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) =>
{ // {
result = messageEvent.Data; // result = messageEvent.Data;
rstEvent.Set(); // rstEvent.Set();
}); // });
sub.AddSubscription(subObj); // sub.AddSubscription(subObj);
// act // // act
socket.InvokeMessage("{\"property\": \"123\", \"action\": \"update\", \"topic\": \"topic\"}"); // socket.InvokeMessage("{\"property\": \"123\", \"action\": \"update\", \"topic\": \"topic\"}");
rstEvent.WaitOne(1000); // rstEvent.WaitOne(1000);
// assert // // assert
Assert.That(result["property"] == "123"); // Assert.That(result["property"] == "123");
} // }
[TestCase(false)] // [TestCase(false)]
[TestCase(true)] // [TestCase(true)]
public void SocketMessages_Should_ContainOriginalDataIfEnabled(bool enabled) // public void SocketMessages_Should_ContainOriginalDataIfEnabled(bool enabled)
{ // {
// arrange // // arrange
var client = new TestSocketClient(options => // var client = new TestSocketClient(options =>
{ // {
options.ReconnectInterval = TimeSpan.Zero; // options.ReconnectInterval = TimeSpan.Zero;
options.SubOptions.OutputOriginalData = enabled; // options.SubOptions.OutputOriginalData = enabled;
}); // });
var socket = client.CreateSocket(); // var socket = client.CreateSocket();
socket.CanConnect = true; // socket.CanConnect = true;
var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); // var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
var rstEvent = new ManualResetEvent(false); // var rstEvent = new ManualResetEvent(false);
string original = null; // string original = null;
client.SubClient.ConnectSocketSub(sub); // client.SubClient.ConnectSocketSub(sub);
var subObj = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => // var subObj = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) =>
{ // {
original = messageEvent.OriginalData; // original = messageEvent.OriginalData;
rstEvent.Set(); // rstEvent.Set();
}); // });
sub.AddSubscription(subObj); // sub.AddSubscription(subObj);
var msgToSend = JsonSerializer.Serialize(new { topic = "topic", action = "update", property = "123" }); // var msgToSend = JsonSerializer.Serialize(new { topic = "topic", action = "update", property = "123" });
// act // // act
socket.InvokeMessage(msgToSend); // socket.InvokeMessage(msgToSend);
rstEvent.WaitOne(1000); // rstEvent.WaitOne(1000);
// assert // // assert
Assert.That(original == (enabled ? msgToSend : null)); // Assert.That(original == (enabled ? msgToSend : null));
} // }
[TestCase()] // [TestCase()]
public void UnsubscribingStream_Should_CloseTheSocket() // public void UnsubscribingStream_Should_CloseTheSocket()
{ // {
// arrange // // arrange
var client = new TestSocketClient(options => // var client = new TestSocketClient(options =>
{ // {
options.ReconnectInterval = TimeSpan.Zero; // options.ReconnectInterval = TimeSpan.Zero;
}); // });
var socket = client.CreateSocket(); // var socket = client.CreateSocket();
socket.CanConnect = true; // socket.CanConnect = true;
var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); // var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
client.SubClient.ConnectSocketSub(sub); // client.SubClient.ConnectSocketSub(sub);
var subscription = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { }); // var subscription = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { });
var ups = new UpdateSubscription(sub, subscription); // var ups = new UpdateSubscription(sub, subscription);
sub.AddSubscription(subscription); // sub.AddSubscription(subscription);
// act // // act
client.UnsubscribeAsync(ups).Wait(); // client.UnsubscribeAsync(ups).Wait();
// assert // // assert
Assert.That(socket.Connected == false); // Assert.That(socket.Connected == false);
} // }
[TestCase()] // [TestCase()]
public void UnsubscribingAll_Should_CloseAllSockets() // public void UnsubscribingAll_Should_CloseAllSockets()
{ // {
// arrange // // arrange
var client = new TestSocketClient(options => { options.ReconnectInterval = TimeSpan.Zero; }); // var client = new TestSocketClient(options => { options.ReconnectInterval = TimeSpan.Zero; });
var socket1 = client.CreateSocket(); // var socket1 = client.CreateSocket();
var socket2 = client.CreateSocket(); // var socket2 = client.CreateSocket();
socket1.CanConnect = true; // socket1.CanConnect = true;
socket2.CanConnect = true; // socket2.CanConnect = true;
var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket1), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); // var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket1), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
var sub2 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket2), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); // var sub2 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket2), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
client.SubClient.ConnectSocketSub(sub1); // client.SubClient.ConnectSocketSub(sub1);
client.SubClient.ConnectSocketSub(sub2); // client.SubClient.ConnectSocketSub(sub2);
var subscription1 = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { }); // var subscription1 = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { });
var subscription2 = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { }); // var subscription2 = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { });
sub1.AddSubscription(subscription1); // sub1.AddSubscription(subscription1);
sub2.AddSubscription(subscription2); // sub2.AddSubscription(subscription2);
var ups1 = new UpdateSubscription(sub1, subscription1); // var ups1 = new UpdateSubscription(sub1, subscription1);
var ups2 = new UpdateSubscription(sub2, subscription2); // var ups2 = new UpdateSubscription(sub2, subscription2);
// act // // act
client.UnsubscribeAllAsync().Wait(); // client.UnsubscribeAllAsync().Wait();
// assert // // assert
Assert.That(socket1.Connected == false); // Assert.That(socket1.Connected == false);
Assert.That(socket2.Connected == false); // Assert.That(socket2.Connected == false);
} // }
[TestCase()] // [TestCase()]
public void FailingToConnectSocket_Should_ReturnError() // public void FailingToConnectSocket_Should_ReturnError()
{ // {
// arrange // // arrange
var client = new TestSocketClient(options => { options.ReconnectInterval = TimeSpan.Zero; }); // var client = new TestSocketClient(options => { options.ReconnectInterval = TimeSpan.Zero; });
var socket = client.CreateSocket(); // var socket = client.CreateSocket();
socket.CanConnect = false; // socket.CanConnect = false;
var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); // var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "");
// act // // act
var connectResult = client.SubClient.ConnectSocketSub(sub1); // var connectResult = client.SubClient.ConnectSocketSub(sub1);
// assert // // assert
ClassicAssert.IsFalse(connectResult.Success); // ClassicAssert.IsFalse(connectResult.Success);
} // }
[TestCase()] // [TestCase()]
public async Task ErrorResponse_ShouldNot_ConfirmSubscription() // public async Task ErrorResponse_ShouldNot_ConfirmSubscription()
{ // {
// arrange // // arrange
var channel = "trade_btcusd"; // var channel = "trade_btcusd";
var client = new TestSocketClient(opt => // var client = new TestSocketClient(opt =>
{ // {
opt.OutputOriginalData = true; // opt.OutputOriginalData = true;
opt.SocketSubscriptionsCombineTarget = 1; // opt.SocketSubscriptionsCombineTarget = 1;
}); // });
var socket = client.CreateSocket(); // var socket = client.CreateSocket();
socket.CanConnect = true; // socket.CanConnect = true;
client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); // client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""));
// act // // act
var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); // var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default);
socket.InvokeMessage(JsonSerializer.Serialize(new { channel, action = "subscribe", status = "error" })); // socket.InvokeMessage(JsonSerializer.Serialize(new { channel, action = "subscribe", status = "error" }));
await sub; // await sub;
// assert // // assert
ClassicAssert.IsTrue(client.SubClient.TestSubscription.Status != SubscriptionStatus.Subscribed); // ClassicAssert.IsTrue(client.SubClient.TestSubscription.Status != SubscriptionStatus.Subscribed);
} // }
[TestCase()] // [TestCase()]
public async Task SuccessResponse_Should_ConfirmSubscription() // public async Task SuccessResponse_Should_ConfirmSubscription()
{ // {
// arrange // // arrange
var channel = "trade_btcusd"; // var channel = "trade_btcusd";
var client = new TestSocketClient(opt => // var client = new TestSocketClient(opt =>
{ // {
opt.OutputOriginalData = true; // opt.OutputOriginalData = true;
opt.SocketSubscriptionsCombineTarget = 1; // opt.SocketSubscriptionsCombineTarget = 1;
}); // });
var socket = client.CreateSocket(); // var socket = client.CreateSocket();
socket.CanConnect = true; // socket.CanConnect = true;
client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); // client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""));
// act // // act
var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); // var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default);
socket.InvokeMessage(JsonSerializer.Serialize(new { channel, action = "subscribe", status = "confirmed" })); // socket.InvokeMessage(JsonSerializer.Serialize(new { channel, action = "subscribe", status = "confirmed" }));
await sub; // await sub;
// assert // // assert
Assert.That(client.SubClient.TestSubscription.Status == SubscriptionStatus.Subscribed); // Assert.That(client.SubClient.TestSubscription.Status == SubscriptionStatus.Subscribed);
} // }
} // }
} //}

View File

@ -1,143 +1,148 @@
using System; //using System;
using System.Collections.Generic; //using System.Collections.Generic;
using System.Threading; //using System.Threading;
using System.Threading.Tasks; //using System.Threading.Tasks;
using CryptoExchange.Net.Authentication; //using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Clients; //using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Converters.MessageParsing; //using CryptoExchange.Net.Converters.MessageParsing;
using CryptoExchange.Net.Interfaces; //using CryptoExchange.Net.Interfaces;
//using CryptoExchange.Net.Objects;
//using CryptoExchange.Net.Objects.Options;
//using CryptoExchange.Net.Objects.Sockets;
//using CryptoExchange.Net.Sockets;
//using CryptoExchange.Net.UnitTests.TestImplementations.Sockets;
//using Microsoft.Extensions.Logging;
//using Moq;
//using CryptoExchange.Net.Testing.Implementations;
//using CryptoExchange.Net.SharedApis;
//using Microsoft.Extensions.Options;
//using CryptoExchange.Net.Converters.SystemTextJson;
//using System.Net.WebSockets;
//using System.Text.Json;
//using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
//namespace CryptoExchange.Net.UnitTests.TestImplementations
//{
// internal class TestSocketClient: BaseSocketClient
// {
// public TestSubSocketClient SubClient { get; }
// /// <summary>
// /// Create a new instance of TestSocketClient
// /// </summary>
// /// <param name="optionsFunc">Configure the options to use for this client</param>
// public TestSocketClient(Action<TestSocketOptions> optionsDelegate = null)
// : this(Options.Create(ApplyOptionsDelegate(optionsDelegate)), null)
// {
// }
// public TestSocketClient(IOptions<TestSocketOptions> options, ILoggerFactory loggerFactory = null) : base(loggerFactory, "Test")
// {
// Initialize(options.Value);
// SubClient = AddApiClient(new TestSubSocketClient(options.Value, options.Value.SubOptions));
// SubClient.SocketFactory = new Mock<IWebsocketFactory>().Object;
// Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<ILogger>(), It.IsAny<WebSocketParameters>())).Returns(new TestSocket("https://test.com"));
// }
// public TestSocket CreateSocket()
// {
// Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<ILogger>(), It.IsAny<WebSocketParameters>())).Returns(new TestSocket("https://test.com"));
// return (TestSocket)SubClient.CreateSocketInternal("https://localhost:123/");
// }
// }
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.UnitTests.TestImplementations.Sockets;
using Microsoft.Extensions.Logging;
using Moq;
using CryptoExchange.Net.Testing.Implementations;
using CryptoExchange.Net.SharedApis;
using Microsoft.Extensions.Options;
using CryptoExchange.Net.Converters.SystemTextJson;
using System.Net.WebSockets;
using System.Text.Json;
namespace CryptoExchange.Net.UnitTests.TestImplementations public class TestEnvironment : TradeEnvironment
{ {
internal class TestSocketClient: BaseSocketClient public string TestAddress { get; }
public TestEnvironment(string name, string url) : base(name)
{ {
public TestSubSocketClient SubClient { get; } TestAddress = url;
/// <summary>
/// Create a new instance of KucoinSocketClient
/// </summary>
/// <param name="optionsFunc">Configure the options to use for this client</param>
public TestSocketClient(Action<TestSocketOptions> optionsDelegate = null)
: this(Options.Create(ApplyOptionsDelegate(optionsDelegate)), null)
{
}
public TestSocketClient(IOptions<TestSocketOptions> options, ILoggerFactory loggerFactory = null) : base(loggerFactory, "Test")
{
Initialize(options.Value);
SubClient = AddApiClient(new TestSubSocketClient(options.Value, options.Value.SubOptions));
SubClient.SocketFactory = new Mock<IWebsocketFactory>().Object;
Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<ILogger>(), It.IsAny<WebSocketParameters>())).Returns(new TestSocket("https://test.com"));
}
public TestSocket CreateSocket()
{
Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<ILogger>(), It.IsAny<WebSocketParameters>())).Returns(new TestSocket("https://test.com"));
return (TestSocket)SubClient.CreateSocketInternal("https://localhost:123/");
}
}
public class TestEnvironment : TradeEnvironment
{
public string TestAddress { get; }
public TestEnvironment(string name, string url) : base(name)
{
TestAddress = url;
}
}
public class TestSocketOptions: SocketExchangeOptions<TestEnvironment>
{
public static TestSocketOptions Default = new TestSocketOptions
{
Environment = new TestEnvironment("Live", "https://test.test")
};
/// <summary>
/// ctor
/// </summary>
public TestSocketOptions()
{
Default?.Set(this);
}
public SocketApiOptions SubOptions { get; set; } = new SocketApiOptions();
internal TestSocketOptions Set(TestSocketOptions targetOptions)
{
targetOptions = base.Set<TestSocketOptions>(targetOptions);
targetOptions.SubOptions = SubOptions.Set(targetOptions.SubOptions);
return targetOptions;
}
}
public class TestSubSocketClient : SocketApiClient
{
private MessagePath _channelPath = MessagePath.Get().Property("channel");
private MessagePath _actionPath = MessagePath.Get().Property("action");
private MessagePath _topicPath = MessagePath.Get().Property("topic");
public Subscription TestSubscription { get; private set; } = null;
public override JsonSerializerOptions JsonSerializerOptions => new JsonSerializerOptions();
public TestSubSocketClient(TestSocketOptions options, SocketApiOptions apiOptions) : base(new TraceLogger(), options.Environment.TestAddress, options, apiOptions)
{
}
protected internal override IByteMessageAccessor CreateAccessor(WebSocketMessageType type) => new SystemTextJsonByteMessageAccessor(new System.Text.Json.JsonSerializerOptions());
protected internal override IMessageSerializer CreateSerializer() => new SystemTextJsonMessageSerializer(new System.Text.Json.JsonSerializerOptions());
/// <inheritdoc />
public override string FormatSymbol(string baseAsset, string quoteAsset, TradingMode futuresType, DateTime? deliverDate = null) => $"{baseAsset.ToUpperInvariant()}{quoteAsset.ToUpperInvariant()}";
internal IWebsocket CreateSocketInternal(string address)
{
return CreateSocket(address);
}
protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials)
=> new TestAuthProvider(credentials);
public CallResult ConnectSocketSub(SocketConnection sub)
{
return ConnectSocketAsync(sub, default).Result;
}
public override string GetListenerIdentifier(IMessageAccessor message)
{
if (!message.IsValid)
{
return "topic";
}
var id = message.GetValue<string>(_channelPath);
id ??= message.GetValue<string>(_topicPath);
return message.GetValue<string>(_actionPath) + "-" + id;
}
public Task<CallResult<UpdateSubscription>> SubscribeToSomethingAsync(string channel, Action<DataEvent<string>> onUpdate, CancellationToken ct)
{
TestSubscription = new TestSubscriptionWithResponseCheck<string>(channel, onUpdate);
return SubscribeAsync(TestSubscription, ct);
}
} }
} }
// public class TestSocketOptions: SocketExchangeOptions<TestEnvironment>
// {
// public static TestSocketOptions Default = new TestSocketOptions
// {
// Environment = new TestEnvironment("Live", "https://test.test")
// };
// /// <summary>
// /// ctor
// /// </summary>
// public TestSocketOptions()
// {
// Default?.Set(this);
// }
// public SocketApiOptions SubOptions { get; set; } = new SocketApiOptions();
// internal TestSocketOptions Set(TestSocketOptions targetOptions)
// {
// targetOptions = base.Set<TestSocketOptions>(targetOptions);
// targetOptions.SubOptions = SubOptions.Set(targetOptions.SubOptions);
// return targetOptions;
// }
// }
// public class TestSubSocketClient : SocketApiClient
// {
// private MessagePath _channelPath = MessagePath.Get().Property("channel");
// private MessagePath _actionPath = MessagePath.Get().Property("action");
// private MessagePath _topicPath = MessagePath.Get().Property("topic");
// public Subscription TestSubscription { get; private set; } = null;
// public override JsonSerializerOptions JsonSerializerOptions => new JsonSerializerOptions();
// public TestSubSocketClient(TestSocketOptions options, SocketApiOptions apiOptions) : base(new TraceLogger(), options.Environment.TestAddress, options, apiOptions)
// {
// }
// protected internal override IByteMessageAccessor CreateAccessor(WebSocketMessageType type) => new SystemTextJsonByteMessageAccessor(new System.Text.Json.JsonSerializerOptions());
// protected internal override IMessageSerializer CreateSerializer() => new SystemTextJsonMessageSerializer(new System.Text.Json.JsonSerializerOptions());
// /// <inheritdoc />
// public override string FormatSymbol(string baseAsset, string quoteAsset, TradingMode futuresType, DateTime? deliverDate = null) => $"{baseAsset.ToUpperInvariant()}{quoteAsset.ToUpperInvariant()}";
// internal IWebsocket CreateSocketInternal(string address)
// {
// return SocketFactory.CreateWebsocket(_logger, );
// }
// protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials)
// => new TestAuthProvider(credentials);
// public CallResult ConnectSocketSub(SocketConnection sub)
// {
// return ConnectSocketAsync(sub, default).Result;
// }
// public override string GetListenerIdentifier(IMessageAccessor message)
// {
// if (!message.IsValid)
// {
// return "topic";
// }
// var id = message.GetValue<string>(_channelPath);
// id ??= message.GetValue<string>(_topicPath);
// return message.GetValue<string>(_actionPath) + "-" + id;
// }
// public Task<CallResult<UpdateSubscription>> SubscribeToSomethingAsync(string channel, Action<DataEvent<string>> onUpdate, CancellationToken ct)
// {
// TestSubscription = new TestSubscriptionWithResponseCheck<string>(channel, onUpdate);
// return SubscribeAsync(TestSubscription, ct);
// }
// public override IMessageConverter CreateMessageConverter() => throw new NotImplementedException();
// }
//}

View File

@ -1,3 +1,4 @@
using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions; using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
@ -813,19 +814,20 @@ namespace CryptoExchange.Net.Clients
Proxy = ClientOptions.Proxy, Proxy = ClientOptions.Proxy,
Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout, Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout,
ReceiveBufferSize = ClientOptions.ReceiveBufferSize, ReceiveBufferSize = ClientOptions.ReceiveBufferSize,
UseNewMessageDeserialization = ClientOptions.EnabledNewDeserialization
}; };
/// <summary> ///// <summary>
/// Create a socket for an address ///// Create a socket for an address
/// </summary> ///// </summary>
/// <param name="address">The address the socket should connect to</param> ///// <param name="address">The address the socket should connect to</param>
/// <returns></returns> ///// <returns></returns>
protected internal virtual IWebsocket CreateSocket(string address) //protected internal virtual IWebsocket CreateSocket(string address)
{ //{
var socket = SocketFactory.CreateWebsocket(_logger, GetWebSocketParameters(address)); // var socket = SocketFactory.CreateWebsocket(_logger, GetWebSocketParameters(address));
_logger.SocketCreatedForAddress(socket.Id, address); // _logger.SocketCreatedForAddress(socket.Id, address);
return socket; // return socket;
} //}
/// <summary> /// <summary>
/// Unsubscribe an update subscription /// Unsubscribe an update subscription
@ -1059,5 +1061,7 @@ namespace CryptoExchange.Net.Clients
/// <param name="data"></param> /// <param name="data"></param>
/// <returns></returns> /// <returns></returns>
public virtual ReadOnlyMemory<byte> PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlyMemory<byte> data) => data; public virtual ReadOnlyMemory<byte> PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlyMemory<byte> data) => data;
public abstract IMessageConverter CreateMessageConverter();
} }
} }

View File

@ -0,0 +1,55 @@
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters
{
public ref struct MessageType
{
public Type Type { get; set; }
public string? Identifier { get; set; }
}
public interface IMessageConverter
{
MessageType GetMessageType(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
object Deserialize(ReadOnlySpan<byte> data, Type type);
}
public abstract class DynamicConverter : IMessageConverter
{
public abstract JsonSerializerOptions Options { get; }
public abstract MessageType GetMessageType(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
public virtual object Deserialize(ReadOnlySpan<byte> data, Type type)
{
return JsonSerializer.Deserialize(data, type, Options);
}
}
public abstract class StaticConverter : IMessageConverter
{
public abstract JsonSerializerOptions Options { get; }
public abstract MessageType GetMessageType(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
public object? Deserialize(ReadOnlySpan<byte> data, Type type)
{
return JsonSerializer.Deserialize(data, type, Options);
}
}
public abstract class StaticConverter<T> : StaticConverter
{
public override MessageType GetMessageType(ReadOnlySpan<byte> data,, WebSocketMessageType? webSocketMessageType) =>
new MessageType { Type = typeof(T), Identifier = GetMessageListenId(data, webSocketMessageType) };
public abstract string GetMessageListenId(ReadOnlySpan<byte> data, WebSocketMessageType? webSocketMessageType);
}
}

View File

@ -21,58 +21,15 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
/// <inheritdoc /> /// <inheritdoc />
public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options) public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options)
{ {
return typeToConvert == typeof(bool) ? new BoolConverterInner<bool>() : new BoolConverterInner<bool?>(); return typeToConvert == typeof(bool) ? new BoolConverterInner() : new BoolConverterInnerNullable();
} }
private class BoolConverterInner<T> : JsonConverter<T> private class BoolConverterInnerNullable : JsonConverter<bool?>
{ {
public override T Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) public override bool? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
=> (T)((object?)ReadBool(ref reader, typeToConvert, options) ?? default(T))!; => ReadBool(ref reader, typeToConvert, options);
public bool? ReadBool(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) public override void Write(Utf8JsonWriter writer, bool? value, JsonSerializerOptions options)
{
if (reader.TokenType == JsonTokenType.True)
return true;
if (reader.TokenType == JsonTokenType.False)
return false;
var value = reader.TokenType switch
{
JsonTokenType.String => reader.GetString(),
JsonTokenType.Number => reader.GetInt16().ToString(),
_ => null
};
value = value?.ToLowerInvariant().Trim();
if (string.IsNullOrEmpty(value))
{
if (typeToConvert == typeof(bool))
LibraryHelpers.StaticLogger?.LogWarning("Received null bool value, but property type is not a nullable bool. Resolver: {Resolver}", options.TypeInfoResolver?.GetType()?.Name);
return default;
}
switch (value)
{
case "true":
case "yes":
case "y":
case "1":
case "on":
return true;
case "false":
case "no":
case "n":
case "0":
case "off":
case "-1":
return false;
}
throw new SerializationException($"Can't convert bool value {value}");
}
public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions options)
{ {
if (value is bool boolVal) if (value is bool boolVal)
writer.WriteBooleanValue(boolVal); writer.WriteBooleanValue(boolVal);
@ -81,5 +38,59 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
} }
} }
private class BoolConverterInner : JsonConverter<bool>
{
public override bool Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
=> ReadBool(ref reader, typeToConvert, options) ?? false;
public override void Write(Utf8JsonWriter writer, bool value, JsonSerializerOptions options)
{
writer.WriteBooleanValue(value);
}
}
private static bool? ReadBool(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType == JsonTokenType.True)
return true;
if (reader.TokenType == JsonTokenType.False)
return false;
var value = reader.TokenType switch
{
JsonTokenType.String => reader.GetString(),
JsonTokenType.Number => reader.GetInt16().ToString(),
_ => null
};
value = value?.ToLowerInvariant().Trim();
if (string.IsNullOrEmpty(value))
{
if (typeToConvert == typeof(bool))
LibraryHelpers.StaticLogger?.LogWarning("Received null or empty bool value, but property type is not a nullable bool. Resolver: {Resolver}", options.TypeInfoResolver?.GetType()?.Name);
return default;
}
switch (value)
{
case "true":
case "yes":
case "y":
case "1":
case "on":
return true;
case "false":
case "no":
case "n":
case "0":
case "off":
case "-1":
return false;
}
throw new SerializationException($"Can't convert bool value {value}");
}
} }
} }

View File

@ -27,64 +27,77 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
/// <inheritdoc /> /// <inheritdoc />
public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options) public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options)
{ {
return typeToConvert == typeof(DateTime) ? new DateTimeConverterInner<DateTime>() : new DateTimeConverterInner<DateTime?>(); return typeToConvert == typeof(DateTime) ? new DateTimeConverterInner() : new NullableDateTimeConverterInner();
} }
private class DateTimeConverterInner<T> : JsonConverter<T> private class NullableDateTimeConverterInner : JsonConverter<DateTime?>
{ {
public override T Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) public override DateTime? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
=> (T)((object?)ReadDateTime(ref reader, typeToConvert, options) ?? default(T))!; => ReadDateTime(ref reader, typeToConvert, options);
private DateTime? ReadDateTime(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) public override void Write(Utf8JsonWriter writer, DateTime? value, JsonSerializerOptions options)
{
if (reader.TokenType == JsonTokenType.Null)
{
if (typeToConvert == typeof(DateTime))
LibraryHelpers.StaticLogger?.LogWarning("DateTime value of null, but property is not nullable. Resolver: {Resolver}", options.TypeInfoResolver?.GetType()?.Name);
return default;
}
if (reader.TokenType is JsonTokenType.Number)
{
var decValue = reader.GetDecimal();
if (decValue == 0 || decValue < 0)
return default;
return ParseFromDecimal(decValue);
}
else if (reader.TokenType is JsonTokenType.String)
{
var stringValue = reader.GetString();
if (string.IsNullOrWhiteSpace(stringValue)
|| stringValue == "-1"
|| stringValue == "0001-01-01T00:00:00Z"
|| decimal.TryParse(stringValue, out var decVal) && decVal == 0)
{
return default;
}
return ParseFromString(stringValue!, options.TypeInfoResolver?.GetType()?.Name);
}
else
{
return reader.GetDateTime();
}
}
public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions options)
{ {
if (value == null) if (value == null)
{ {
writer.WriteNullValue(); writer.WriteNullValue();
return;
} }
if (value.Value == default)
writer.WriteStringValue(default(DateTime));
else else
writer.WriteNumberValue((long)Math.Round((value.Value - new DateTime(1970, 1, 1)).TotalMilliseconds));
}
}
private class DateTimeConverterInner : JsonConverter<DateTime>
{
public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
=> ReadDateTime(ref reader, typeToConvert, options) ?? default;
public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
{
var dtValue = (DateTime)(object)value;
if (dtValue == default)
writer.WriteStringValue(default(DateTime));
else
writer.WriteNumberValue((long)Math.Round((dtValue - new DateTime(1970, 1, 1)).TotalMilliseconds));
}
}
private static DateTime? ReadDateTime(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType == JsonTokenType.Null)
{
if (typeToConvert == typeof(DateTime))
LibraryHelpers.StaticLogger?.LogWarning("DateTime value of null, but property is not nullable. Resolver: {Resolver}", options.TypeInfoResolver?.GetType()?.Name);
return default;
}
if (reader.TokenType is JsonTokenType.Number)
{
var decValue = reader.GetDecimal();
if (decValue == 0 || decValue < 0)
return default;
return ParseFromDecimal(decValue);
}
else if (reader.TokenType is JsonTokenType.String)
{
var stringValue = reader.GetString();
if (string.IsNullOrWhiteSpace(stringValue)
|| stringValue == "-1"
|| stringValue == "0001-01-01T00:00:00Z"
|| decimal.TryParse(stringValue, out var decVal) && decVal == 0)
{ {
var dtValue = (DateTime)(object)value; return default;
if (dtValue == default)
writer.WriteStringValue(default(DateTime));
else
writer.WriteNumberValue((long)Math.Round((dtValue - new DateTime(1970, 1, 1)).TotalMilliseconds));
} }
return ParseFromString(stringValue!, options.TypeInfoResolver?.GetType()?.Name);
}
else
{
return reader.GetDateTime();
} }
} }

View File

@ -20,6 +20,8 @@ namespace CryptoExchange.Net.Interfaces
/// The matcher for this listener /// The matcher for this listener
/// </summary> /// </summary>
public MessageMatcher MessageMatcher { get; } public MessageMatcher MessageMatcher { get; }
public HashSet<Type> DeserializationTypes { get; set; }
/// <summary> /// <summary>
/// Handle a message /// Handle a message
/// </summary> /// </summary>

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System.IO.Pipelines; using System.IO.Pipelines;
@ -15,7 +16,7 @@ namespace CryptoExchange.Net.Interfaces
/// <param name="logger">The logger</param> /// <param name="logger">The logger</param>
/// <param name="parameters">The parameters to use for the connection</param> /// <param name="parameters">The parameters to use for the connection</param>
/// <returns></returns> /// <returns></returns>
IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters); IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters);
/// <summary> /// <summary>
/// Create high performance websocket /// Create high performance websocket

View File

@ -61,6 +61,8 @@ namespace CryptoExchange.Net.Objects.Options
/// </remarks> /// </remarks>
public int? ReceiveBufferSize { get; set; } public int? ReceiveBufferSize { get; set; }
public bool EnabledNewDeserialization { get; set; }
/// <summary> /// <summary>
/// Create a copy of this options /// Create a copy of this options
/// </summary> /// </summary>
@ -82,6 +84,7 @@ namespace CryptoExchange.Net.Objects.Options
item.RateLimitingBehaviour = RateLimitingBehaviour; item.RateLimitingBehaviour = RateLimitingBehaviour;
item.RateLimiterEnabled = RateLimiterEnabled; item.RateLimiterEnabled = RateLimiterEnabled;
item.ReceiveBufferSize = ReceiveBufferSize; item.ReceiveBufferSize = ReceiveBufferSize;
item.EnabledNewDeserialization = EnabledNewDeserialization;
return item; return item;
} }
} }

View File

@ -75,6 +75,8 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public int? ReceiveBufferSize { get; set; } = null; public int? ReceiveBufferSize { get; set; } = null;
public bool UseNewMessageDeserialization { get; set; }
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>

View File

@ -9,6 +9,7 @@ using System;
using System.Buffers; using System.Buffers;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data.Common;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
@ -145,22 +146,28 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc /> /// <inheritdoc />
public Func<Task<Uri?>>? GetReconnectionUrl { get; set; } public Func<Task<Uri?>>? GetReconnectionUrl { get; set; }
private SocketConnection _connection;
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
/// <param name="logger">The log object to use</param> /// <param name="logger">The log object to use</param>
/// <param name="websocketParameters">The parameters for this socket</param> /// <param name="websocketParameters">The parameters for this socket</param>
public CryptoExchangeWebSocketClient(ILogger logger, WebSocketParameters websocketParameters) public CryptoExchangeWebSocketClient(ILogger logger, SocketConnection connection, WebSocketParameters websocketParameters)
{ {
Id = NextStreamId(); Id = NextStreamId();
_logger = logger; _logger = logger;
_connection = connection;
Parameters = websocketParameters; Parameters = websocketParameters;
_receivedMessages = new List<ReceiveItem>(); _receivedMessages = new List<ReceiveItem>();
_sendEvent = new AsyncResetEvent(); _sendEvent = new AsyncResetEvent();
_sendBuffer = new ConcurrentQueue<SendItem>(); _sendBuffer = new ConcurrentQueue<SendItem>();
_ctsSource = new CancellationTokenSource(); _ctsSource = new CancellationTokenSource();
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize; if (websocketParameters.UseNewMessageDeserialization)
_receiveBufferSize = 1024;
else
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize;
_closeSem = new SemaphoreSlim(1, 1); _closeSem = new SemaphoreSlim(1, 1);
_socket = CreateSocket(); _socket = CreateSocket();
@ -682,7 +689,10 @@ namespace CryptoExchange.Net.Sockets
{ {
// Received a complete message and it's not multi part // Received a complete message and it's not multi part
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count); _logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); if (!Parameters.UseNewMessageDeserialization)
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
else
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(buffer.Array!, buffer.Offset, receiveResult.Count));
} }
else else
{ {
@ -717,7 +727,11 @@ namespace CryptoExchange.Net.Sockets
{ {
_logger.SocketReassembledMessage(Id, multipartStream!.Length); _logger.SocketReassembledMessage(Id, multipartStream!.Length);
// Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part) // Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false);
if (!Parameters.UseNewMessageDeserialization)
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false);
else
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream.GetBuffer(), 0, (int)multipartStream.Length));
} }
else else
{ {
@ -743,6 +757,19 @@ namespace CryptoExchange.Net.Sockets
} }
} }
/// <summary>
/// Process a stream message
/// </summary>
/// <param name="type"></param>
/// <param name="data"></param>
/// <returns></returns>
protected void ProcessDataNew(WebSocketMessageType type, ReadOnlySpan<byte> data)
{
LastActionTime = DateTime.UtcNow;
_connection.HandleStreamMessage2(type, data);
//await (OnStreamMessage?.Invoke(type, data) ?? Task.CompletedTask).ConfigureAwait(false);
}
/// <summary> /// <summary>
/// Process a stream message /// Process a stream message
/// </summary> /// </summary>

View File

@ -484,24 +484,33 @@ namespace CryptoExchange.Net.Sockets
#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. #pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling.
#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code #pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code
{ {
var tasks = _typedSubscriptions.Select(sub => if (_typedSubscriptions.Count == 1)
{ {
try // If there is only one listener we can prevent the overhead of the await which will call a `ToList`
{ await DelegateToSubscription(_typedSubscriptions[0], update!).ConfigureAwait(false);
return sub.HandleAsync(update!); continue;
} }
catch (Exception ex)
{ var tasks = _typedSubscriptions.Select(sub => DelegateToSubscription(sub, update!));
sub.InvokeExceptionHandler(ex);
_logger.UserMessageProcessingFailed(SocketId, ex.Message, ex);
return new ValueTask();
}
});
await LibraryHelpers.WhenAll(tasks).ConfigureAwait(false); await LibraryHelpers.WhenAll(tasks).ConfigureAwait(false);
} }
} }
catch (OperationCanceledException) { } catch (OperationCanceledException) { }
} }
private ValueTask DelegateToSubscription(HighPerfSubscription<T> sub, T update)
{
try
{
return sub.HandleAsync(update!);
}
catch (Exception ex)
{
sub.InvokeExceptionHandler(ex);
_logger.UserMessageProcessingFailed(SocketId, ex.Message, ex);
return new ValueTask();
}
}
} }
} }

View File

@ -90,7 +90,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Get any handler links matching with the listen id /// Get any handler links matching with the listen id
/// </summary> /// </summary>
public List<MessageHandlerLink> GetHandlerLinks(string listenId) => HandlerLinks.Where(x => x.Check(listenId)).ToList(); public IEnumerable<MessageHandlerLink> GetHandlerLinks(string listenId) => HandlerLinks.Where(x => x.Check(listenId));
/// <inheritdoc /> /// <inheritdoc />
public override string ToString() => string.Join(",", HandlerLinks.Select(x => x.ToString())); public override string ToString() => string.Join(",", HandlerLinks.Select(x => x.ToString()));
@ -113,6 +113,7 @@ namespace CryptoExchange.Net.Sockets
/// Deserialization type /// Deserialization type
/// </summary> /// </summary>
public abstract Type GetDeserializationType(IMessageAccessor accessor); public abstract Type GetDeserializationType(IMessageAccessor accessor);
public abstract Type DeserializationType { get; }
/// <summary> /// <summary>
/// ctor /// ctor
@ -150,6 +151,8 @@ namespace CryptoExchange.Net.Sockets
{ {
private Func<SocketConnection, DataEvent<TServer>, CallResult> _handler; private Func<SocketConnection, DataEvent<TServer>, CallResult> _handler;
public override Type DeserializationType => typeof(TServer);
/// <inheritdoc /> /// <inheritdoc />
public override Type GetDeserializationType(IMessageAccessor accessor) => typeof(TServer); public override Type GetDeserializationType(IMessageAccessor accessor) => typeof(TServer);

View File

@ -65,10 +65,21 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public AsyncResetEvent? ContinueAwaiter { get; set; } public AsyncResetEvent? ContinueAwaiter { get; set; }
public HashSet<Type> DeserializationTypes { get; set; }
private MessageMatcher _matcher;
/// <summary> /// <summary>
/// Matcher for this query /// Matcher for this subscription
/// </summary> /// </summary>
public MessageMatcher MessageMatcher { get; set; } = null!; public MessageMatcher MessageMatcher
{
get => _matcher;
set
{
_matcher = value;
DeserializationTypes = new HashSet<Type>(MessageMatcher.HandlerLinks.Select(x => x.DeserializationType));
}
}
/// <summary> /// <summary>
/// The query request object /// The query request object

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Clients; using CryptoExchange.Net.Clients;
using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters;
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions; using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
@ -11,6 +12,7 @@ using System.Linq;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Text.Json;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -266,6 +268,8 @@ namespace CryptoExchange.Net.Sockets
private IByteMessageAccessor? _stringMessageAccessor; private IByteMessageAccessor? _stringMessageAccessor;
private IByteMessageAccessor? _byteMessageAccessor; private IByteMessageAccessor? _byteMessageAccessor;
private IMessageConverter? _messageConverter;
/// <summary> /// <summary>
/// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary. /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary.
/// </summary> /// </summary>
@ -296,7 +300,7 @@ namespace CryptoExchange.Net.Sockets
Tag = tag; Tag = tag;
Properties = new Dictionary<string, object>(); Properties = new Dictionary<string, object>();
_socket = socketFactory.CreateWebsocket(logger, parameters); _socket = socketFactory.CreateWebsocket(logger, this, parameters);
_logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString()); _logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString());
_socket.OnStreamMessage += HandleStreamMessage; _socket.OnStreamMessage += HandleStreamMessage;
@ -500,6 +504,52 @@ namespace CryptoExchange.Net.Sockets
return Task.CompletedTask; return Task.CompletedTask;
} }
/// <summary>
/// Handle a message
/// </summary>
protected internal virtual void HandleStreamMessage2(WebSocketMessageType type, ReadOnlySpan<byte> data)
{
//var sw = Stopwatch.StartNew();
var receiveTime = DateTime.UtcNow;
//// 1. Decrypt/Preprocess if necessary
//data = ApiClient.PreprocessStreamMessage(this, type, data);
_messageConverter ??= ApiClient.CreateMessageConverter();
var messageType = _messageConverter.GetMessageType(data, type);
if (messageType.Type == null)
{
// Failed to determine message type
return;
}
var result = _messageConverter.Deserialize(data, messageType.Type);
if (result == null)
{
// Deserialize error
return;
}
var targetType = messageType.Type;
List<IMessageProcessor> listeners;
lock (_listenersLock)
listeners = _listeners.Where(x => x.DeserializationTypes.Contains(targetType)).ToList();
if (listeners.Count == 0)
{
// No subscriptions found for type
return;
}
var dataEvent = new DataEvent<object>(result, null, null, null /*originalData*/, receiveTime, null);
foreach (var subscription in listeners)
{
var links = subscription.MessageMatcher.GetHandlerLinks(messageType.Identifier);
foreach(var link in links)
subscription.Handle(this, dataEvent, link);
}
}
/// <summary> /// <summary>
/// Handle a message /// Handle a message
/// </summary> /// </summary>

View File

@ -35,6 +35,8 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public bool UserSubscription { get; set; } public bool UserSubscription { get; set; }
public HashSet<Type> DeserializationTypes { get; set; }
private SubscriptionStatus _status; private SubscriptionStatus _status;
/// <summary> /// <summary>
/// Current subscription status /// Current subscription status
@ -72,10 +74,20 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public bool Authenticated { get; } public bool Authenticated { get; }
private MessageMatcher _matcher;
/// <summary> /// <summary>
/// Matcher for this subscription /// Matcher for this subscription
/// </summary> /// </summary>
public MessageMatcher MessageMatcher { get; set; } = null!; public MessageMatcher MessageMatcher
{
get => _matcher;
set
{
_matcher = value;
DeserializationTypes = new HashSet<Type>(MessageMatcher.HandlerLinks.Select(x => x.DeserializationType));
}
}
/// <summary> /// <summary>
/// Cancellation token registration /// Cancellation token registration
@ -109,7 +121,10 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>
public Subscription(ILogger logger, bool authenticated, bool userSubscription = true) public Subscription(
ILogger logger,
bool authenticated,
bool userSubscription = true)
{ {
_logger = logger; _logger = logger;
Authenticated = authenticated; Authenticated = authenticated;

View File

@ -13,9 +13,9 @@ namespace CryptoExchange.Net.Sockets
public class WebsocketFactory : IWebsocketFactory public class WebsocketFactory : IWebsocketFactory
{ {
/// <inheritdoc /> /// <inheritdoc />
public IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters) public IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters)
{ {
return new CryptoExchangeWebSocketClient(logger, parameters); return new CryptoExchangeWebSocketClient(logger, connection, parameters);
} }
/// <inheritdoc /> /// <inheritdoc />
public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter) public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter)

View File

@ -1,5 +1,6 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.IO.Pipelines; using System.IO.Pipelines;
@ -16,6 +17,6 @@ namespace CryptoExchange.Net.Testing.Implementations
} }
public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter) => throw new NotImplementedException(); public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter) => throw new NotImplementedException();
public IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters) => _socket; public IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters) => _socket;
} }
} }