From 9c43f58e6c2dfc8d5e2931554d56923fa0bc29e2 Mon Sep 17 00:00:00 2001 From: JKorf Date: Sun, 16 Nov 2025 21:41:03 +0100 Subject: [PATCH] wip --- .../SocketClientTests.cs | 406 +++++++++--------- .../TestImplementations/TestSocketClient.cs | 279 ++++++------ CryptoExchange.Net/Clients/SocketApiClient.cs | 26 +- .../DynamicConverters/DynamicConverter.cs | 55 +++ .../SystemTextJson/BoolConverter.cs | 107 ++--- .../SystemTextJson/DateTimeConverter.cs | 105 +++-- .../Interfaces/IMessageProcessor.cs | 2 + .../Interfaces/IWebsocketFactory.cs | 3 +- .../Objects/Options/SocketExchangeOptions.cs | 3 + .../Objects/Sockets/WebSocketParameters.cs | 2 + .../Sockets/CryptoExchangeWebSocketClient.cs | 35 +- .../HighPerf/HighPerfSocketConnection.cs | 33 +- CryptoExchange.Net/Sockets/MessageMatcher.cs | 5 +- CryptoExchange.Net/Sockets/Query.cs | 15 +- .../Sockets/SocketConnection.cs | 52 ++- CryptoExchange.Net/Sockets/Subscription.cs | 19 +- .../Sockets/WebsocketFactory.cs | 4 +- .../Implementations/TestWebsocketFactory.cs | 3 +- 18 files changed, 683 insertions(+), 471 deletions(-) create mode 100644 CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/DynamicConverter.cs diff --git a/CryptoExchange.Net.UnitTests/SocketClientTests.cs b/CryptoExchange.Net.UnitTests/SocketClientTests.cs index 2112aae..97ce6a6 100644 --- a/CryptoExchange.Net.UnitTests/SocketClientTests.cs +++ b/CryptoExchange.Net.UnitTests/SocketClientTests.cs @@ -1,234 +1,234 @@ -using CryptoExchange.Net.Objects; -using CryptoExchange.Net.Objects.Sockets; -using CryptoExchange.Net.Sockets; -using CryptoExchange.Net.Testing.Implementations; -using CryptoExchange.Net.UnitTests.TestImplementations; -using CryptoExchange.Net.UnitTests.TestImplementations.Sockets; -using Microsoft.Extensions.Logging; -using Moq; -using NUnit.Framework; -using NUnit.Framework.Legacy; -using System; -using System.Collections.Generic; -using System.Net.Sockets; -using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; +//using CryptoExchange.Net.Objects; +//using CryptoExchange.Net.Objects.Sockets; +//using CryptoExchange.Net.Sockets; +//using CryptoExchange.Net.Testing.Implementations; +//using CryptoExchange.Net.UnitTests.TestImplementations; +//using CryptoExchange.Net.UnitTests.TestImplementations.Sockets; +//using Microsoft.Extensions.Logging; +//using Moq; +//using NUnit.Framework; +//using NUnit.Framework.Legacy; +//using System; +//using System.Collections.Generic; +//using System.Net.Sockets; +//using System.Text.Json; +//using System.Threading; +//using System.Threading.Tasks; -namespace CryptoExchange.Net.UnitTests -{ - [TestFixture] - public class SocketClientTests - { - [TestCase] - public void SettingOptions_Should_ResultInOptionsSet() - { - //arrange - //act - var client = new TestSocketClient(options => - { - options.SubOptions.ApiCredentials = new Authentication.ApiCredentials("1", "2"); - options.SubOptions.MaxSocketConnections = 1; - }); +//namespace CryptoExchange.Net.UnitTests +//{ +// [TestFixture] +// public class SocketClientTests +// { +// [TestCase] +// public void SettingOptions_Should_ResultInOptionsSet() +// { +// //arrange +// //act +// var client = new TestSocketClient(options => +// { +// options.SubOptions.ApiCredentials = new Authentication.ApiCredentials("1", "2"); +// options.SubOptions.MaxSocketConnections = 1; +// }); - //assert - ClassicAssert.NotNull(client.SubClient.ApiOptions.ApiCredentials); - Assert.That(1 == client.SubClient.ApiOptions.MaxSocketConnections); - } +// //assert +// ClassicAssert.NotNull(client.SubClient.ApiOptions.ApiCredentials); +// Assert.That(1 == client.SubClient.ApiOptions.MaxSocketConnections); +// } - [TestCase(true)] - [TestCase(false)] - public void ConnectSocket_Should_ReturnConnectionResult(bool canConnect) - { - //arrange - var client = new TestSocketClient(); - var socket = client.CreateSocket(); - socket.CanConnect = canConnect; +// [TestCase(true)] +// [TestCase(false)] +// public void ConnectSocket_Should_ReturnConnectionResult(bool canConnect) +// { +// //arrange +// var client = new TestSocketClient(); +// var socket = client.CreateSocket(); +// socket.CanConnect = canConnect; - //act - var connectResult = client.SubClient.ConnectSocketSub( - new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); +// //act +// var connectResult = client.SubClient.ConnectSocketSub( +// new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); - //assert - Assert.That(connectResult.Success == canConnect); - } +// //assert +// Assert.That(connectResult.Success == canConnect); +// } - [TestCase] - public void SocketMessages_Should_BeProcessedInDataHandlers() - { - // arrange - var client = new TestSocketClient(options => { - options.ReconnectInterval = TimeSpan.Zero; - }); - var socket = client.CreateSocket(); - socket.CanConnect = true; - var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); - var rstEvent = new ManualResetEvent(false); - Dictionary result = null; +// [TestCase] +// public void SocketMessages_Should_BeProcessedInDataHandlers() +// { +// // arrange +// var client = new TestSocketClient(options => { +// options.ReconnectInterval = TimeSpan.Zero; +// }); +// var socket = client.CreateSocket(); +// socket.CanConnect = true; +// var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); +// var rstEvent = new ManualResetEvent(false); +// Dictionary result = null; - client.SubClient.ConnectSocketSub(sub); +// client.SubClient.ConnectSocketSub(sub); - var subObj = new TestSubscription>(Mock.Of(), (messageEvent) => - { - result = messageEvent.Data; - rstEvent.Set(); - }); - sub.AddSubscription(subObj); +// var subObj = new TestSubscription>(Mock.Of(), (messageEvent) => +// { +// result = messageEvent.Data; +// rstEvent.Set(); +// }); +// sub.AddSubscription(subObj); - // act - socket.InvokeMessage("{\"property\": \"123\", \"action\": \"update\", \"topic\": \"topic\"}"); - rstEvent.WaitOne(1000); +// // act +// socket.InvokeMessage("{\"property\": \"123\", \"action\": \"update\", \"topic\": \"topic\"}"); +// rstEvent.WaitOne(1000); - // assert - Assert.That(result["property"] == "123"); - } +// // assert +// Assert.That(result["property"] == "123"); +// } - [TestCase(false)] - [TestCase(true)] - public void SocketMessages_Should_ContainOriginalDataIfEnabled(bool enabled) - { - // arrange - var client = new TestSocketClient(options => - { - options.ReconnectInterval = TimeSpan.Zero; - options.SubOptions.OutputOriginalData = enabled; - }); - var socket = client.CreateSocket(); - socket.CanConnect = true; - var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); - var rstEvent = new ManualResetEvent(false); - string original = null; +// [TestCase(false)] +// [TestCase(true)] +// public void SocketMessages_Should_ContainOriginalDataIfEnabled(bool enabled) +// { +// // arrange +// var client = new TestSocketClient(options => +// { +// options.ReconnectInterval = TimeSpan.Zero; +// options.SubOptions.OutputOriginalData = enabled; +// }); +// var socket = client.CreateSocket(); +// socket.CanConnect = true; +// var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); +// var rstEvent = new ManualResetEvent(false); +// string original = null; - client.SubClient.ConnectSocketSub(sub); - var subObj = new TestSubscription>(Mock.Of(), (messageEvent) => - { - original = messageEvent.OriginalData; - rstEvent.Set(); - }); - sub.AddSubscription(subObj); - var msgToSend = JsonSerializer.Serialize(new { topic = "topic", action = "update", property = "123" }); +// client.SubClient.ConnectSocketSub(sub); +// var subObj = new TestSubscription>(Mock.Of(), (messageEvent) => +// { +// original = messageEvent.OriginalData; +// rstEvent.Set(); +// }); +// sub.AddSubscription(subObj); +// var msgToSend = JsonSerializer.Serialize(new { topic = "topic", action = "update", property = "123" }); - // act - socket.InvokeMessage(msgToSend); - rstEvent.WaitOne(1000); +// // act +// socket.InvokeMessage(msgToSend); +// rstEvent.WaitOne(1000); - // assert - Assert.That(original == (enabled ? msgToSend : null)); - } +// // assert +// Assert.That(original == (enabled ? msgToSend : null)); +// } - [TestCase()] - public void UnsubscribingStream_Should_CloseTheSocket() - { - // arrange - var client = new TestSocketClient(options => - { - options.ReconnectInterval = TimeSpan.Zero; - }); - var socket = client.CreateSocket(); - socket.CanConnect = true; - var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); - client.SubClient.ConnectSocketSub(sub); +// [TestCase()] +// public void UnsubscribingStream_Should_CloseTheSocket() +// { +// // arrange +// var client = new TestSocketClient(options => +// { +// options.ReconnectInterval = TimeSpan.Zero; +// }); +// var socket = client.CreateSocket(); +// socket.CanConnect = true; +// var sub = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); +// client.SubClient.ConnectSocketSub(sub); - var subscription = new TestSubscription>(Mock.Of(), (messageEvent) => { }); - var ups = new UpdateSubscription(sub, subscription); - sub.AddSubscription(subscription); +// var subscription = new TestSubscription>(Mock.Of(), (messageEvent) => { }); +// var ups = new UpdateSubscription(sub, subscription); +// sub.AddSubscription(subscription); - // act - client.UnsubscribeAsync(ups).Wait(); +// // act +// client.UnsubscribeAsync(ups).Wait(); - // assert - Assert.That(socket.Connected == false); - } +// // assert +// Assert.That(socket.Connected == false); +// } - [TestCase()] - public void UnsubscribingAll_Should_CloseAllSockets() - { - // arrange - var client = new TestSocketClient(options => { options.ReconnectInterval = TimeSpan.Zero; }); - var socket1 = client.CreateSocket(); - var socket2 = client.CreateSocket(); - socket1.CanConnect = true; - socket2.CanConnect = true; - var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket1), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); - var sub2 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket2), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); - client.SubClient.ConnectSocketSub(sub1); - client.SubClient.ConnectSocketSub(sub2); - var subscription1 = new TestSubscription>(Mock.Of(), (messageEvent) => { }); - var subscription2 = new TestSubscription>(Mock.Of(), (messageEvent) => { }); +// [TestCase()] +// public void UnsubscribingAll_Should_CloseAllSockets() +// { +// // arrange +// var client = new TestSocketClient(options => { options.ReconnectInterval = TimeSpan.Zero; }); +// var socket1 = client.CreateSocket(); +// var socket2 = client.CreateSocket(); +// socket1.CanConnect = true; +// socket2.CanConnect = true; +// var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket1), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); +// var sub2 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket2), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); +// client.SubClient.ConnectSocketSub(sub1); +// client.SubClient.ConnectSocketSub(sub2); +// var subscription1 = new TestSubscription>(Mock.Of(), (messageEvent) => { }); +// var subscription2 = new TestSubscription>(Mock.Of(), (messageEvent) => { }); - sub1.AddSubscription(subscription1); - sub2.AddSubscription(subscription2); - var ups1 = new UpdateSubscription(sub1, subscription1); - var ups2 = new UpdateSubscription(sub2, subscription2); +// sub1.AddSubscription(subscription1); +// sub2.AddSubscription(subscription2); +// var ups1 = new UpdateSubscription(sub1, subscription1); +// var ups2 = new UpdateSubscription(sub2, subscription2); - // act - client.UnsubscribeAllAsync().Wait(); +// // act +// client.UnsubscribeAllAsync().Wait(); - // assert - Assert.That(socket1.Connected == false); - Assert.That(socket2.Connected == false); - } +// // assert +// Assert.That(socket1.Connected == false); +// Assert.That(socket2.Connected == false); +// } - [TestCase()] - public void FailingToConnectSocket_Should_ReturnError() - { - // arrange - var client = new TestSocketClient(options => { options.ReconnectInterval = TimeSpan.Zero; }); - var socket = client.CreateSocket(); - socket.CanConnect = false; - var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); +// [TestCase()] +// public void FailingToConnectSocket_Should_ReturnError() +// { +// // arrange +// var client = new TestSocketClient(options => { options.ReconnectInterval = TimeSpan.Zero; }); +// var socket = client.CreateSocket(); +// socket.CanConnect = false; +// var sub1 = new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, ""); - // act - var connectResult = client.SubClient.ConnectSocketSub(sub1); +// // act +// var connectResult = client.SubClient.ConnectSocketSub(sub1); - // assert - ClassicAssert.IsFalse(connectResult.Success); - } +// // assert +// ClassicAssert.IsFalse(connectResult.Success); +// } - [TestCase()] - public async Task ErrorResponse_ShouldNot_ConfirmSubscription() - { - // arrange - var channel = "trade_btcusd"; - var client = new TestSocketClient(opt => - { - opt.OutputOriginalData = true; - opt.SocketSubscriptionsCombineTarget = 1; - }); - var socket = client.CreateSocket(); - socket.CanConnect = true; - client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); +// [TestCase()] +// public async Task ErrorResponse_ShouldNot_ConfirmSubscription() +// { +// // arrange +// var channel = "trade_btcusd"; +// var client = new TestSocketClient(opt => +// { +// opt.OutputOriginalData = true; +// opt.SocketSubscriptionsCombineTarget = 1; +// }); +// var socket = client.CreateSocket(); +// socket.CanConnect = true; +// client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); - // act - var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); - socket.InvokeMessage(JsonSerializer.Serialize(new { channel, action = "subscribe", status = "error" })); - await sub; +// // act +// var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); +// socket.InvokeMessage(JsonSerializer.Serialize(new { channel, action = "subscribe", status = "error" })); +// await sub; - // assert - ClassicAssert.IsTrue(client.SubClient.TestSubscription.Status != SubscriptionStatus.Subscribed); - } +// // assert +// ClassicAssert.IsTrue(client.SubClient.TestSubscription.Status != SubscriptionStatus.Subscribed); +// } - [TestCase()] - public async Task SuccessResponse_Should_ConfirmSubscription() - { - // arrange - var channel = "trade_btcusd"; - var client = new TestSocketClient(opt => - { - opt.OutputOriginalData = true; - opt.SocketSubscriptionsCombineTarget = 1; - }); - var socket = client.CreateSocket(); - socket.CanConnect = true; - client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); +// [TestCase()] +// public async Task SuccessResponse_Should_ConfirmSubscription() +// { +// // arrange +// var channel = "trade_btcusd"; +// var client = new TestSocketClient(opt => +// { +// opt.OutputOriginalData = true; +// opt.SocketSubscriptionsCombineTarget = 1; +// }); +// var socket = client.CreateSocket(); +// socket.CanConnect = true; +// client.SubClient.ConnectSocketSub(new SocketConnection(new TraceLogger(), new TestWebsocketFactory(socket), new WebSocketParameters(new Uri("https://localhost/"), ReconnectPolicy.Disabled), client.SubClient, "")); - // act - var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); - socket.InvokeMessage(JsonSerializer.Serialize(new { channel, action = "subscribe", status = "confirmed" })); - await sub; +// // act +// var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); +// socket.InvokeMessage(JsonSerializer.Serialize(new { channel, action = "subscribe", status = "confirmed" })); +// await sub; - // assert - Assert.That(client.SubClient.TestSubscription.Status == SubscriptionStatus.Subscribed); - } - } -} +// // assert +// Assert.That(client.SubClient.TestSubscription.Status == SubscriptionStatus.Subscribed); +// } +// } +//} diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs index 094fc62..dae9cea 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs @@ -1,143 +1,148 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using CryptoExchange.Net.Authentication; -using CryptoExchange.Net.Clients; -using CryptoExchange.Net.Converters.MessageParsing; -using CryptoExchange.Net.Interfaces; +//using System; +//using System.Collections.Generic; +//using System.Threading; +//using System.Threading.Tasks; +//using CryptoExchange.Net.Authentication; +//using CryptoExchange.Net.Clients; +//using CryptoExchange.Net.Converters.MessageParsing; +//using CryptoExchange.Net.Interfaces; +//using CryptoExchange.Net.Objects; +//using CryptoExchange.Net.Objects.Options; +//using CryptoExchange.Net.Objects.Sockets; +//using CryptoExchange.Net.Sockets; +//using CryptoExchange.Net.UnitTests.TestImplementations.Sockets; +//using Microsoft.Extensions.Logging; +//using Moq; +//using CryptoExchange.Net.Testing.Implementations; +//using CryptoExchange.Net.SharedApis; +//using Microsoft.Extensions.Options; +//using CryptoExchange.Net.Converters.SystemTextJson; +//using System.Net.WebSockets; +//using System.Text.Json; +//using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters; + +//namespace CryptoExchange.Net.UnitTests.TestImplementations +//{ +// internal class TestSocketClient: BaseSocketClient +// { +// public TestSubSocketClient SubClient { get; } + +// /// +// /// Create a new instance of TestSocketClient +// /// +// /// Configure the options to use for this client +// public TestSocketClient(Action optionsDelegate = null) +// : this(Options.Create(ApplyOptionsDelegate(optionsDelegate)), null) +// { +// } + +// public TestSocketClient(IOptions options, ILoggerFactory loggerFactory = null) : base(loggerFactory, "Test") +// { +// Initialize(options.Value); + +// SubClient = AddApiClient(new TestSubSocketClient(options.Value, options.Value.SubOptions)); +// SubClient.SocketFactory = new Mock().Object; +// Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket("https://test.com")); +// } + +// public TestSocket CreateSocket() +// { +// Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket("https://test.com")); +// return (TestSocket)SubClient.CreateSocketInternal("https://localhost:123/"); +// } + +// } + using CryptoExchange.Net.Objects; -using CryptoExchange.Net.Objects.Options; -using CryptoExchange.Net.Objects.Sockets; -using CryptoExchange.Net.Sockets; -using CryptoExchange.Net.UnitTests.TestImplementations.Sockets; -using Microsoft.Extensions.Logging; -using Moq; -using CryptoExchange.Net.Testing.Implementations; -using CryptoExchange.Net.SharedApis; -using Microsoft.Extensions.Options; -using CryptoExchange.Net.Converters.SystemTextJson; -using System.Net.WebSockets; -using System.Text.Json; -namespace CryptoExchange.Net.UnitTests.TestImplementations +public class TestEnvironment : TradeEnvironment { - internal class TestSocketClient: BaseSocketClient + public string TestAddress { get; } + + public TestEnvironment(string name, string url) : base(name) { - public TestSubSocketClient SubClient { get; } - - /// - /// Create a new instance of KucoinSocketClient - /// - /// Configure the options to use for this client - public TestSocketClient(Action optionsDelegate = null) - : this(Options.Create(ApplyOptionsDelegate(optionsDelegate)), null) - { - } - - public TestSocketClient(IOptions options, ILoggerFactory loggerFactory = null) : base(loggerFactory, "Test") - { - Initialize(options.Value); - - SubClient = AddApiClient(new TestSubSocketClient(options.Value, options.Value.SubOptions)); - SubClient.SocketFactory = new Mock().Object; - Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket("https://test.com")); - } - - public TestSocket CreateSocket() - { - Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket("https://test.com")); - return (TestSocket)SubClient.CreateSocketInternal("https://localhost:123/"); - } - - } - - public class TestEnvironment : TradeEnvironment - { - public string TestAddress { get; } - - public TestEnvironment(string name, string url) : base(name) - { - TestAddress = url; - } - } - - public class TestSocketOptions: SocketExchangeOptions - { - public static TestSocketOptions Default = new TestSocketOptions - { - Environment = new TestEnvironment("Live", "https://test.test") - }; - - /// - /// ctor - /// - public TestSocketOptions() - { - Default?.Set(this); - } - - public SocketApiOptions SubOptions { get; set; } = new SocketApiOptions(); - - internal TestSocketOptions Set(TestSocketOptions targetOptions) - { - targetOptions = base.Set(targetOptions); - targetOptions.SubOptions = SubOptions.Set(targetOptions.SubOptions); - return targetOptions; - } - } - - public class TestSubSocketClient : SocketApiClient - { - private MessagePath _channelPath = MessagePath.Get().Property("channel"); - private MessagePath _actionPath = MessagePath.Get().Property("action"); - private MessagePath _topicPath = MessagePath.Get().Property("topic"); - - public Subscription TestSubscription { get; private set; } = null; - - public override JsonSerializerOptions JsonSerializerOptions => new JsonSerializerOptions(); - - public TestSubSocketClient(TestSocketOptions options, SocketApiOptions apiOptions) : base(new TraceLogger(), options.Environment.TestAddress, options, apiOptions) - { - - } - - protected internal override IByteMessageAccessor CreateAccessor(WebSocketMessageType type) => new SystemTextJsonByteMessageAccessor(new System.Text.Json.JsonSerializerOptions()); - protected internal override IMessageSerializer CreateSerializer() => new SystemTextJsonMessageSerializer(new System.Text.Json.JsonSerializerOptions()); - - /// - public override string FormatSymbol(string baseAsset, string quoteAsset, TradingMode futuresType, DateTime? deliverDate = null) => $"{baseAsset.ToUpperInvariant()}{quoteAsset.ToUpperInvariant()}"; - - internal IWebsocket CreateSocketInternal(string address) - { - return CreateSocket(address); - } - - protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials) - => new TestAuthProvider(credentials); - - public CallResult ConnectSocketSub(SocketConnection sub) - { - return ConnectSocketAsync(sub, default).Result; - } - - public override string GetListenerIdentifier(IMessageAccessor message) - { - if (!message.IsValid) - { - return "topic"; - } - - var id = message.GetValue(_channelPath); - id ??= message.GetValue(_topicPath); - - return message.GetValue(_actionPath) + "-" + id; - } - - public Task> SubscribeToSomethingAsync(string channel, Action> onUpdate, CancellationToken ct) - { - TestSubscription = new TestSubscriptionWithResponseCheck(channel, onUpdate); - return SubscribeAsync(TestSubscription, ct); - } + TestAddress = url; } } + +// public class TestSocketOptions: SocketExchangeOptions +// { +// public static TestSocketOptions Default = new TestSocketOptions +// { +// Environment = new TestEnvironment("Live", "https://test.test") +// }; + +// /// +// /// ctor +// /// +// public TestSocketOptions() +// { +// Default?.Set(this); +// } + +// public SocketApiOptions SubOptions { get; set; } = new SocketApiOptions(); + +// internal TestSocketOptions Set(TestSocketOptions targetOptions) +// { +// targetOptions = base.Set(targetOptions); +// targetOptions.SubOptions = SubOptions.Set(targetOptions.SubOptions); +// return targetOptions; +// } +// } + +// public class TestSubSocketClient : SocketApiClient +// { +// private MessagePath _channelPath = MessagePath.Get().Property("channel"); +// private MessagePath _actionPath = MessagePath.Get().Property("action"); +// private MessagePath _topicPath = MessagePath.Get().Property("topic"); + +// public Subscription TestSubscription { get; private set; } = null; + +// public override JsonSerializerOptions JsonSerializerOptions => new JsonSerializerOptions(); + +// public TestSubSocketClient(TestSocketOptions options, SocketApiOptions apiOptions) : base(new TraceLogger(), options.Environment.TestAddress, options, apiOptions) +// { + +// } + +// protected internal override IByteMessageAccessor CreateAccessor(WebSocketMessageType type) => new SystemTextJsonByteMessageAccessor(new System.Text.Json.JsonSerializerOptions()); +// protected internal override IMessageSerializer CreateSerializer() => new SystemTextJsonMessageSerializer(new System.Text.Json.JsonSerializerOptions()); + +// /// +// public override string FormatSymbol(string baseAsset, string quoteAsset, TradingMode futuresType, DateTime? deliverDate = null) => $"{baseAsset.ToUpperInvariant()}{quoteAsset.ToUpperInvariant()}"; + +// internal IWebsocket CreateSocketInternal(string address) +// { +// return SocketFactory.CreateWebsocket(_logger, ); +// } + +// protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials) +// => new TestAuthProvider(credentials); + +// public CallResult ConnectSocketSub(SocketConnection sub) +// { +// return ConnectSocketAsync(sub, default).Result; +// } + +// public override string GetListenerIdentifier(IMessageAccessor message) +// { +// if (!message.IsValid) +// { +// return "topic"; +// } + +// var id = message.GetValue(_channelPath); +// id ??= message.GetValue(_topicPath); + +// return message.GetValue(_actionPath) + "-" + id; +// } + +// public Task> SubscribeToSomethingAsync(string channel, Action> onUpdate, CancellationToken ct) +// { +// TestSubscription = new TestSubscriptionWithResponseCheck(channel, onUpdate); +// return SubscribeAsync(TestSubscription, ct); +// } + +// public override IMessageConverter CreateMessageConverter() => throw new NotImplementedException(); +// } +//} diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 3fdd668..b57f9c0 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -1,3 +1,4 @@ +using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Logging.Extensions; using CryptoExchange.Net.Objects; @@ -813,19 +814,20 @@ namespace CryptoExchange.Net.Clients Proxy = ClientOptions.Proxy, Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout, ReceiveBufferSize = ClientOptions.ReceiveBufferSize, + UseNewMessageDeserialization = ClientOptions.EnabledNewDeserialization }; - /// - /// Create a socket for an address - /// - /// The address the socket should connect to - /// - protected internal virtual IWebsocket CreateSocket(string address) - { - var socket = SocketFactory.CreateWebsocket(_logger, GetWebSocketParameters(address)); - _logger.SocketCreatedForAddress(socket.Id, address); - return socket; - } + ///// + ///// Create a socket for an address + ///// + ///// The address the socket should connect to + ///// + //protected internal virtual IWebsocket CreateSocket(string address) + //{ + // var socket = SocketFactory.CreateWebsocket(_logger, GetWebSocketParameters(address)); + // _logger.SocketCreatedForAddress(socket.Id, address); + // return socket; + //} /// /// Unsubscribe an update subscription @@ -1059,5 +1061,7 @@ namespace CryptoExchange.Net.Clients /// /// public virtual ReadOnlyMemory PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlyMemory data) => data; + + public abstract IMessageConverter CreateMessageConverter(); } } diff --git a/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/DynamicConverter.cs b/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/DynamicConverter.cs new file mode 100644 index 0000000..6b2df3e --- /dev/null +++ b/CryptoExchange.Net/Converters/MessageParsing/DynamicConverters/DynamicConverter.cs @@ -0,0 +1,55 @@ +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Net.WebSockets; +using System.Text; +using System.Text.Json; + +namespace CryptoExchange.Net.Converters.MessageParsing.DynamicConverters +{ + + public ref struct MessageType + { + public Type Type { get; set; } + public string? Identifier { get; set; } + } + + public interface IMessageConverter + { + MessageType GetMessageType(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); + + object Deserialize(ReadOnlySpan data, Type type); + } + + public abstract class DynamicConverter : IMessageConverter + { + public abstract JsonSerializerOptions Options { get; } + + public abstract MessageType GetMessageType(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); + + public virtual object Deserialize(ReadOnlySpan data, Type type) + { + return JsonSerializer.Deserialize(data, type, Options); + } + } + + public abstract class StaticConverter : IMessageConverter + { + public abstract JsonSerializerOptions Options { get; } + public abstract MessageType GetMessageType(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); + + public object? Deserialize(ReadOnlySpan data, Type type) + { + return JsonSerializer.Deserialize(data, type, Options); + } + + } + + public abstract class StaticConverter : StaticConverter + { + public override MessageType GetMessageType(ReadOnlySpan data,, WebSocketMessageType? webSocketMessageType) => + new MessageType { Type = typeof(T), Identifier = GetMessageListenId(data, webSocketMessageType) }; + + public abstract string GetMessageListenId(ReadOnlySpan data, WebSocketMessageType? webSocketMessageType); + } +} diff --git a/CryptoExchange.Net/Converters/SystemTextJson/BoolConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/BoolConverter.cs index c996198..75ce2b6 100644 --- a/CryptoExchange.Net/Converters/SystemTextJson/BoolConverter.cs +++ b/CryptoExchange.Net/Converters/SystemTextJson/BoolConverter.cs @@ -21,58 +21,15 @@ namespace CryptoExchange.Net.Converters.SystemTextJson /// public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options) { - return typeToConvert == typeof(bool) ? new BoolConverterInner() : new BoolConverterInner(); + return typeToConvert == typeof(bool) ? new BoolConverterInner() : new BoolConverterInnerNullable(); } - private class BoolConverterInner : JsonConverter + private class BoolConverterInnerNullable : JsonConverter { - public override T Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) - => (T)((object?)ReadBool(ref reader, typeToConvert, options) ?? default(T))!; + public override bool? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + => ReadBool(ref reader, typeToConvert, options); - public bool? ReadBool(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) - { - if (reader.TokenType == JsonTokenType.True) - return true; - - if (reader.TokenType == JsonTokenType.False) - return false; - - var value = reader.TokenType switch - { - JsonTokenType.String => reader.GetString(), - JsonTokenType.Number => reader.GetInt16().ToString(), - _ => null - }; - - value = value?.ToLowerInvariant().Trim(); - if (string.IsNullOrEmpty(value)) - { - if (typeToConvert == typeof(bool)) - LibraryHelpers.StaticLogger?.LogWarning("Received null bool value, but property type is not a nullable bool. Resolver: {Resolver}", options.TypeInfoResolver?.GetType()?.Name); - return default; - } - - switch (value) - { - case "true": - case "yes": - case "y": - case "1": - case "on": - return true; - case "false": - case "no": - case "n": - case "0": - case "off": - case "-1": - return false; - } - - throw new SerializationException($"Can't convert bool value {value}"); - } - - public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions options) + public override void Write(Utf8JsonWriter writer, bool? value, JsonSerializerOptions options) { if (value is bool boolVal) writer.WriteBooleanValue(boolVal); @@ -81,5 +38,59 @@ namespace CryptoExchange.Net.Converters.SystemTextJson } } + private class BoolConverterInner : JsonConverter + { + public override bool Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + => ReadBool(ref reader, typeToConvert, options) ?? false; + + public override void Write(Utf8JsonWriter writer, bool value, JsonSerializerOptions options) + { + writer.WriteBooleanValue(value); + } + } + + private static bool? ReadBool(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType == JsonTokenType.True) + return true; + + if (reader.TokenType == JsonTokenType.False) + return false; + + var value = reader.TokenType switch + { + JsonTokenType.String => reader.GetString(), + JsonTokenType.Number => reader.GetInt16().ToString(), + _ => null + }; + + value = value?.ToLowerInvariant().Trim(); + if (string.IsNullOrEmpty(value)) + { + if (typeToConvert == typeof(bool)) + LibraryHelpers.StaticLogger?.LogWarning("Received null or empty bool value, but property type is not a nullable bool. Resolver: {Resolver}", options.TypeInfoResolver?.GetType()?.Name); + return default; + } + + switch (value) + { + case "true": + case "yes": + case "y": + case "1": + case "on": + return true; + case "false": + case "no": + case "n": + case "0": + case "off": + case "-1": + return false; + } + + throw new SerializationException($"Can't convert bool value {value}"); + } + } } diff --git a/CryptoExchange.Net/Converters/SystemTextJson/DateTimeConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/DateTimeConverter.cs index 2bd3a7b..b1e7b13 100644 --- a/CryptoExchange.Net/Converters/SystemTextJson/DateTimeConverter.cs +++ b/CryptoExchange.Net/Converters/SystemTextJson/DateTimeConverter.cs @@ -27,64 +27,77 @@ namespace CryptoExchange.Net.Converters.SystemTextJson /// public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options) { - return typeToConvert == typeof(DateTime) ? new DateTimeConverterInner() : new DateTimeConverterInner(); + return typeToConvert == typeof(DateTime) ? new DateTimeConverterInner() : new NullableDateTimeConverterInner(); } - private class DateTimeConverterInner : JsonConverter + private class NullableDateTimeConverterInner : JsonConverter { - public override T Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) - => (T)((object?)ReadDateTime(ref reader, typeToConvert, options) ?? default(T))!; + public override DateTime? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + => ReadDateTime(ref reader, typeToConvert, options); - private DateTime? ReadDateTime(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) - { - if (reader.TokenType == JsonTokenType.Null) - { - if (typeToConvert == typeof(DateTime)) - LibraryHelpers.StaticLogger?.LogWarning("DateTime value of null, but property is not nullable. Resolver: {Resolver}", options.TypeInfoResolver?.GetType()?.Name); - return default; - } - - if (reader.TokenType is JsonTokenType.Number) - { - var decValue = reader.GetDecimal(); - if (decValue == 0 || decValue < 0) - return default; - - return ParseFromDecimal(decValue); - } - else if (reader.TokenType is JsonTokenType.String) - { - var stringValue = reader.GetString(); - if (string.IsNullOrWhiteSpace(stringValue) - || stringValue == "-1" - || stringValue == "0001-01-01T00:00:00Z" - || decimal.TryParse(stringValue, out var decVal) && decVal == 0) - { - return default; - } - - return ParseFromString(stringValue!, options.TypeInfoResolver?.GetType()?.Name); - } - else - { - return reader.GetDateTime(); - } - } - - public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions options) + public override void Write(Utf8JsonWriter writer, DateTime? value, JsonSerializerOptions options) { if (value == null) { writer.WriteNullValue(); + return; } + + if (value.Value == default) + writer.WriteStringValue(default(DateTime)); else + writer.WriteNumberValue((long)Math.Round((value.Value - new DateTime(1970, 1, 1)).TotalMilliseconds)); + } + } + + private class DateTimeConverterInner : JsonConverter + { + public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + => ReadDateTime(ref reader, typeToConvert, options) ?? default; + + public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options) + { + var dtValue = (DateTime)(object)value; + if (dtValue == default) + writer.WriteStringValue(default(DateTime)); + else + writer.WriteNumberValue((long)Math.Round((dtValue - new DateTime(1970, 1, 1)).TotalMilliseconds)); + } + } + + private static DateTime? ReadDateTime(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType == JsonTokenType.Null) + { + if (typeToConvert == typeof(DateTime)) + LibraryHelpers.StaticLogger?.LogWarning("DateTime value of null, but property is not nullable. Resolver: {Resolver}", options.TypeInfoResolver?.GetType()?.Name); + return default; + } + + if (reader.TokenType is JsonTokenType.Number) + { + var decValue = reader.GetDecimal(); + if (decValue == 0 || decValue < 0) + return default; + + return ParseFromDecimal(decValue); + } + else if (reader.TokenType is JsonTokenType.String) + { + var stringValue = reader.GetString(); + if (string.IsNullOrWhiteSpace(stringValue) + || stringValue == "-1" + || stringValue == "0001-01-01T00:00:00Z" + || decimal.TryParse(stringValue, out var decVal) && decVal == 0) { - var dtValue = (DateTime)(object)value; - if (dtValue == default) - writer.WriteStringValue(default(DateTime)); - else - writer.WriteNumberValue((long)Math.Round((dtValue - new DateTime(1970, 1, 1)).TotalMilliseconds)); + return default; } + + return ParseFromString(stringValue!, options.TypeInfoResolver?.GetType()?.Name); + } + else + { + return reader.GetDateTime(); } } diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs index 749a04f..a9632ef 100644 --- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs @@ -20,6 +20,8 @@ namespace CryptoExchange.Net.Interfaces /// The matcher for this listener /// public MessageMatcher MessageMatcher { get; } + + public HashSet DeserializationTypes { get; set; } /// /// Handle a message /// diff --git a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs index 5caa97e..8051a34 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocketFactory.cs @@ -1,4 +1,5 @@ using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Sockets; using Microsoft.Extensions.Logging; using System.IO.Pipelines; @@ -15,7 +16,7 @@ namespace CryptoExchange.Net.Interfaces /// The logger /// The parameters to use for the connection /// - IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters); + IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters); /// /// Create high performance websocket diff --git a/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs b/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs index 40e8ea0..8a0cdc3 100644 --- a/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs +++ b/CryptoExchange.Net/Objects/Options/SocketExchangeOptions.cs @@ -61,6 +61,8 @@ namespace CryptoExchange.Net.Objects.Options /// public int? ReceiveBufferSize { get; set; } + public bool EnabledNewDeserialization { get; set; } + /// /// Create a copy of this options /// @@ -82,6 +84,7 @@ namespace CryptoExchange.Net.Objects.Options item.RateLimitingBehaviour = RateLimitingBehaviour; item.RateLimiterEnabled = RateLimiterEnabled; item.ReceiveBufferSize = ReceiveBufferSize; + item.EnabledNewDeserialization = EnabledNewDeserialization; return item; } } diff --git a/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs index a72da68..0642092 100644 --- a/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs +++ b/CryptoExchange.Net/Objects/Sockets/WebSocketParameters.cs @@ -75,6 +75,8 @@ namespace CryptoExchange.Net.Objects.Sockets /// public int? ReceiveBufferSize { get; set; } = null; + public bool UseNewMessageDeserialization { get; set; } + /// /// ctor /// diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index d29e697..3df7d91 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -9,6 +9,7 @@ using System; using System.Buffers; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Data.Common; using System.IO; using System.Linq; using System.Net; @@ -145,22 +146,28 @@ namespace CryptoExchange.Net.Sockets /// public Func>? GetReconnectionUrl { get; set; } + private SocketConnection _connection; + /// /// ctor /// /// The log object to use /// The parameters for this socket - public CryptoExchangeWebSocketClient(ILogger logger, WebSocketParameters websocketParameters) + public CryptoExchangeWebSocketClient(ILogger logger, SocketConnection connection, WebSocketParameters websocketParameters) { Id = NextStreamId(); _logger = logger; + _connection = connection; Parameters = websocketParameters; _receivedMessages = new List(); _sendEvent = new AsyncResetEvent(); _sendBuffer = new ConcurrentQueue(); _ctsSource = new CancellationTokenSource(); - _receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize; + if (websocketParameters.UseNewMessageDeserialization) + _receiveBufferSize = 1024; + else + _receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize; _closeSem = new SemaphoreSlim(1, 1); _socket = CreateSocket(); @@ -682,7 +689,10 @@ namespace CryptoExchange.Net.Sockets { // Received a complete message and it's not multi part _logger.SocketReceivedSingleMessage(Id, receiveResult.Count); - await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); + if (!Parameters.UseNewMessageDeserialization) + await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); + else + ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan(buffer.Array!, buffer.Offset, receiveResult.Count)); } else { @@ -717,7 +727,11 @@ namespace CryptoExchange.Net.Sockets { _logger.SocketReassembledMessage(Id, multipartStream!.Length); // Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part) - await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false); + + if (!Parameters.UseNewMessageDeserialization) + await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false); + else + ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan(multipartStream.GetBuffer(), 0, (int)multipartStream.Length)); } else { @@ -743,6 +757,19 @@ namespace CryptoExchange.Net.Sockets } } + /// + /// Process a stream message + /// + /// + /// + /// + protected void ProcessDataNew(WebSocketMessageType type, ReadOnlySpan data) + { + LastActionTime = DateTime.UtcNow; + _connection.HandleStreamMessage2(type, data); + //await (OnStreamMessage?.Invoke(type, data) ?? Task.CompletedTask).ConfigureAwait(false); + } + /// /// Process a stream message /// diff --git a/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs b/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs index 4f424a6..1313fba 100644 --- a/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs +++ b/CryptoExchange.Net/Sockets/HighPerf/HighPerfSocketConnection.cs @@ -484,24 +484,33 @@ namespace CryptoExchange.Net.Sockets #pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. #pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code { - var tasks = _typedSubscriptions.Select(sub => + if (_typedSubscriptions.Count == 1) { - try - { - return sub.HandleAsync(update!); - } - catch (Exception ex) - { - sub.InvokeExceptionHandler(ex); - _logger.UserMessageProcessingFailed(SocketId, ex.Message, ex); - return new ValueTask(); - } - }); + // If there is only one listener we can prevent the overhead of the await which will call a `ToList` + await DelegateToSubscription(_typedSubscriptions[0], update!).ConfigureAwait(false); + continue; + } + + var tasks = _typedSubscriptions.Select(sub => DelegateToSubscription(sub, update!)); await LibraryHelpers.WhenAll(tasks).ConfigureAwait(false); } } catch (OperationCanceledException) { } } + + private ValueTask DelegateToSubscription(HighPerfSubscription sub, T update) + { + try + { + return sub.HandleAsync(update!); + } + catch (Exception ex) + { + sub.InvokeExceptionHandler(ex); + _logger.UserMessageProcessingFailed(SocketId, ex.Message, ex); + return new ValueTask(); + } + } } } diff --git a/CryptoExchange.Net/Sockets/MessageMatcher.cs b/CryptoExchange.Net/Sockets/MessageMatcher.cs index 2ca7107..2f1f8dc 100644 --- a/CryptoExchange.Net/Sockets/MessageMatcher.cs +++ b/CryptoExchange.Net/Sockets/MessageMatcher.cs @@ -90,7 +90,7 @@ namespace CryptoExchange.Net.Sockets /// /// Get any handler links matching with the listen id /// - public List GetHandlerLinks(string listenId) => HandlerLinks.Where(x => x.Check(listenId)).ToList(); + public IEnumerable GetHandlerLinks(string listenId) => HandlerLinks.Where(x => x.Check(listenId)); /// public override string ToString() => string.Join(",", HandlerLinks.Select(x => x.ToString())); @@ -113,6 +113,7 @@ namespace CryptoExchange.Net.Sockets /// Deserialization type /// public abstract Type GetDeserializationType(IMessageAccessor accessor); + public abstract Type DeserializationType { get; } /// /// ctor @@ -150,6 +151,8 @@ namespace CryptoExchange.Net.Sockets { private Func, CallResult> _handler; + public override Type DeserializationType => typeof(TServer); + /// public override Type GetDeserializationType(IMessageAccessor accessor) => typeof(TServer); diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 9d38368..0cf1cff 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -65,10 +65,21 @@ namespace CryptoExchange.Net.Sockets /// public AsyncResetEvent? ContinueAwaiter { get; set; } + public HashSet DeserializationTypes { get; set; } + + private MessageMatcher _matcher; /// - /// Matcher for this query + /// Matcher for this subscription /// - public MessageMatcher MessageMatcher { get; set; } = null!; + public MessageMatcher MessageMatcher + { + get => _matcher; + set + { + _matcher = value; + DeserializationTypes = new HashSet(MessageMatcher.HandlerLinks.Select(x => x.DeserializationType)); + } + } /// /// The query request object diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 00ee53f..d80522a 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -1,4 +1,5 @@ using CryptoExchange.Net.Clients; +using CryptoExchange.Net.Converters.MessageParsing.DynamicConverters; using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Logging.Extensions; using CryptoExchange.Net.Objects; @@ -11,6 +12,7 @@ using System.Linq; using System.Net; using System.Net.Sockets; using System.Net.WebSockets; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -266,6 +268,8 @@ namespace CryptoExchange.Net.Sockets private IByteMessageAccessor? _stringMessageAccessor; private IByteMessageAccessor? _byteMessageAccessor; + private IMessageConverter? _messageConverter; + /// /// The task that is sending periodic data on the websocket. Can be used for sending Ping messages every x seconds or similar. Not necessary. /// @@ -296,7 +300,7 @@ namespace CryptoExchange.Net.Sockets Tag = tag; Properties = new Dictionary(); - _socket = socketFactory.CreateWebsocket(logger, parameters); + _socket = socketFactory.CreateWebsocket(logger, this, parameters); _logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString()); _socket.OnStreamMessage += HandleStreamMessage; @@ -500,6 +504,52 @@ namespace CryptoExchange.Net.Sockets return Task.CompletedTask; } + /// + /// Handle a message + /// + protected internal virtual void HandleStreamMessage2(WebSocketMessageType type, ReadOnlySpan data) + { + //var sw = Stopwatch.StartNew(); + var receiveTime = DateTime.UtcNow; + + //// 1. Decrypt/Preprocess if necessary + //data = ApiClient.PreprocessStreamMessage(this, type, data); + + _messageConverter ??= ApiClient.CreateMessageConverter(); + + var messageType = _messageConverter.GetMessageType(data, type); + if (messageType.Type == null) + { + // Failed to determine message type + return; + } + + var result = _messageConverter.Deserialize(data, messageType.Type); + if (result == null) + { + // Deserialize error + return; + } + + var targetType = messageType.Type; + List listeners; + lock (_listenersLock) + listeners = _listeners.Where(x => x.DeserializationTypes.Contains(targetType)).ToList(); + if (listeners.Count == 0) + { + // No subscriptions found for type + return; + } + + var dataEvent = new DataEvent(result, null, null, null /*originalData*/, receiveTime, null); + foreach (var subscription in listeners) + { + var links = subscription.MessageMatcher.GetHandlerLinks(messageType.Identifier); + foreach(var link in links) + subscription.Handle(this, dataEvent, link); + } + } + /// /// Handle a message /// diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index 90318f1..e4d8643 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -35,6 +35,8 @@ namespace CryptoExchange.Net.Sockets /// public bool UserSubscription { get; set; } + public HashSet DeserializationTypes { get; set; } + private SubscriptionStatus _status; /// /// Current subscription status @@ -72,10 +74,20 @@ namespace CryptoExchange.Net.Sockets /// public bool Authenticated { get; } + + private MessageMatcher _matcher; /// /// Matcher for this subscription /// - public MessageMatcher MessageMatcher { get; set; } = null!; + public MessageMatcher MessageMatcher + { + get => _matcher; + set + { + _matcher = value; + DeserializationTypes = new HashSet(MessageMatcher.HandlerLinks.Select(x => x.DeserializationType)); + } + } /// /// Cancellation token registration @@ -109,7 +121,10 @@ namespace CryptoExchange.Net.Sockets /// /// ctor /// - public Subscription(ILogger logger, bool authenticated, bool userSubscription = true) + public Subscription( + ILogger logger, + bool authenticated, + bool userSubscription = true) { _logger = logger; Authenticated = authenticated; diff --git a/CryptoExchange.Net/Sockets/WebsocketFactory.cs b/CryptoExchange.Net/Sockets/WebsocketFactory.cs index 0871ae7..81602f3 100644 --- a/CryptoExchange.Net/Sockets/WebsocketFactory.cs +++ b/CryptoExchange.Net/Sockets/WebsocketFactory.cs @@ -13,9 +13,9 @@ namespace CryptoExchange.Net.Sockets public class WebsocketFactory : IWebsocketFactory { /// - public IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters) + public IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters) { - return new CryptoExchangeWebSocketClient(logger, parameters); + return new CryptoExchangeWebSocketClient(logger, connection, parameters); } /// public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter) diff --git a/CryptoExchange.Net/Testing/Implementations/TestWebsocketFactory.cs b/CryptoExchange.Net/Testing/Implementations/TestWebsocketFactory.cs index 7f5faf0..18d2abd 100644 --- a/CryptoExchange.Net/Testing/Implementations/TestWebsocketFactory.cs +++ b/CryptoExchange.Net/Testing/Implementations/TestWebsocketFactory.cs @@ -1,5 +1,6 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Sockets; using Microsoft.Extensions.Logging; using System; using System.IO.Pipelines; @@ -16,6 +17,6 @@ namespace CryptoExchange.Net.Testing.Implementations } public IHighPerfWebsocket CreateHighPerfWebsocket(ILogger logger, WebSocketParameters parameters, PipeWriter pipeWriter) => throw new NotImplementedException(); - public IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters) => _socket; + public IWebsocket CreateWebsocket(ILogger logger, SocketConnection connection, WebSocketParameters parameters) => _socket; } }