diff --git a/CryptoExchange.Net.UnitTests/SocketClientTests.cs b/CryptoExchange.Net.UnitTests/SocketClientTests.cs index b35dd36..0092caa 100644 --- a/CryptoExchange.Net.UnitTests/SocketClientTests.cs +++ b/CryptoExchange.Net.UnitTests/SocketClientTests.cs @@ -1,11 +1,17 @@ using System; +using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; using CryptoExchange.Net.UnitTests.TestImplementations; +using CryptoExchange.Net.UnitTests.TestImplementations.Sockets; using Microsoft.Extensions.Logging; +using Moq; using Newtonsoft.Json.Linq; using NUnit.Framework; +using NUnit.Framework.Constraints; namespace CryptoExchange.Net.UnitTests { @@ -46,7 +52,7 @@ namespace CryptoExchange.Net.UnitTests } [TestCase] - public void SocketMessages_Should_BeProcessedInDataHandlers() + public async Task SocketMessages_Should_BeProcessedInDataHandlers() { // arrange var client = new TestSocketClient(options => { @@ -58,28 +64,31 @@ namespace CryptoExchange.Net.UnitTests socket.DisconnectTime = DateTime.UtcNow; var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); var rstEvent = new ManualResetEvent(false); - JToken result = null; - sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, false, (messageEvent) => - { - result = messageEvent.JsonData; - rstEvent.Set(); - })); + Dictionary result = null; + client.SubClient.ConnectSocketSub(sub); + sub.AddSubscription(new TestSubscription>(Mock.Of(), (messageEvent) => + { + result = messageEvent.Data; + rstEvent.Set(); + })); + // act - socket.InvokeMessage("{\"property\": 123}"); + await socket.InvokeMessage("{\"property\": \"123\", \"topic\": \"topic\"}"); rstEvent.WaitOne(1000); // assert - Assert.IsTrue((int)result["property"] == 123); + Assert.IsTrue(result["property"] == "123"); } [TestCase(false)] [TestCase(true)] - public void SocketMessages_Should_ContainOriginalDataIfEnabled(bool enabled) + public async Task SocketMessages_Should_ContainOriginalDataIfEnabled(bool enabled) { // arrange - var client = new TestSocketClient(options => { + var client = new TestSocketClient(options => + { options.ReconnectInterval = TimeSpan.Zero; options.SubOptions.OutputOriginalData = enabled; }); @@ -90,15 +99,16 @@ namespace CryptoExchange.Net.UnitTests var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); var rstEvent = new ManualResetEvent(false); string original = null; - sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, false, (messageEvent) => + + client.SubClient.ConnectSocketSub(sub); + sub.AddSubscription(new TestSubscription>(Mock.Of(), (messageEvent) => { original = messageEvent.OriginalData; rstEvent.Set(); })); - client.SubClient.ConnectSocketSub(sub); // act - socket.InvokeMessage("{\"property\": 123}"); + await socket.InvokeMessage("{\"property\": 123}"); rstEvent.WaitOne(1000); // assert @@ -109,16 +119,18 @@ namespace CryptoExchange.Net.UnitTests public void UnsubscribingStream_Should_CloseTheSocket() { // arrange - var client = new TestSocketClient(options => { + var client = new TestSocketClient(options => + { options.ReconnectInterval = TimeSpan.Zero; - }); + }); var socket = client.CreateSocket(); socket.CanConnect = true; var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); client.SubClient.ConnectSocketSub(sub); - var us = SocketSubscription.CreateForIdentifier(10, "Test", true, false, (e) => { }); - var ups = new UpdateSubscription(sub, us); - sub.AddSubscription(us); + + var subscription = new TestSubscription>(Mock.Of(), (messageEvent) => { }); + var ups = new UpdateSubscription(sub, subscription); + sub.AddSubscription(subscription); // act client.UnsubscribeAsync(ups).Wait(); @@ -140,12 +152,13 @@ namespace CryptoExchange.Net.UnitTests var sub2 = new SocketConnection(new TraceLogger(), client.SubClient, socket2, null); client.SubClient.ConnectSocketSub(sub1); client.SubClient.ConnectSocketSub(sub2); - var us1 = SocketSubscription.CreateForIdentifier(10, "Test1", true, false, (e) => { }); - var us2 = SocketSubscription.CreateForIdentifier(11, "Test2", true, false, (e) => { }); - sub1.AddSubscription(us1); - sub2.AddSubscription(us2); - var ups1 = new UpdateSubscription(sub1, us1); - var ups2 = new UpdateSubscription(sub2, us2); + var subscription1 = new TestSubscription>(Mock.Of(), (messageEvent) => { }); + var subscription2 = new TestSubscription>(Mock.Of(), (messageEvent) => { }); + + sub1.AddSubscription(subscription1); + sub2.AddSubscription(subscription2); + var ups1 = new UpdateSubscription(sub1, subscription1); + var ups2 = new UpdateSubscription(sub2, subscription2); // act client.UnsubscribeAllAsync().Wait(); diff --git a/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs b/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs index ad6da83..e349d72 100644 --- a/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs +++ b/CryptoExchange.Net.UnitTests/SymbolOrderBookTests.cs @@ -14,13 +14,13 @@ namespace CryptoExchange.Net.UnitTests [TestFixture] public class SymbolOrderBookTests { - private static OrderBookOptions defaultOrderBookOptions = new OrderBookOptions(); + private static readonly OrderBookOptions _defaultOrderBookOptions = new OrderBookOptions(); private class TestableSymbolOrderBook : SymbolOrderBook { public TestableSymbolOrderBook() : base(null, "Test", "BTC/USD") { - Initialize(defaultOrderBookOptions); + Initialize(_defaultOrderBookOptions); } diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestQuery.cs b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestQuery.cs new file mode 100644 index 0000000..4eafbd8 --- /dev/null +++ b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestQuery.cs @@ -0,0 +1,19 @@ +using CryptoExchange.Net.Sockets; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets +{ + internal class TestQuery : Query + { + public override HashSet ListenerIdentifiers { get; set; } + + public TestQuery(string identifier, object request, bool authenticated, int weight = 1) : base(request, authenticated, weight) + { + ListenerIdentifiers = new HashSet { identifier }; + } + } +} diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs new file mode 100644 index 0000000..84747cf --- /dev/null +++ b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs @@ -0,0 +1,36 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets +{ + internal class TestSubscription : Subscription + { + private readonly Action> _handler; + + public override HashSet ListenerIdentifiers { get; set; } = new HashSet { "topic" }; + + public TestSubscription(ILogger logger, Action> handler) : base(logger, false) + { + _handler = handler; + } + + public override Task DoHandleMessageAsync(SocketConnection connection, DataEvent message) + { + var data = (T)message.Data; + _handler.Invoke(message.As(data)); + return Task.FromResult(new CallResult(null)); + } + + public override Type GetMessageType(IMessageAccessor message) => typeof(T); + public override Query GetSubQuery(SocketConnection connection) => new TestQuery("sub", new object(), false, 1); + public override Query GetUnsubQuery() => new TestQuery("unsub", new object(), false, 1); + } +} diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs index 7f2afb1..82f1812 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs @@ -1,4 +1,6 @@ using System; +using System.IO; +using System.Net.WebSockets; using System.Security.Authentication; using System.Text; using System.Threading.Tasks; @@ -12,16 +14,15 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations public bool CanConnect { get; set; } public bool Connected { get; set; } - public event Action OnClose; - + public event Func OnClose; #pragma warning disable 0067 - public event Action OnReconnected; - public event Action OnReconnecting; + public event Func OnReconnected; + public event Func OnReconnecting; #pragma warning restore 0067 - public event Action OnRequestSent; - public event Action OnMessage; - public event Action OnError; - public event Action OnOpen; + public event Func OnRequestSent; + public event Func OnStreamMessage; + public event Func OnError; + public event Func OnOpen; public Func> GetReconnectionUrl { get; set; } public int Id { get; } @@ -110,9 +111,10 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations OnOpen?.Invoke(); } - public void InvokeMessage(string data) + public async Task InvokeMessage(string data) { - OnMessage?.Invoke(data); + var stream = new MemoryStream(Encoding.UTF8.GetBytes(data)); + await OnStreamMessage?.Invoke(WebSocketMessageType.Text, stream); } public void SetProxy(ApiProxy proxy) diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs index 796213e..eb9a358 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs @@ -7,6 +7,7 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; using Microsoft.Extensions.Logging; using Moq; using Newtonsoft.Json.Linq; @@ -89,35 +90,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations return ConnectSocketAsync(sub).Result; } - protected internal override bool HandleQueryResponse(SocketConnection s, object request, JToken data, out CallResult callResult) - { - throw new NotImplementedException(); - } - - protected internal override bool HandleSubscriptionResponse(SocketConnection s, SocketSubscription subscription, object request, JToken message, - out CallResult callResult) - { - throw new NotImplementedException(); - } - - protected internal override bool MessageMatchesHandler(SocketConnection s, JToken message, object request) - { - throw new NotImplementedException(); - } - - protected internal override bool MessageMatchesHandler(SocketConnection s, JToken message, string identifier) - { - return true; - } - - protected internal override Task> AuthenticateSocketAsync(SocketConnection s) - { - throw new NotImplementedException(); - } - - protected internal override Task UnsubscribeAsync(SocketConnection connection, SocketSubscription s) - { - throw new NotImplementedException(); - } + public override string GetListenerIdentifier(IMessageAccessor messageAccessor) => "topic"; } } diff --git a/CryptoExchange.Net/Clients/RestApiClient.cs b/CryptoExchange.Net/Clients/RestApiClient.cs index 266a013..f8088a5 100644 --- a/CryptoExchange.Net/Clients/RestApiClient.cs +++ b/CryptoExchange.Net/Clients/RestApiClient.cs @@ -116,9 +116,9 @@ namespace CryptoExchange.Net var result = await GetResponseAsync(request.Data, deserializer, cancellationToken, true).ConfigureAwait(false); if (!result) - _logger.Log(LogLevel.Warning, $"[{result.RequestId}] Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}"); + _logger.Log(LogLevel.Warning, $"[Req {result.RequestId}] Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}"); else - _logger.Log(LogLevel.Debug, $"[{result.RequestId}] Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? (": " + result.OriginalData) : "")}"); + _logger.Log(LogLevel.Debug, $"[Req {result.RequestId}] Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? (": " + result.OriginalData) : "")}"); if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false)) continue; @@ -170,9 +170,9 @@ namespace CryptoExchange.Net var result = await GetResponseAsync(request.Data, deserializer, cancellationToken, false).ConfigureAwait(false); if (!result) - _logger.Log(LogLevel.Warning, $"[{result.RequestId}] Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}"); + _logger.Log(LogLevel.Warning, $"[Req {result.RequestId}] Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}"); else - _logger.Log(LogLevel.Debug, $"[{result.RequestId}] Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? (": " + result.OriginalData) : "")}"); + _logger.Log(LogLevel.Debug, $"[Req {result.RequestId}] Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? (": " + result.OriginalData) : "")}"); if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false)) continue; @@ -224,7 +224,7 @@ namespace CryptoExchange.Net var syncTimeResult = await syncTask.ConfigureAwait(false); if (!syncTimeResult) { - _logger.Log(LogLevel.Debug, $"[{requestId}] Failed to sync time, aborting request: " + syncTimeResult.Error); + _logger.Log(LogLevel.Debug, $"[Req {requestId}] Failed to sync time, aborting request: " + syncTimeResult.Error); return syncTimeResult.As(default); } } @@ -242,11 +242,11 @@ namespace CryptoExchange.Net if (signed && AuthenticationProvider == null) { - _logger.Log(LogLevel.Warning, $"[{requestId}] Request {uri.AbsolutePath} failed because no ApiCredentials were provided"); + _logger.Log(LogLevel.Warning, $"[Req {requestId}] Request {uri.AbsolutePath} failed because no ApiCredentials were provided"); return new CallResult(new NoApiCredentialsError()); } - _logger.Log(LogLevel.Information, $"[{requestId}] Creating request for " + uri); + _logger.Log(LogLevel.Information, $"[Req {requestId}] Creating request for " + uri); var paramsPosition = parameterPosition ?? ParameterPositions[method]; var request = ConstructRequest(uri, method, parameters?.OrderBy(p => p.Key).ToDictionary(p => p.Key, p => p.Value), signed, paramsPosition, arraySerialization ?? this.arraySerialization, requestBodyFormat ?? this.requestBodyFormat, requestId, additionalHeaders); @@ -259,7 +259,7 @@ namespace CryptoExchange.Net paramString += " with headers " + string.Join(", ", headers.Select(h => h.Key + $"=[{string.Join(",", h.Value)}]")); TotalRequestsMade++; - _logger.Log(LogLevel.Trace, $"[{requestId}] Sending {method}{(signed ? " signed" : "")} request to {request.Uri}{paramString ?? " "}"); + _logger.Log(LogLevel.Trace, $"[Req {requestId}] Sending {method}{(signed ? " signed" : "")} request to {request.Uri}{paramString ?? " "}"); return new CallResult(request); } diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 8cf3c1d..122a1f9 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -3,12 +3,15 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Options; using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; +using System.Net.Sockets; using System.Net.WebSockets; using System.Text; using System.Threading; @@ -55,17 +58,16 @@ namespace CryptoExchange.Net /// protected AsyncResetEvent? periodicEvent; - /// - /// If true; data which is a response to a query will also be distributed to subscriptions - /// If false; data which is a response to a query won't get forwarded to subscriptions as well - /// - protected internal bool ContinueOnQueryResponse { get; protected set; } - /// /// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message /// protected internal bool UnhandledMessageExpected { get; set; } + /// + /// If true a subscription will accept message before the confirmation of a subscription has been received + /// + protected bool HandleMessageBeforeConfirmation { get; set; } + /// /// The rate limiters /// @@ -181,7 +183,7 @@ namespace CryptoExchange.Net var success = socketConnection.CanAddSubscription(); if (!success) { - _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); + _logger.Log(LogLevel.Trace, $"[Sckt {socketConnection.SocketId}] failed to add subscription, retrying on different connection"); continue; } @@ -209,18 +211,23 @@ namespace CryptoExchange.Net if (socketConnection.PausedActivity) { - _logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't subscribe at this moment"); + _logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] has been paused, can't subscribe at this moment"); return new CallResult(new ServerError("Socket is paused")); } + var waitEvent = new AsyncResetEvent(false); var subQuery = subscription.GetSubQuery(socketConnection); if (subQuery != null) { + if (HandleMessageBeforeConfirmation) + socketConnection.AddSubscription(subscription); + // Send the request and wait for answer - var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); + var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery, waitEvent).ConfigureAwait(false); if (!subResult) { - _logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}"); + waitEvent?.Set(); + _logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] failed to subscribe: {subResult.Error}"); // If this was a timeout we still need to send an unsubscribe to prevent messages coming in later var unsubscribe = subResult.Error is CancellationRequestedError; await socketConnection.CloseAsync(subscription, unsubscribe).ConfigureAwait(false); @@ -236,13 +243,16 @@ namespace CryptoExchange.Net { subscription.CancellationTokenRegistration = ct.Register(async () => { - _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription {subscription.Id}"); + _logger.Log(LogLevel.Information, $"[Sckt {socketConnection.SocketId}] Cancellation token set, closing subscription {subscription.Id}"); await socketConnection.CloseAsync(subscription).ConfigureAwait(false); }, false); } - socketConnection.AddSubscription(subscription); - _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully"); + if (!HandleMessageBeforeConfirmation) + socketConnection.AddSubscription(subscription); + + waitEvent?.Set(); + _logger.Log(LogLevel.Information, $"[Sckt {socketConnection.SocketId}] subscription {subscription.Id} completed successfully"); return new CallResult(new UpdateSubscription(socketConnection, subscription)); } @@ -299,7 +309,7 @@ namespace CryptoExchange.Net if (socketConnection.PausedActivity) { - _logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't send query at this moment"); + _logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] has been paused, can't send query at this moment"); return new CallResult(new ServerError("Socket is paused")); } @@ -340,21 +350,24 @@ namespace CryptoExchange.Net if (AuthenticationProvider == null) return new CallResult(new NoApiCredentialsError()); - _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate"); + _logger.Log(LogLevel.Debug, $"[Sckt {socket.SocketId}] Attempting to authenticate"); var authRequest = GetAuthenticationRequest(); - var result = await socket.SendAndWaitQueryAsync(authRequest).ConfigureAwait(false); - - if (!result) + if (authRequest != null) { - _logger.Log(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed"); - if (socket.Connected) - await socket.CloseAsync().ConfigureAwait(false); + var result = await socket.SendAndWaitQueryAsync(authRequest).ConfigureAwait(false); - result.Error!.Message = "Authentication failed: " + result.Error.Message; - return new CallResult(result.Error)!; + if (!result) + { + _logger.Log(LogLevel.Warning, $"[Sckt {socket.SocketId}] authentication failed"); + if (socket.Connected) + await socket.CloseAsync().ConfigureAwait(false); + + result.Error!.Message = "Authentication failed: " + result.Error.Message; + return new CallResult(result.Error)!; + } } - _logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} authenticated"); + _logger.Log(LogLevel.Debug, $"[Sckt {socket.SocketId}] authenticated"); socket.Authenticated = true; return new CallResult(true); } @@ -363,7 +376,7 @@ namespace CryptoExchange.Net /// Should return the request which can be used to authenticate a socket connection /// /// - protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException(); + protected internal virtual Query? GetAuthenticationRequest() => throw new NotImplementedException(); /// /// Adds a system subscription. Used for example to reply to ping requests @@ -443,7 +456,6 @@ namespace CryptoExchange.Net var socket = CreateSocket(connectionAddress.Data!); var socketConnection = new SocketConnection(_logger, this, socket, address); socketConnection.UnhandledMessage += HandleUnhandledMessage; - socketConnection.UnparsedMessage += HandleUnparsedMessage; foreach (var systemSubscription in systemSubscriptions) socketConnection.AddSubscription(systemSubscription); @@ -455,15 +467,7 @@ namespace CryptoExchange.Net /// Process an unhandled message /// /// The message that wasn't processed - protected virtual void HandleUnhandledMessage(SocketMessage message) - { - } - - /// - /// Process an unparsed message - /// - /// The message that wasn't parsed - protected virtual void HandleUnparsedMessage(byte[] message) + protected virtual void HandleUnhandledMessage(IMessageAccessor message) { } @@ -507,7 +511,7 @@ namespace CryptoExchange.Net protected virtual IWebsocket CreateSocket(string address) { var socket = SocketFactory.CreateWebsocket(_logger, GetWebSocketParameters(address)); - _logger.Log(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address); + _logger.Log(LogLevel.Debug, $"[Sckt {socket.Id}] created for " + address); return socket; } @@ -547,7 +551,7 @@ namespace CryptoExchange.Net if (query == null) continue; - _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}"); + _logger.Log(LogLevel.Trace, $"[Sckt {socketConnection.SocketId}] sending periodic {identifier}"); try { @@ -556,7 +560,7 @@ namespace CryptoExchange.Net } catch (Exception ex) { - _logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} Periodic send {identifier} failed: " + ex.ToLogString()); + _logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] Periodic send {identifier} failed: " + ex.ToLogString()); } } } @@ -585,7 +589,7 @@ namespace CryptoExchange.Net if (subscription == null || connection == null) return false; - _logger.Log(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId); + _logger.Log(LogLevel.Information, $"[Sckt {connection.SocketId}] unsubscribing subscription " + subscriptionId); await connection.CloseAsync(subscription).ConfigureAwait(false); return true; } @@ -600,7 +604,7 @@ namespace CryptoExchange.Net if (subscription == null) throw new ArgumentNullException(nameof(subscription)); - _logger.Log(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id); + _logger.Log(LogLevel.Information, $"[Sckt {subscription.SocketId}] Unsubscribing subscription " + subscription.Id); await subscription.CloseAsync().ConfigureAwait(false); } @@ -692,9 +696,9 @@ namespace CryptoExchange.Net /// /// Get the listener identifier for the message /// - /// + /// /// - public abstract string GetListenerIdentifier(SocketMessage message); + public abstract string? GetListenerIdentifier(IMessageAccessor messageAccessor); /// /// Preprocess a stream message diff --git a/CryptoExchange.Net/Converters/EnumConverter.cs b/CryptoExchange.Net/Converters/EnumConverter.cs index eb9c664..5c65abc 100644 --- a/CryptoExchange.Net/Converters/EnumConverter.cs +++ b/CryptoExchange.Net/Converters/EnumConverter.cs @@ -47,7 +47,7 @@ namespace CryptoExchange.Net.Converters mapping = AddMapping(enumType); var stringValue = reader.Value?.ToString(); - if (stringValue == null) + if (stringValue == null || stringValue == "") { // Received null value var emptyResult = GetDefaultValue(objectType, enumType); diff --git a/CryptoExchange.Net/ExchangeHelpers.cs b/CryptoExchange.Net/ExchangeHelpers.cs index 988b874..20c759f 100644 --- a/CryptoExchange.Net/ExchangeHelpers.cs +++ b/CryptoExchange.Net/ExchangeHelpers.cs @@ -144,6 +144,16 @@ namespace CryptoExchange.Net } } + /// + /// Return the last unique id that was generated + /// + /// + public static int LastId() + { + lock (_idLock) + return _lastId; + } + /// /// Generate a random string of specified length /// diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs index 39e13e2..6f6115a 100644 --- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Sockets; +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; using System; using System.Collections.Generic; using System.Threading.Tasks; @@ -30,8 +31,15 @@ namespace CryptoExchange.Net.Interfaces /// /// Get the type the message should be deserialized to /// - /// + /// /// - Type? GetMessageType(SocketMessage message); + Type? GetMessageType(IMessageAccessor messageAccessor); + /// + /// Deserialize a message int oobject of type + /// + /// + /// + /// + object Deserialize(IMessageAccessor accessor, Type type); } } diff --git a/CryptoExchange.Net/Interfaces/ISocketApiClient.cs b/CryptoExchange.Net/Interfaces/ISocketApiClient.cs index f567d67..4d45e10 100644 --- a/CryptoExchange.Net/Interfaces/ISocketApiClient.cs +++ b/CryptoExchange.Net/Interfaces/ISocketApiClient.cs @@ -24,7 +24,7 @@ namespace CryptoExchange.Net.Interfaces /// /// The factory for creating sockets. Used for unit testing /// - IWebsocketFactory SocketFactory { get; } + IWebsocketFactory SocketFactory { get; set; } /// /// Current client options /// diff --git a/CryptoExchange.Net/Objects/CallResult.cs b/CryptoExchange.Net/Objects/CallResult.cs index c72f470..be9b783 100644 --- a/CryptoExchange.Net/Objects/CallResult.cs +++ b/CryptoExchange.Net/Objects/CallResult.cs @@ -70,7 +70,7 @@ namespace CryptoExchange.Net.Objects /// /// #pragma warning disable 8618 - protected CallResult([AllowNull]T data, string? originalData, Error? error): base(error) + public CallResult([AllowNull]T data, string? originalData, Error? error): base(error) #pragma warning restore 8618 { OriginalData = originalData; @@ -91,6 +91,13 @@ namespace CryptoExchange.Net.Objects /// The erro rto return public CallResult(Error error) : this(default, null, error) { } + /// + /// Create a new error result + /// + /// The error to return + /// The original response data + public CallResult(Error error, string? originalData) : this(default, originalData, error) { } + /// /// Overwrite bool check so we can use if(callResult) instead of if(callResult.Success) /// diff --git a/CryptoExchange.Net/Objects/Error.cs b/CryptoExchange.Net/Objects/Error.cs index 580a014..74232dd 100644 --- a/CryptoExchange.Net/Objects/Error.cs +++ b/CryptoExchange.Net/Objects/Error.cs @@ -278,7 +278,7 @@ namespace CryptoExchange.Net.Objects /// /// /// - protected CancellationRequestedError(int? code, string message, object? data): base(code, message, data) { } + public CancellationRequestedError(int? code, string message, object? data): base(code, message, data) { } } /// diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 6053a8c..d2936f1 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -183,7 +183,7 @@ namespace CryptoExchange.Net.Sockets private async Task ConnectInternalAsync() { - _logger.Log(LogLevel.Debug, $"Socket {Id} connecting"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] connecting"); try { using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10)); @@ -191,11 +191,11 @@ namespace CryptoExchange.Net.Sockets } catch (Exception e) { - _logger.Log(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString()); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] connection failed: " + e.ToLogString()); return false; } - _logger.Log(LogLevel.Debug, $"Socket {Id} connected to {Uri}"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] connected to {Uri}"); return true; } @@ -204,13 +204,13 @@ namespace CryptoExchange.Net.Sockets { while (!_stopRequested) { - _logger.Log(LogLevel.Debug, $"Socket {Id} starting processing tasks"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] starting processing tasks"); _processState = ProcessState.Processing; var sendTask = SendLoopAsync(); var receiveTask = ReceiveLoopAsync(); var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask; await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); - _logger.Log(LogLevel.Debug, $"Socket {Id} processing tasks finished"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] processing tasks finished"); _processState = ProcessState.WaitingForClose; while (_closeTask == null) @@ -238,14 +238,14 @@ namespace CryptoExchange.Net.Sockets while (!_stopRequested) { - _logger.Log(LogLevel.Debug, $"Socket {Id} attempting to reconnect"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] attempting to reconnect"); var task = GetReconnectionUrl?.Invoke(); if (task != null) { var reconnectUri = await task.ConfigureAwait(false); if (reconnectUri != null && Parameters.Uri != reconnectUri) { - _logger.Log(LogLevel.Debug, $"Socket {Id} reconnect URI set to {reconnectUri}"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] reconnect URI set to {reconnectUri}"); Parameters.Uri = reconnectUri; } } @@ -278,7 +278,7 @@ namespace CryptoExchange.Net.Sockets return; var bytes = Parameters.Encoding.GetBytes(data); - _logger.Log(LogLevel.Trace, $"Socket {Id} msg {id} - Adding {bytes.Length} bytes to send buffer"); + _logger.Log(LogLevel.Trace, $"[Sckt {Id}] msg {id} - Adding {bytes.Length} bytes to send buffer"); _sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes }); _sendEvent.Set(); } @@ -289,7 +289,7 @@ namespace CryptoExchange.Net.Sockets if (_processState != ProcessState.Processing && IsOpen) return; - _logger.Log(LogLevel.Debug, $"Socket {Id} reconnect requested"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] reconnect requested"); _closeTask = CloseInternalAsync(); await _closeTask.ConfigureAwait(false); } @@ -304,18 +304,18 @@ namespace CryptoExchange.Net.Sockets { if (_closeTask?.IsCompleted == false) { - _logger.Log(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] CloseAsync() waiting for existing close task"); await _closeTask.ConfigureAwait(false); return; } if (!IsOpen) { - _logger.Log(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] CloseAsync() socket not open"); return; } - _logger.Log(LogLevel.Debug, $"Socket {Id} closing"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] closing"); _closeTask = CloseInternalAsync(); } finally @@ -327,7 +327,7 @@ namespace CryptoExchange.Net.Sockets if(_processTask != null) await _processTask.ConfigureAwait(false); await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false); - _logger.Log(LogLevel.Debug, $"Socket {Id} closed"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] closed"); } /// @@ -379,11 +379,11 @@ namespace CryptoExchange.Net.Sockets if (_disposed) return; - _logger.Log(LogLevel.Debug, $"Socket {Id} disposing"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] disposing"); _disposed = true; _socket.Dispose(); _ctsSource.Dispose(); - _logger.Log(LogLevel.Trace, $"Socket {Id} disposed"); + _logger.Log(LogLevel.Trace, $"[Sckt {Id}] disposed"); } /// @@ -415,7 +415,7 @@ namespace CryptoExchange.Net.Sockets if (limitResult.Success) { if (limitResult.Data > 0) - _logger.Log(LogLevel.Debug, $"Socket {Id} msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit"); } } } @@ -424,7 +424,7 @@ namespace CryptoExchange.Net.Sockets { await _socket.SendAsync(new ArraySegment(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false); await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false); - _logger.Log(LogLevel.Trace, $"Socket {Id} msg {data.Id} - sent {data.Bytes.Length} bytes"); + _logger.Log(LogLevel.Trace, $"[Sckt {Id}] msg {data.Id} - sent {data.Bytes.Length} bytes"); } catch (OperationCanceledException) { @@ -447,13 +447,13 @@ namespace CryptoExchange.Net.Sockets // Because this is running in a separate task and not awaited until the socket gets closed // any exception here will crash the send processing, but do so silently unless the socket get's stopped. // Make sure we at least let the owner know there was an error - _logger.Log(LogLevel.Warning, $"Socket {Id} send loop stopped with exception"); + _logger.Log(LogLevel.Warning, $"[Sckt {Id}] send loop stopped with exception"); await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); throw; } finally { - _logger.Log(LogLevel.Debug, $"Socket {Id} send loop finished"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] send loop finished"); } } @@ -501,7 +501,7 @@ namespace CryptoExchange.Net.Sockets if (receiveResult.MessageType == WebSocketMessageType.Close) { // Connection closed unexpectedly - _logger.Log(LogLevel.Debug, $"Socket {Id} received `Close` message"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] received `Close` message"); if (_closeTask?.IsCompleted != false) _closeTask = CloseInternalAsync(); break; @@ -512,7 +512,7 @@ namespace CryptoExchange.Net.Sockets // We received data, but it is not complete, write it to a memory stream for reassembling multiPartMessage = true; memoryStream ??= new MemoryStream(); - _logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); + _logger.Log(LogLevel.Trace, $"[Sckt {Id}] received {receiveResult.Count} bytes in partial message"); await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); } else @@ -520,13 +520,13 @@ namespace CryptoExchange.Net.Sockets if (!multiPartMessage) { // Received a complete message and it's not multi part - _logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); + _logger.Log(LogLevel.Trace, $"[Sckt {Id}] received {receiveResult.Count} bytes in single message"); await ProcessData(receiveResult.MessageType, new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); } else { // Received the end of a multipart message, write to memory stream for reassembling - _logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); + _logger.Log(LogLevel.Trace, $"[Sckt {Id}] received {receiveResult.Count} bytes in partial message"); await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); } break; @@ -554,13 +554,13 @@ namespace CryptoExchange.Net.Sockets if (receiveResult?.EndOfMessage == true) { // Reassemble complete message from memory stream - _logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); + _logger.Log(LogLevel.Trace, $"[Sckt {Id}] reassembled message of {memoryStream!.Length} bytes"); await ProcessData(receiveResult.MessageType, memoryStream).ConfigureAwait(false); memoryStream.Dispose(); } else { - _logger.Log(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes"); + _logger.Log(LogLevel.Trace, $"[Sckt {Id}] discarding incomplete message of {memoryStream!.Length} bytes"); } } } @@ -570,13 +570,13 @@ namespace CryptoExchange.Net.Sockets // Because this is running in a separate task and not awaited until the socket gets closed // any exception here will crash the receive processing, but do so silently unless the socket gets stopped. // Make sure we at least let the owner know there was an error - _logger.Log(LogLevel.Warning, $"Socket {Id} receive loop stopped with exception"); + _logger.Log(LogLevel.Warning, $"[Sckt {Id}] receive loop stopped with exception"); await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); throw; } finally { - _logger.Log(LogLevel.Debug, $"Socket {Id} receive loop finished"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] receive loop finished"); } } @@ -601,7 +601,7 @@ namespace CryptoExchange.Net.Sockets /// protected async Task CheckTimeoutAsync() { - _logger.Log(LogLevel.Debug, $"Socket {Id} starting task checking for no data received for {Parameters.Timeout}"); + _logger.Log(LogLevel.Debug, $"[Sckt {Id}] starting task checking for no data received for {Parameters.Timeout}"); LastActionTime = DateTime.UtcNow; try { @@ -612,7 +612,7 @@ namespace CryptoExchange.Net.Sockets if (DateTime.UtcNow - LastActionTime > Parameters.Timeout) { - _logger.Log(LogLevel.Warning, $"Socket {Id} no data received for {Parameters.Timeout}, reconnecting socket"); + _logger.Log(LogLevel.Warning, $"[Sckt {Id}] no data received for {Parameters.Timeout}, reconnecting socket"); _ = ReconnectAsync().ConfigureAwait(false); return; } diff --git a/CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageData.cs b/CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageData.cs index b4e37df..9c76270 100644 --- a/CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageData.cs +++ b/CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageData.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.IO; namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces { @@ -12,6 +14,15 @@ namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces /// bool IsJson { get; } /// + /// The underlying data object + /// + object? Underlying { get; } + /// + /// Load a stream message + /// + /// + void Load(Stream stream); + /// /// Get the type of node /// /// @@ -30,10 +41,18 @@ namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces /// T? GetValue(MessagePath path); /// + /// Get the values of an array + /// + /// + /// + /// + List? GetValues(MessagePath path); + /// /// Deserialize the message into this type /// /// + /// /// - object Deserialize(Type type); + object Deserialize(Type type, MessagePath? path = null); } } diff --git a/CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageSerializer.cs b/CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageSerializer.cs new file mode 100644 index 0000000..8df3e5e --- /dev/null +++ b/CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageSerializer.cs @@ -0,0 +1,15 @@ +namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces +{ + /// + /// Serializer interface + /// + public interface IMessageSerializer + { + /// + /// Serialize an object to a string + /// + /// + /// + string Serialize(object message); + } +} diff --git a/CryptoExchange.Net/Sockets/MessageParsing/JsonNetMessageData.cs b/CryptoExchange.Net/Sockets/MessageParsing/JsonNetMessageData.cs index c9da8e9..e3606d7 100644 --- a/CryptoExchange.Net/Sockets/MessageParsing/JsonNetMessageData.cs +++ b/CryptoExchange.Net/Sockets/MessageParsing/JsonNetMessageData.cs @@ -3,7 +3,9 @@ using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; +using System.Collections.Generic; using System.IO; +using System.Linq; using System.Text; namespace CryptoExchange.Net.Sockets.MessageParsing @@ -11,20 +13,20 @@ namespace CryptoExchange.Net.Sockets.MessageParsing /// /// Json.Net message accessor /// - public class JsonNetMessageData : IMessageAccessor + public class JsonNetMessageAccessor : IMessageAccessor { - private readonly JToken? _token; - private readonly Stream _stream; + private JToken? _token; + private Stream? _stream; private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters); /// public bool IsJson { get; private set; } - /// - /// ctor - /// - /// - public JsonNetMessageData(Stream stream) + /// + public object? Underlying => _token; + + /// + public void Load(Stream stream) { _stream = stream; using var reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); @@ -43,7 +45,7 @@ namespace CryptoExchange.Net.Sockets.MessageParsing } /// - public object Deserialize(Type type) + public object Deserialize(Type type, MessagePath? path = null) { if (!IsJson) { @@ -51,7 +53,11 @@ namespace CryptoExchange.Net.Sockets.MessageParsing return sr.ReadToEnd(); } - return _token!.ToObject(type, _serializer)!; + var source = _token; + if (path != null) + source = GetPathNode(path.Value); + + return source!.ToObject(type, _serializer)!; } /// @@ -98,27 +104,48 @@ namespace CryptoExchange.Net.Sockets.MessageParsing return value!.Value(); } + /// + public List? GetValues(MessagePath path) + { + var value = GetPathNode(path); + if (value == null) + return default; + + if (value.Type == JTokenType.Object) + return default; + + return value!.Values().ToList(); + } + private JToken? GetPathNode(MessagePath path) { var currentToken = _token; foreach (var node in path) { - if (node.Type) + if (node.Type == 0) { // Int value - var val = (int)node.Value; + var val = (int)node.Value!; if (currentToken!.Type != JTokenType.Array || ((JArray)currentToken).Count <= val) return null; currentToken = currentToken[val]; } - else + else if (node.Type == 1) { // String value if (currentToken!.Type != JTokenType.Object) return null; - currentToken = currentToken[(string)node.Value]; + currentToken = currentToken[(string)node.Value!]; + } + else + { + // Property name + if (currentToken!.Type != JTokenType.Object) + return null; + + currentToken = (currentToken.First as JProperty)?.Name; } if (currentToken == null) diff --git a/CryptoExchange.Net/Sockets/MessageParsing/JsonNetSerializer.cs b/CryptoExchange.Net/Sockets/MessageParsing/JsonNetSerializer.cs new file mode 100644 index 0000000..f501cd8 --- /dev/null +++ b/CryptoExchange.Net/Sockets/MessageParsing/JsonNetSerializer.cs @@ -0,0 +1,12 @@ +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; +using Newtonsoft.Json; + +namespace CryptoExchange.Net.Sockets.MessageParsing +{ + /// + public class JsonNetSerializer : IMessageSerializer + { + /// + public string Serialize(object message) => JsonConvert.SerializeObject(message, Formatting.None); + } +} diff --git a/CryptoExchange.Net/Sockets/MessageParsing/MessageNode.cs b/CryptoExchange.Net/Sockets/MessageParsing/MessageNode.cs index 36e3c9a..ecbc00a 100644 --- a/CryptoExchange.Net/Sockets/MessageParsing/MessageNode.cs +++ b/CryptoExchange.Net/Sockets/MessageParsing/MessageNode.cs @@ -8,13 +8,13 @@ /// /// Value /// - public object Value { get; } + public object? Value { get; } /// - /// Type (true = int, false = string) + /// Type (0 = int, 1 = string, 2 = prop name) /// - public bool Type { get; } + public int Type { get; } - private NodeAccessor(object value, bool type) + private NodeAccessor(object? value, int type) { Value = value; Type = type; @@ -25,14 +25,20 @@ /// /// /// - public static NodeAccessor Int(int value) { return new NodeAccessor(value, true); } + public static NodeAccessor Int(int value) { return new NodeAccessor(value, 0); } /// /// Create a string node accessor /// /// /// - public static NodeAccessor String(string value) { return new NodeAccessor(value, false); } + public static NodeAccessor String(string value) { return new NodeAccessor(value, 1); } + + /// + /// Create a property name node accessor + /// + /// + public static NodeAccessor PropertyName() { return new NodeAccessor(null, 2); } } } diff --git a/CryptoExchange.Net/Sockets/MessageParsing/MessagePathExtension.cs b/CryptoExchange.Net/Sockets/MessageParsing/MessagePathExtension.cs index 3a93748..771a992 100644 --- a/CryptoExchange.Net/Sockets/MessageParsing/MessagePathExtension.cs +++ b/CryptoExchange.Net/Sockets/MessageParsing/MessagePathExtension.cs @@ -17,6 +17,17 @@ return path; } + /// + /// Add a property name node accessor + /// + /// + /// + public static MessagePath PropertyName(this MessagePath path) + { + path.Add(NodeAccessor.PropertyName()); + return path; + } + /// /// Add a int node accessor /// diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index 3a5cbdf..045b2e3 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -1,8 +1,10 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -41,7 +43,7 @@ namespace CryptoExchange.Net.Sockets /// /// Action to execute when query is finished /// - public Action? OnFinished { get; set; } + public AsyncResetEvent? ContinueAwaiter { get; set; } /// /// Strings to match this query to a received message @@ -68,7 +70,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract Type GetMessageType(SocketMessage message); + public abstract Type? GetMessageType(IMessageAccessor message); /// /// Wait event for response @@ -113,6 +115,9 @@ namespace CryptoExchange.Net.Sockets /// public async Task WaitAsync(TimeSpan timeout) => await _event.WaitAsync(timeout).ConfigureAwait(false); + /// + public virtual object Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type); + /// /// Mark request as timeout /// @@ -141,7 +146,7 @@ namespace CryptoExchange.Net.Sockets public abstract class Query : Query { /// - public override Type GetMessageType(SocketMessage message) => typeof(TResponse); + public override Type? GetMessageType(IMessageAccessor message) => typeof(TResponse); /// /// The typed call result @@ -164,8 +169,8 @@ namespace CryptoExchange.Net.Sockets Completed = true; Response = message.Data; Result = await HandleMessageAsync(connection, message.As((TResponse)message.Data)).ConfigureAwait(false); - OnFinished?.Invoke(); _event.Set(); + await (ContinueAwaiter?.WaitAsync() ?? Task.CompletedTask).ConfigureAwait(false); return Result; } @@ -175,7 +180,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public virtual Task> HandleMessageAsync(SocketConnection connection, DataEvent message) => Task.FromResult(new CallResult(message.Data)); + public virtual Task> HandleMessageAsync(SocketConnection connection, DataEvent message) => Task.FromResult(new CallResult(message.Data, message.OriginalData, null)); /// public override void Timeout() @@ -184,8 +189,8 @@ namespace CryptoExchange.Net.Sockets return; Completed = true; - Result = new CallResult(new CancellationRequestedError()); - OnFinished?.Invoke(); + Result = new CallResult(new CancellationRequestedError(null, "Query timeout", null)); + ContinueAwaiter?.Set(); _event.Set(); } @@ -194,7 +199,7 @@ namespace CryptoExchange.Net.Sockets { Result = new CallResult(new ServerError(error)); Completed = true; - OnFinished?.Invoke(); + ContinueAwaiter?.Set(); _event.Set(); } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 660e377..69d8229 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using Newtonsoft.Json; using Microsoft.Extensions.Logging; using CryptoExchange.Net.Objects; using System.Net.WebSockets; @@ -12,6 +11,7 @@ using CryptoExchange.Net.Objects.Sockets; using System.Text; using System.Diagnostics; using CryptoExchange.Net.Sockets.MessageParsing; +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; namespace CryptoExchange.Net.Sockets { @@ -48,12 +48,7 @@ namespace CryptoExchange.Net.Sockets /// /// Unhandled message event /// - public event Action? UnhandledMessage; - - /// - /// Unparsed message event - /// - public event Action? UnparsedMessage; // TODO not linked up + public event Action? UnhandledMessage; /// /// The amount of subscriptions on this connection @@ -135,7 +130,7 @@ namespace CryptoExchange.Net.Sockets if (_pausedActivity != value) { _pausedActivity = value; - _logger.Log(LogLevel.Information, $"Socket {SocketId} paused activity: " + value); + _logger.Log(LogLevel.Information, $"[Sckt {SocketId}] paused activity: " + value); if(_pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke()); else _ = Task.Run(() => ActivityUnpaused?.Invoke()); } @@ -155,7 +150,7 @@ namespace CryptoExchange.Net.Sockets var oldStatus = _status; _status = value; - _logger.Log(LogLevel.Debug, $"Socket {SocketId} status changed from {oldStatus} to {_status}"); + _logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] status changed from {oldStatus} to {_status}"); } } @@ -165,6 +160,9 @@ namespace CryptoExchange.Net.Sockets private readonly ILogger _logger; private SocketStatus _status; + private IMessageSerializer _serializer; + private IMessageAccessor _accessor; + /// /// The underlying websocket /// @@ -196,6 +194,9 @@ namespace CryptoExchange.Net.Sockets _listenersLock = new object(); _listeners = new List(); + + _serializer = new JsonNetSerializer(); + _accessor = new JsonNetMessageAccessor(); } /// @@ -221,7 +222,7 @@ namespace CryptoExchange.Net.Sockets foreach (var subscription in _listeners.OfType()) subscription.Confirmed = false; - foreach (var query in _listeners.OfType()) + foreach (var query in _listeners.OfType().ToList()) { query.Fail("Connection interupted"); _listeners.Remove(query); @@ -246,7 +247,7 @@ namespace CryptoExchange.Net.Sockets foreach (var subscription in _listeners.OfType()) subscription.Confirmed = false; - foreach (var query in _listeners.OfType()) + foreach (var query in _listeners.OfType().ToList()) { query.Fail("Connection interupted"); _listeners.Remove(query); @@ -275,7 +276,7 @@ namespace CryptoExchange.Net.Sockets lock (_listenersLock) { - foreach (var query in _listeners.OfType()) + foreach (var query in _listeners.OfType().ToList()) { query.Fail("Connection interupted"); _listeners.Remove(query); @@ -288,7 +289,7 @@ namespace CryptoExchange.Net.Sockets var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false); if (!reconnectSuccessful) { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again"); + _logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again"); _ = _socket.ReconnectAsync().ConfigureAwait(false); } else @@ -312,9 +313,9 @@ namespace CryptoExchange.Net.Sockets protected virtual Task HandleErrorAsync(Exception e) { if (e is WebSocketException wse) - _logger.Log(LogLevel.Warning, $"Socket {SocketId} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString()); + _logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString()); else - _logger.Log(LogLevel.Warning, $"Socket {SocketId} error: " + e.ToLogString()); + _logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] error: " + e.ToLogString()); return Task.CompletedTask; } @@ -333,7 +334,7 @@ namespace CryptoExchange.Net.Sockets if (query == null) { - _logger.Log(LogLevel.Debug, $"Socket {SocketId} msg {requestId} - message sent, but not pending"); + _logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] msg {requestId} - message sent, but not pending"); return Task.CompletedTask; } @@ -350,77 +351,106 @@ namespace CryptoExchange.Net.Sockets protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream) { var sw = Stopwatch.StartNew(); + var receiveTime = DateTime.UtcNow; + string? originalData = null; // 1. Decrypt/Preprocess if necessary stream = ApiClient.PreprocessStreamMessage(type, stream); // 2. Read data into accessor - var messageData = new JsonNetMessageData(stream); // TODO if we let the implementation create this we can switch per implementation - var message = new SocketMessage(DateTime.UtcNow, messageData); + _accessor.Load(stream); if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData) { stream.Position = 0; using var textReader = new StreamReader(stream, Encoding.UTF8, false, 1024, true); - message.RawData = textReader.ReadToEnd(); + originalData = textReader.ReadToEnd(); - _logger.LogTrace("Socket {SocketId} received {Data}", SocketId, message.RawData); + _logger.LogTrace("[Sckt {SocketId}] received {Data}", SocketId, originalData); } // 3. Determine the subscription interested in the messsage - var listenId = ApiClient.GetListenerIdentifier(message); + var listenId = ApiClient.GetListenerIdentifier(_accessor); + if (listenId == null) + { + if (!ApiClient.UnhandledMessageExpected) + _logger.LogWarning("[Sckt {SocketId}] failed to evaluate message", SocketId); + UnhandledMessage?.Invoke(_accessor); + stream.Dispose(); + return; + } + + // 4. Get the listeners interested in this message List processors; lock(_listenersLock) processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList(); if (!processors.Any()) { - _logger.LogWarning("Socket {SocketId} received message not matched to any processor", SocketId); - UnhandledMessage?.Invoke(message); + if (!ApiClient.UnhandledMessageExpected) + { + _logger.LogWarning("[Sckt {SocketId}] received message not matched to any processor. ListenId: {ListenId}", SocketId, listenId); + UnhandledMessage?.Invoke(_accessor); + } + + stream.Dispose(); return; } - _logger.LogTrace("Socket {SocketId} {Count} processors matched to message with listener identifier {ListenerId}", SocketId, processors.Count, listenId); + _logger.LogTrace("[Sckt {SocketId}] {Count} processors matched to message with listener identifier {ListenerId}", SocketId, processors.Count, listenId); var totalUserTime = 0; + Dictionary? desCache = null; + if (processors.Count > 1) + { + // Only instantiate a cache if there are multiple processors + desCache = new Dictionary(); + } + foreach (var processor in processors) { - // 4. Determine the type to deserialize to - var messageType = processor.GetMessageType(message); + // 5. Determine the type to deserialize to for this processor + var messageType = processor.GetMessageType(_accessor); if (messageType == null) { - _logger.LogWarning("Socket {SocketId} received message not recognized by handler {Id}", SocketId, processor.Id); + _logger.LogWarning("[Sckt {SocketId}] received message not recognized by handler {Id}", SocketId, processor.Id); continue; } - // 5. Deserialize the message - object deserialized; - try + // 6. Deserialize the message + object? deserialized = null; + desCache?.TryGetValue(messageType, out deserialized); + + if (deserialized == null) { - deserialized = message.Deserialize(messageType); - } - catch (Exception ex) - { - _logger.LogWarning("Socket {SocketId} failed to deserialize message to type {Type}: {Exception}", SocketId, messageType.Name, ex.ToLogString()); - continue; + try + { + deserialized = processor.Deserialize(_accessor, messageType); + desCache?.Add(messageType, deserialized); + } + catch (Exception ex) + { + _logger.LogWarning("[Sckt {SocketId}] failed to deserialize message to type {Type}: {Exception}", SocketId, messageType.Name, ex.ToLogString()); + continue; + } } - // 6. Hand of the message to the subscription + // 7. Hand of the message to the subscription try { var innerSw = Stopwatch.StartNew(); - await processor.HandleAsync(this, new DataEvent(deserialized, null, message.RawData, message.ReceiveTime, null)).ConfigureAwait(false); + await processor.HandleAsync(this, new DataEvent(deserialized, null, originalData, receiveTime, null)).ConfigureAwait(false); totalUserTime += (int)innerSw.ElapsedMilliseconds; } catch (Exception ex) { - _logger.LogWarning("Socket {SocketId} user message processing failed: {Exception}", SocketId, ex.ToLogString()); + _logger.LogWarning("[Sckt {SocketId}] user message processing failed: {Exception}", SocketId, ex.ToLogString()); if (processor is Subscription subscription) subscription.InvokeExceptionHandler(ex); } } stream.Dispose(); - _logger.LogTrace($"Socket {SocketId} message processed in {(int)sw.ElapsedMilliseconds}ms ({totalUserTime - sw.ElapsedMilliseconds}ms parsing)"); + _logger.LogTrace($"[Sckt {SocketId}] message processed in {(int)sw.ElapsedMilliseconds}ms ({sw.ElapsedMilliseconds - totalUserTime}ms parsing)"); } /// @@ -474,27 +504,36 @@ namespace CryptoExchange.Net.Sockets /// public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false) { - lock (_listenersLock) - { - if (!_listeners.Contains(subscription)) - return; - } - subscription.Closed = true; if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed) return; - _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing subscription {subscription.Id}"); + _logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] closing subscription {subscription.Id}"); if (subscription.CancellationTokenRegistration.HasValue) subscription.CancellationTokenRegistration.Value.Dispose(); - if ((unsubEvenIfNotConfirmed || subscription.Confirmed) && _socket.IsOpen) - await UnsubscribeAsync(subscription).ConfigureAwait(false); + bool anyDuplicateSubscription; + lock (_listenersLock) + anyDuplicateSubscription = _listeners.OfType().Any(x => x != subscription && x.ListenerIdentifiers.All(l => subscription.ListenerIdentifiers.Contains(l))); + + if (!anyDuplicateSubscription) + { + bool needUnsub; + lock (_listenersLock) + needUnsub = _listeners.Contains(subscription); + + if (needUnsub && (unsubEvenIfNotConfirmed || subscription.Confirmed) && _socket.IsOpen) + await UnsubscribeAsync(subscription).ConfigureAwait(false); + } + else + { + _logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] not unsubscribing subscription as there is still a duplicate subscription running"); + } if (Status == SocketStatus.Closing) { - _logger.Log(LogLevel.Debug, $"Socket {SocketId} already closing"); + _logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] already closing"); return; } @@ -508,7 +547,7 @@ namespace CryptoExchange.Net.Sockets if (shouldCloseConnection) { - _logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more subscriptions"); + _logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] closing as there are no more subscriptions"); await CloseAsync().ConfigureAwait(false); } @@ -544,7 +583,7 @@ namespace CryptoExchange.Net.Sockets _listeners.Add(subscription); if (subscription.UserSubscription) - _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {UserSubscriptionCount}"); + _logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] adding new subscription with id {subscription.Id}, total subscriptions on connection: {UserSubscriptionCount}"); return true; } @@ -572,11 +611,11 @@ namespace CryptoExchange.Net.Sockets /// Send a query request and wait for an answer /// /// Query to send - /// Action to run when query finishes + /// Wait event for when the socket message handler can continue /// - public virtual async Task SendAndWaitQueryAsync(Query query, Action? onFinished = null) + public virtual async Task SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null) { - await SendAndWaitIntAsync(query, onFinished).ConfigureAwait(false); + await SendAndWaitIntAsync(query, continueEvent).ConfigureAwait(false); return query.Result ?? new CallResult(new ServerError("Timeout")); } @@ -585,42 +624,52 @@ namespace CryptoExchange.Net.Sockets /// /// Query response type /// Query to send - /// Action to run when query finishes + /// Wait event for when the socket message handler can continue /// - public virtual async Task> SendAndWaitQueryAsync(Query query, Action? onFinished = null) + public virtual async Task> SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null) { - await SendAndWaitIntAsync(query, onFinished).ConfigureAwait(false); + await SendAndWaitIntAsync(query, continueEvent).ConfigureAwait(false); return query.TypedResult ?? new CallResult(new ServerError("Timeout")); } - private async Task SendAndWaitIntAsync(Query query, Action? onFinished) + private async Task SendAndWaitIntAsync(Query query, AsyncResetEvent? continueEvent) { lock(_listenersLock) _listeners.Add(query); + query.ContinueAwaiter = continueEvent; var sendOk = Send(query.Id, query.Request, query.Weight); if (!sendOk) { query.Fail("Failed to send"); + lock (_listenersLock) + _listeners.Remove(query); return; } - query.OnFinished = onFinished; - while (true) + try { - if (!_socket.IsOpen) + while (true) { - query.Fail("Socket not open"); - return; + if (!_socket.IsOpen) + { + query.Fail("Socket not open"); + return; + } + + if (query.Completed) + return; + + await query.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); + + if (query.Completed) + return; } - - if (query.Completed) - return; - - await query.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); - - if (query.Completed) - return; + } + finally + { + lock (_listenersLock) + _listeners.Remove(query); } } @@ -630,14 +679,13 @@ namespace CryptoExchange.Net.Sockets /// The type of the object to send /// The request id /// The object to send - /// How null values should be serialized /// The weight of the message - public virtual bool Send(int requestId, T obj, int weight, NullValueHandling nullValueHandling = NullValueHandling.Ignore) + public virtual bool Send(int requestId, T obj, int weight) { if(obj is string str) return Send(requestId, str, weight); else - return Send(requestId, JsonConvert.SerializeObject(obj, Formatting.None, new JsonSerializerSettings { NullValueHandling = nullValueHandling }), weight); + return Send(requestId, _serializer.Serialize(obj!), weight); } /// @@ -648,7 +696,7 @@ namespace CryptoExchange.Net.Sockets /// The id of the request public virtual bool Send(int requestId, string data, int weight) { - _logger.Log(LogLevel.Trace, $"Socket {SocketId} msg {requestId} - sending messsage: {data}"); + _logger.Log(LogLevel.Trace, $"[Sckt {SocketId}] msg {requestId} - sending messsage: {data}"); try { _socket.Send(requestId, data, weight); @@ -671,7 +719,7 @@ namespace CryptoExchange.Net.Sockets if (!anySubscriptions) { // No need to resubscribe anything - _logger.Log(LogLevel.Debug, $"Socket {SocketId} nothing to resubscribe, closing connection"); + _logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] nothing to resubscribe, closing connection"); _ = _socket.CloseAsync(); return new CallResult(true); } @@ -685,12 +733,12 @@ namespace CryptoExchange.Net.Sockets var authResult = await ApiClient.AuthenticateSocketAsync(this).ConfigureAwait(false); if (!authResult) { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} authentication failed on reconnected socket. Disconnecting and reconnecting."); + _logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] authentication failed on reconnected socket. Disconnecting and reconnecting."); return authResult; } Authenticated = true; - _logger.Log(LogLevel.Debug, $"Socket {SocketId} authentication succeeded on reconnected socket."); + _logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] authentication succeeded on reconnected socket."); } // Get a list of all subscriptions on the socket @@ -704,7 +752,7 @@ namespace CryptoExchange.Net.Sockets var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false); if (!result) { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} failed request revitalization: " + result.Error); + _logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] failed request revitalization: " + result.Error); return result.As(false); } } @@ -721,10 +769,13 @@ namespace CryptoExchange.Net.Sockets var subQuery = subscription.GetSubQuery(this); if (subQuery == null) continue; - - taskList.Add(SendAndWaitQueryAsync(subQuery, () => - { + + var waitEvent = new AsyncResetEvent(false); + taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) => + { subscription.HandleSubQueryResponse(subQuery.Response!); + waitEvent.Set(); + return r.Result; })); } @@ -739,7 +790,7 @@ namespace CryptoExchange.Net.Sockets if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); - _logger.Log(LogLevel.Debug, $"Socket {SocketId} all subscription successfully resubscribed on reconnected socket."); + _logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] all subscription successfully resubscribed on reconnected socket."); return new CallResult(true); } @@ -750,7 +801,7 @@ namespace CryptoExchange.Net.Sockets return; await SendAndWaitQueryAsync(unsubscribeRequest).ConfigureAwait(false); - _logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription!.Id} unsubscribed"); + _logger.Log(LogLevel.Information, $"[Sckt {SocketId}] subscription {subscription!.Id} unsubscribed"); } internal async Task ResubscribeAsync(Subscription subscription) diff --git a/CryptoExchange.Net/Sockets/SocketMessage.cs b/CryptoExchange.Net/Sockets/SocketMessage.cs deleted file mode 100644 index 75fc7a6..0000000 --- a/CryptoExchange.Net/Sockets/SocketMessage.cs +++ /dev/null @@ -1,45 +0,0 @@ -using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; -using System; - -namespace CryptoExchange.Net.Sockets -{ - /// - /// Message received from the websocket - /// - public class SocketMessage - { - /// - /// Message receive time - /// - public DateTime ReceiveTime { get; set; } - /// - /// The message data - /// - public IMessageAccessor Message { get; set; } - /// - /// Raw string data - /// - public string? RawData { get; set; } - - /// - /// ctor - /// - /// - /// - public SocketMessage(DateTime receiveTime, IMessageAccessor message) - { - ReceiveTime = receiveTime; - Message = message; - } - - /// - /// Deserialize the message to a type - /// - /// - /// - public object Deserialize(Type type) - { - return Message.Deserialize(type); - } - } -} diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index c1b98bb..353c9b7 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -1,6 +1,7 @@ using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; @@ -74,7 +75,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public abstract Type? GetMessageType(SocketMessage message); + public abstract Type? GetMessageType(IMessageAccessor message); /// /// ctor @@ -114,6 +115,9 @@ namespace CryptoExchange.Net.Sockets /// public abstract Query? GetUnsubQuery(); + /// + public virtual object Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type); + /// /// Handle an update message /// diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index 625b591..3fb883b 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -1,5 +1,6 @@ using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; using Microsoft.Extensions.Logging; using System; using System.Threading.Tasks; @@ -31,7 +32,7 @@ namespace CryptoExchange.Net.Sockets public abstract class SystemSubscription : SystemSubscription { /// - public override Type GetMessageType(SocketMessage message) => typeof(T); + public override Type GetMessageType(IMessageAccessor message) => typeof(T); /// public override Task DoHandleMessageAsync(SocketConnection connection, DataEvent message)