From e3207033c3ca4ba9467338cb7bc9192647f775f4 Mon Sep 17 00:00:00 2001 From: JKorf Date: Sat, 20 Jan 2024 19:16:39 +0100 Subject: [PATCH] wip --- CryptoExchange.Net/Clients/SocketApiClient.cs | 40 +-- .../Converters/JTokenAccessor.cs | 262 +++++++-------- .../Interfaces/IMessageAccessor.cs | 21 -- .../Interfaces/IMessageProcessor.cs | 26 +- .../Interfaces/IStreamMessageListener.cs | 13 - CryptoExchange.Net/Objects/ConcurrentList.cs | 81 ----- .../Objects/Sockets/MatchingStrategy.cs | 21 -- .../Objects/Sockets/ParsedMessage.cs | 100 +++--- .../Sockets/StreamMessageParseCallback.cs | 36 +-- .../Objects/Testing/TestWebsocket.cs | 39 --- .../Objects/Testing/TestWebsocketFactory.cs | 24 -- .../Sockets/CryptoExchangeWebSocketClient.cs | 26 +- .../MessageParsing/Interfaces/IMessageData.cs | 39 +++ .../MessageParsing/JsonNetMessageData.cs | 132 ++++++++ .../Sockets/MessageParsing/MessageNode.cs | 38 +++ .../Sockets/MessageParsing/MessagePath.cs | 50 +++ .../MessageParsing/MessagePathExtension.cs | 32 ++ .../Sockets/MessageParsing/NodeType.cs | 21 ++ CryptoExchange.Net/Sockets/Query.cs | 83 +++-- .../Sockets/SocketConnection.cs | 305 ++++++++++-------- .../Sockets/SocketListenerManager.cs | 170 ---------- CryptoExchange.Net/Sockets/SocketMessage.cs | 45 +++ CryptoExchange.Net/Sockets/Subscription.cs | 84 ++--- .../Sockets/SystemSubscription.cs | 29 +- 24 files changed, 914 insertions(+), 803 deletions(-) delete mode 100644 CryptoExchange.Net/Interfaces/IMessageAccessor.cs delete mode 100644 CryptoExchange.Net/Interfaces/IStreamMessageListener.cs delete mode 100644 CryptoExchange.Net/Objects/ConcurrentList.cs delete mode 100644 CryptoExchange.Net/Objects/Sockets/MatchingStrategy.cs delete mode 100644 CryptoExchange.Net/Objects/Testing/TestWebsocket.cs delete mode 100644 CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs create mode 100644 CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageData.cs create mode 100644 CryptoExchange.Net/Sockets/MessageParsing/JsonNetMessageData.cs create mode 100644 CryptoExchange.Net/Sockets/MessageParsing/MessageNode.cs create mode 100644 CryptoExchange.Net/Sockets/MessageParsing/MessagePath.cs create mode 100644 CryptoExchange.Net/Sockets/MessageParsing/MessagePathExtension.cs create mode 100644 CryptoExchange.Net/Sockets/MessageParsing/NodeType.cs delete mode 100644 CryptoExchange.Net/Sockets/SocketListenerManager.cs create mode 100644 CryptoExchange.Net/Sockets/SocketMessage.cs diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs index 43af3bf..57f3180 100644 --- a/CryptoExchange.Net/Clients/SocketApiClient.cs +++ b/CryptoExchange.Net/Clients/SocketApiClient.cs @@ -10,6 +10,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; +using System.Net.WebSockets; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -103,15 +104,12 @@ namespace CryptoExchange.Net } } - /// public new SocketExchangeOptions ClientOptions => (SocketExchangeOptions)base.ClientOptions; /// public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions; - /// - public abstract MessageInterpreterPipeline Pipeline { get; } #endregion /// @@ -135,19 +133,9 @@ namespace CryptoExchange.Net RateLimiters = rateLimiters; } - /// - /// Set a delegate which can manipulate the message stream before it is processed by listeners - /// - /// Interceptor - protected void SetInterceptor(Func interceptor) - { - this.interceptor = interceptor; - } - /// /// Connect to an url and listen for data on the BaseAddress /// - /// The type of the expected data /// The subscription /// Cancellation token for closing this subscription /// @@ -159,7 +147,6 @@ namespace CryptoExchange.Net /// /// Connect to an url and listen for data /// - /// The type of the expected data /// The URL to connect to /// The subscription /// Cancellation token for closing this subscription @@ -247,7 +234,7 @@ namespace CryptoExchange.Net return new CallResult(subResult.Error!); } - subscription.HandleSubQueryResponse(subQuery.Response); + subscription.HandleSubQueryResponse(subQuery.Response!); } subscription.Confirmed = true; @@ -382,7 +369,7 @@ namespace CryptoExchange.Net /// Should return the request which can be used to authenticate a socket connection /// /// - protected internal virtual BaseQuery GetAuthenticationRequest() => throw new NotImplementedException(); + protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException(); /// /// Adds a system subscription. Used for example to reply to ping requests @@ -474,7 +461,7 @@ namespace CryptoExchange.Net /// Process an unhandled message /// /// The message that wasn't processed - protected virtual void HandleUnhandledMessage(BaseParsedMessage message) + protected virtual void HandleUnhandledMessage(SocketMessage message) { } @@ -538,7 +525,7 @@ namespace CryptoExchange.Net /// How often /// Method returning the query to send /// The callback for processing the response - protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func queryDelegate, Action? callback) + protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func queryDelegate, Action? callback) { if (queryDelegate == null) throw new ArgumentNullException(nameof(queryDelegate)); @@ -686,7 +673,7 @@ namespace CryptoExchange.Net sb.AppendLine($" Id: {subscription.Id}"); sb.AppendLine($" Confirmed: {subscription.Confirmed}"); sb.AppendLine($" Invocations: {subscription.TotalInvocations}"); - sb.AppendLine($" Identifiers: [{string.Join(", ", subscription.StreamIdentifiers)}]"); + sb.AppendLine($" Identifiers: [{string.Join(", ", subscription.ListenerIdentifiers)}]"); } } return sb.ToString(); @@ -708,5 +695,20 @@ namespace CryptoExchange.Net semaphoreSlim?.Dispose(); base.Dispose(); } + + /// + /// Get the listener identifier for the message + /// + /// + /// + public abstract string GetListenerIdentifier(SocketMessage message); + + /// + /// Preprocess a stream message + /// + /// + /// + /// + public virtual Stream PreprocessStreamMessage(WebSocketMessageType type, Stream stream) => stream; } } diff --git a/CryptoExchange.Net/Converters/JTokenAccessor.cs b/CryptoExchange.Net/Converters/JTokenAccessor.cs index 182bc5b..c21f330 100644 --- a/CryptoExchange.Net/Converters/JTokenAccessor.cs +++ b/CryptoExchange.Net/Converters/JTokenAccessor.cs @@ -1,158 +1,158 @@ -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Objects.Sockets; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Runtime.InteropServices.ComTypes; -using System.Text; +//using CryptoExchange.Net.Interfaces; +//using CryptoExchange.Net.Objects.Sockets; +//using Newtonsoft.Json; +//using Newtonsoft.Json.Linq; +//using System; +//using System.Collections.Generic; +//using System.IO; +//using System.Linq; +//using System.Runtime.InteropServices.ComTypes; +//using System.Text; -namespace CryptoExchange.Net.Converters -{ - internal class JTokenAccessor : IMessageAccessor - { - private readonly JToken _token; - private readonly Stream _stream; - private readonly StreamReader _reader; - private Dictionary _cache = new Dictionary(); - private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters); +//namespace CryptoExchange.Net.Converters +//{ +// internal class JTokenAccessor : IMessageAccessor +// { +// private readonly JToken _token; +// private readonly Stream _stream; +// private readonly StreamReader _reader; +// private Dictionary _cache = new Dictionary(); +// private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters); - public JTokenAccessor(Stream stream) - { - _stream = stream; - _reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); - using var jsonTextReader = new JsonTextReader(_reader); - JToken token; - try - { - _token = JToken.Load(jsonTextReader); - } - catch (Exception ex) - { - // Not a json message - throw; - } - } +// public JTokenAccessor(Stream stream) +// { +// _stream = stream; +// _reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); +// using var jsonTextReader = new JsonTextReader(_reader); +// JToken token; +// try +// { +// _token = JToken.Load(jsonTextReader); +// } +// catch (Exception ex) +// { +// // Not a json message +// throw; +// } +// } - public BaseParsedMessage Instantiate(Type type) - { - var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(type); - var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : _token.ToObject(type, _serializer)); - return instance; - } +// public BaseParsedMessage Instantiate(Type type) +// { +// var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(type); +// var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : _token.ToObject(type, _serializer)); +// return instance; +// } - public string GetOriginalString() - { - _stream.Position = 0; - return _reader.ReadToEnd(); - } +// public string GetOriginalString() +// { +// _stream.Position = 0; +// return _reader.ReadToEnd(); +// } - public int? GetArrayIntValue(string? key, int index) - { - var accessToken = key == null ? _token : GetToken(key); - if (accessToken == null || accessToken is not JArray arr) - return null; - return arr[index].Value(); - } +// public int? GetArrayIntValue(string? key, int index) +// { +// var accessToken = key == null ? _token : GetToken(key); +// if (accessToken == null || accessToken is not JArray arr) +// return null; +// return arr[index].Value(); +// } - public string? GetArrayStringValue(string? key, int index) - { - var accessToken = key == null ? _token : GetToken(key); - if (accessToken == null || accessToken is not JArray arr) - return null; +// public string? GetArrayStringValue(string? key, int index) +// { +// var accessToken = key == null ? _token : GetToken(key); +// if (accessToken == null || accessToken is not JArray arr) +// return null; - if (arr.Count <= index) - return null; +// if (arr.Count <= index) +// return null; - if (arr[index].Type != JTokenType.String) - return null; +// if (arr[index].Type != JTokenType.String) +// return null; - return arr[index].Value(); - } +// return arr[index].Value(); +// } - public int? GetCount(string key) - { - var accessToken = GetToken(key); - return accessToken.Count(); - } +// public int? GetCount(string key) +// { +// var accessToken = GetToken(key); +// return accessToken.Count(); +// } - public int? GetIntValue(string key) - { - var accessToken = GetToken(key); - return accessToken?.Value(); - } +// public int? GetIntValue(string key) +// { +// var accessToken = GetToken(key); +// return accessToken?.Value(); +// } - public string? GetStringValue(string key) - { - var accessToken = GetToken(key); - if (accessToken?.Type == JTokenType.Object) - return ((JObject)accessToken).Properties().First().Name; +// public string? GetStringValue(string key) +// { +// var accessToken = GetToken(key); +// if (accessToken?.Type == JTokenType.Object) +// return ((JObject)accessToken).Properties().First().Name; - return accessToken?.ToString(); - } +// return accessToken?.ToString(); +// } - public bool IsObject(string? key) => _token.Type == JTokenType.Object; - public bool IsArray(IEnumerable indexes) - { - var item = _token; - foreach(var index in indexes) - { - if (item.Type != JTokenType.Array) - return false; +// public bool IsObject(string? key) => _token.Type == JTokenType.Object; +// public bool IsArray(IEnumerable indexes) +// { +// var item = _token; +// foreach(var index in indexes) +// { +// if (item.Type != JTokenType.Array) +// return false; - var arr = ((JArray)item); - if (arr.Count <= index) - return false; +// var arr = ((JArray)item); +// if (arr.Count <= index) +// return false; - item = arr[index]; - } +// item = arr[index]; +// } - return item.Type == JTokenType.Array; - } +// return item.Type == JTokenType.Array; +// } - public bool IsEmptyArray(IEnumerable indexes) - { - var item = _token; - foreach (var index in indexes) - { - if (item.Type != JTokenType.Array) - return false; +// public bool IsEmptyArray(IEnumerable indexes) +// { +// var item = _token; +// foreach (var index in indexes) +// { +// if (item.Type != JTokenType.Array) +// return false; - var arr = ((JArray)item); - if (arr.Count <= index) - return false; +// var arr = ((JArray)item); +// if (arr.Count <= index) +// return false; - item = arr[index]; - } +// item = arr[index]; +// } - return item.Type == JTokenType.Array && !item.HasValues; - } +// return item.Type == JTokenType.Array && !item.HasValues; +// } - private JToken? GetToken(string key) - { - if (key == null) - return _token; +// private JToken? GetToken(string key) +// { +// if (key == null) +// return _token; - if (_cache.TryGetValue(key, out var token)) - return token; +// if (_cache.TryGetValue(key, out var token)) +// return token; - var splitTokens = key.Split(new char[] { ':' }); - var accessToken = _token; - foreach (var splitToken in splitTokens) - { - if (accessToken.Type == JTokenType.Array) - return null; +// var splitTokens = key.Split(new char[] { ':' }); +// var accessToken = _token; +// foreach (var splitToken in splitTokens) +// { +// if (accessToken.Type == JTokenType.Array) +// return null; - accessToken = accessToken[splitToken]; +// accessToken = accessToken[splitToken]; - if (accessToken == null) - break; - } +// if (accessToken == null) +// break; +// } - _cache.Add(key, accessToken); - return accessToken; - } - } -} +// _cache.Add(key, accessToken); +// return accessToken; +// } +// } +//} diff --git a/CryptoExchange.Net/Interfaces/IMessageAccessor.cs b/CryptoExchange.Net/Interfaces/IMessageAccessor.cs deleted file mode 100644 index f9c67ae..0000000 --- a/CryptoExchange.Net/Interfaces/IMessageAccessor.cs +++ /dev/null @@ -1,21 +0,0 @@ -using CryptoExchange.Net.Objects.Sockets; -using System; -using System.Collections.Generic; -using System.Text; - -namespace CryptoExchange.Net.Interfaces -{ - public interface IMessageAccessor - { - bool IsObject(string? key); - bool IsArray(IEnumerable indexes); - bool IsEmptyArray(IEnumerable indexes); - string? GetStringValue(string key); - int? GetIntValue(string key); - public int? GetCount(string key); - public int? GetArrayIntValue(string? key, int index); - public string? GetArrayStringValue(string? key, int index); - - public BaseParsedMessage Instantiate(Type type); - } -} diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs index 62cf75b..299d16e 100644 --- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs +++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs @@ -8,11 +8,31 @@ using System.Threading.Tasks; namespace CryptoExchange.Net.Interfaces { + /// + /// Message processor + /// public interface IMessageProcessor { + /// + /// Id of the processor + /// public int Id { get; } - public List StreamIdentifiers { get; } - Task HandleMessageAsync(SocketConnection connection, DataEvent message); - Dictionary TypeMapping { get; } + /// + /// The identifiers for this processor + /// + public HashSet ListenerIdentifiers { get; } + /// + /// Handle a message + /// + /// + /// + /// + Task HandleAsync(SocketConnection connection, DataEvent message); + /// + /// Get the type the message should be deserialized to + /// + /// + /// + Type? GetMessageType(SocketMessage message); } } diff --git a/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs b/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs deleted file mode 100644 index 9d5bce2..0000000 --- a/CryptoExchange.Net/Interfaces/IStreamMessageListener.cs +++ /dev/null @@ -1,13 +0,0 @@ -//using CryptoExchange.Net.Converters; -//using CryptoExchange.Net.Objects.Sockets; -//using System.Threading.Tasks; - -//namespace CryptoExchange.Net.Interfaces -//{ -// internal interface IStreamMessageListener -// { -// int Priority { get; } -// bool MessageMatches(ParsedMessage message); -// Task ProcessAsync(ParsedMessage message); -// } -//} diff --git a/CryptoExchange.Net/Objects/ConcurrentList.cs b/CryptoExchange.Net/Objects/ConcurrentList.cs deleted file mode 100644 index 40a6ce1..0000000 --- a/CryptoExchange.Net/Objects/ConcurrentList.cs +++ /dev/null @@ -1,81 +0,0 @@ -using System; -using System.Collections; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using static System.Collections.Specialized.BitVector32; - -namespace CryptoExchange.Net.Objects -{ - public class ConcurrentList : IEnumerable - { - private readonly object _lock = new object(); - private readonly List _collection = new List(); - - public void Add(T item) - { - lock (_lock) - _collection.Add(item); - } - - - public void Remove(T item) - { - lock (_lock) - _collection.Remove(item); - } - - public T? SingleOrDefault(Func action) - { - lock (_lock) - return _collection.SingleOrDefault(action); - } - - public bool All(Func action) - { - lock (_lock) - return _collection.All(action); - } - - public bool Any(Func action) - { - lock (_lock) - return _collection.Any(action); - } - - public int Count(Func action) - { - lock (_lock) - return _collection.Count(action); - } - - public bool Contains(T item) - { - lock (_lock) - return _collection.Contains(item); - } - - public T[] ToArray(Func predicate) - { - lock (_lock) - return _collection.Where(predicate).ToArray(); - } - - public List ToList() - { - lock (_lock) - return _collection.ToList(); - } - - public IEnumerator GetEnumerator() - { - lock (_lock) - { - foreach (var item in _collection) - yield return item; - } - } - - IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - } -} diff --git a/CryptoExchange.Net/Objects/Sockets/MatchingStrategy.cs b/CryptoExchange.Net/Objects/Sockets/MatchingStrategy.cs deleted file mode 100644 index 00c4e45..0000000 --- a/CryptoExchange.Net/Objects/Sockets/MatchingStrategy.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace CryptoExchange.Net.Objects.Sockets -{ - public interface IMatchingStrategy - { - - } - - internal class IdMatchingStrategy : IMatchingStrategy - { - - } - - internal class FieldsMatchingStrategy : IMatchingStrategy - { - - } -} diff --git a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs index ffd072d..97f802a 100644 --- a/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs +++ b/CryptoExchange.Net/Objects/Sockets/ParsedMessage.cs @@ -1,54 +1,54 @@ -namespace CryptoExchange.Net.Objects.Sockets -{ - /// - /// Parsed message object - /// - public abstract class BaseParsedMessage - { - /// - /// Stream identifier string - /// - public string StreamIdentifier { get; set; } = null!; - /// - /// Type identifier string - /// - public string TypeIdentifier { get; set; } = null!; - /// - /// Original data if the option is enabled - /// - public string? OriginalData { get; set; } - /// - /// If parsed - /// - public bool Parsed { get; set; } +//namespace CryptoExchange.Net.Objects.Sockets +//{ +// /// +// /// Parsed message object +// /// +// public abstract class BaseParsedMessage +// { +// /// +// /// Stream identifier string +// /// +// public string StreamIdentifier { get; set; } = null!; +// /// +// /// Type identifier string +// /// +// public string TypeIdentifier { get; set; } = null!; +// /// +// /// Original data if the option is enabled +// /// +// public string? OriginalData { get; set; } +// /// +// /// If parsed +// /// +// public bool Parsed { get; set; } - /// - /// Get the data object - /// - /// - public abstract object Data { get; } - } +// /// +// /// Get the data object +// /// +// /// +// public abstract object Data { get; } +// } - /// - /// Parsed message object - /// - /// Data type - public class ParsedMessage : BaseParsedMessage - { - /// - /// Parsed data object - /// - public override object? Data { get; } +// /// +// /// Parsed message object +// /// +// /// Data type +// public class ParsedMessage : BaseParsedMessage +// { +// /// +// /// Parsed data object +// /// +// public override object? Data { get; } - public T? TypedData => (T)Data; +// public T? TypedData => (T)Data; - /// - /// ctor - /// - /// - public ParsedMessage(T? data) - { - Data = data; - } - } -} +// /// +// /// ctor +// /// +// /// +// public ParsedMessage(T? data) +// { +// Data = data; +// } +// } +//} diff --git a/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs b/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs index 3e0357b..f0f1c02 100644 --- a/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs +++ b/CryptoExchange.Net/Objects/Sockets/StreamMessageParseCallback.cs @@ -1,19 +1,19 @@ -using CryptoExchange.Net.Converters; -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Sockets; -using Newtonsoft.Json.Linq; -using System; -using System.Collections.Generic; -using System.IO; -using System.Net.WebSockets; -using System.Text; +//using CryptoExchange.Net.Converters; +//using CryptoExchange.Net.Interfaces; +//using CryptoExchange.Net.Sockets; +//using Newtonsoft.Json.Linq; +//using System; +//using System.Collections.Generic; +//using System.IO; +//using System.Net.WebSockets; +//using System.Text; -namespace CryptoExchange.Net.Objects.Sockets -{ - public class MessageInterpreterPipeline - { - public Func? PreProcessCallback { get; set; } - public Func GetStreamIdentifier { get; set; } - public Func GetTypeIdentifier { get; set; } = (accessor, streamId) => streamId; - } -} +//namespace CryptoExchange.Net.Objects.Sockets +//{ +// public class MessageInterpreterPipeline +// { +// public Func? PreProcessCallback { get; set; } +// public Func GetStreamIdentifier { get; set; } +// public Func GetTypeIdentifier { get; set; } = (accessor, streamId) => streamId; +// } +//} diff --git a/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs b/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs deleted file mode 100644 index 1067434..0000000 --- a/CryptoExchange.Net/Objects/Testing/TestWebsocket.cs +++ /dev/null @@ -1,39 +0,0 @@ -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Objects.Sockets; -using CryptoExchange.Net.Sockets; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json; -using System; -using System.Collections.Generic; -using System.IO; -using System.Text; -using System.Threading.Tasks; - -namespace CryptoExchange.Net.Objects.Testing -{ - public class TestWebsocket : CryptoExchangeWebSocketClient - { - public TestWebsocket(ILogger logger, WebSocketParameters websocketParameters) : base(logger, websocketParameters) - { - } - - public override bool IsClosed => false; - public override bool IsOpen => true; - - public override Task ConnectAsync() => Task.FromResult(true); - - public override Task CloseAsync() => Task.CompletedTask; - - public override Task ReconnectAsync() => Task.CompletedTask; - - public override void Send(int id, string data, int weight) { } - - public void Receive(string data) - { - var bytes = Encoding.UTF8.GetBytes(data); - var stream = new MemoryStream(bytes); - stream.Position = 0; - _ = ProcessData(System.Net.WebSockets.WebSocketMessageType.Text, stream); - } - } -} diff --git a/CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs b/CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs deleted file mode 100644 index 62767f1..0000000 --- a/CryptoExchange.Net/Objects/Testing/TestWebsocketFactory.cs +++ /dev/null @@ -1,24 +0,0 @@ -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Objects.Sockets; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Text; - -namespace CryptoExchange.Net.Objects.Testing -{ - public class TestWebsocketFactory : IWebsocketFactory - { - private readonly Func _websocketFactory; - - public TestWebsocketFactory(Func websocketFactory) - { - _websocketFactory = websocketFactory; - } - - public IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters) - { - return _websocketFactory(logger, parameters); - } - } -} diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 4e39cf7..325ebe1 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -278,7 +278,7 @@ namespace CryptoExchange.Net.Sockets return; var bytes = Parameters.Encoding.GetBytes(data); - _logger.Log(LogLevel.Trace, $"Socket {Id} - msg {id} - Adding {bytes.Length} bytes to send buffer"); + _logger.Log(LogLevel.Trace, $"Socket {Id} msg {id} - Adding {bytes.Length} bytes to send buffer"); _sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes }); _sendEvent.Set(); } @@ -415,7 +415,7 @@ namespace CryptoExchange.Net.Sockets if (limitResult.Success) { if (limitResult.Data > 0) - _logger.Log(LogLevel.Debug, $"Socket {Id} - msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit"); + _logger.Log(LogLevel.Debug, $"Socket {Id} msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit"); } } } @@ -424,7 +424,7 @@ namespace CryptoExchange.Net.Sockets { await _socket.SendAsync(new ArraySegment(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false); await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false); - _logger.Log(LogLevel.Trace, $"Socket {Id} - msg {data.Id} - sent {data.Bytes.Length} bytes"); + _logger.Log(LogLevel.Trace, $"Socket {Id} msg {data.Id} - sent {data.Bytes.Length} bytes"); } catch (OperationCanceledException) { @@ -447,13 +447,13 @@ namespace CryptoExchange.Net.Sockets // Because this is running in a separate task and not awaited until the socket gets closed // any exception here will crash the send processing, but do so silently unless the socket get's stopped. // Make sure we at least let the owner know there was an error - _logger.Log(LogLevel.Warning, $"Socket {Id} Send loop stopped with exception"); + _logger.Log(LogLevel.Warning, $"Socket {Id} send loop stopped with exception"); await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); throw; } finally { - _logger.Log(LogLevel.Debug, $"Socket {Id} Send loop finished"); + _logger.Log(LogLevel.Debug, $"Socket {Id} send loop finished"); } } @@ -570,22 +570,30 @@ namespace CryptoExchange.Net.Sockets // Because this is running in a separate task and not awaited until the socket gets closed // any exception here will crash the receive processing, but do so silently unless the socket gets stopped. // Make sure we at least let the owner know there was an error - _logger.Log(LogLevel.Warning, $"Socket {Id} Receive loop stopped with exception"); + _logger.Log(LogLevel.Warning, $"Socket {Id} receive loop stopped with exception"); await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); throw; } finally { - _logger.Log(LogLevel.Debug, $"Socket {Id} Receive loop finished"); + _logger.Log(LogLevel.Debug, $"Socket {Id} receive loop finished"); } } + /// + /// Proccess a stream message + /// + /// + /// + /// protected async Task ProcessData(WebSocketMessageType type, Stream stream) { LastActionTime = DateTime.UtcNow; stream.Position = 0; + if (Parameters.Interceptor != null) stream = Parameters.Interceptor.Invoke(stream); + if (OnStreamMessage != null) await OnStreamMessage.Invoke(type, stream).ConfigureAwait(false); } @@ -596,7 +604,7 @@ namespace CryptoExchange.Net.Sockets /// protected async Task CheckTimeoutAsync() { - _logger.Log(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Parameters.Timeout}"); + _logger.Log(LogLevel.Debug, $"Socket {Id} starting task checking for no data received for {Parameters.Timeout}"); LastActionTime = DateTime.UtcNow; try { @@ -607,7 +615,7 @@ namespace CryptoExchange.Net.Sockets if (DateTime.UtcNow - LastActionTime > Parameters.Timeout) { - _logger.Log(LogLevel.Warning, $"Socket {Id} No data received for {Parameters.Timeout}, reconnecting socket"); + _logger.Log(LogLevel.Warning, $"Socket {Id} no data received for {Parameters.Timeout}, reconnecting socket"); _ = ReconnectAsync().ConfigureAwait(false); return; } diff --git a/CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageData.cs b/CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageData.cs new file mode 100644 index 0000000..b4e37df --- /dev/null +++ b/CryptoExchange.Net/Sockets/MessageParsing/Interfaces/IMessageData.cs @@ -0,0 +1,39 @@ +using System; + +namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces +{ + /// + /// Message accessor + /// + public interface IMessageAccessor + { + /// + /// Is this a json message + /// + bool IsJson { get; } + /// + /// Get the type of node + /// + /// + NodeType? GetNodeType(); + /// + /// Get the type of node + /// + /// Access path + /// + NodeType? GetNodeType(MessagePath path); + /// + /// Get the value of a path + /// + /// + /// + /// + T? GetValue(MessagePath path); + /// + /// Deserialize the message into this type + /// + /// + /// + object Deserialize(Type type); + } +} diff --git a/CryptoExchange.Net/Sockets/MessageParsing/JsonNetMessageData.cs b/CryptoExchange.Net/Sockets/MessageParsing/JsonNetMessageData.cs new file mode 100644 index 0000000..e2af385 --- /dev/null +++ b/CryptoExchange.Net/Sockets/MessageParsing/JsonNetMessageData.cs @@ -0,0 +1,132 @@ +using CryptoExchange.Net.Converters; +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; + +namespace CryptoExchange.Net.Sockets.MessageParsing +{ + /// + /// Json.Net message accessor + /// + public class JsonNetMessageData : IMessageAccessor + { + private readonly JToken? _token; + private readonly Stream _stream; + private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters); + + /// + public bool IsJson { get; private set; } + + /// + /// ctor + /// + /// + public JsonNetMessageData(Stream stream) + { + _stream = stream; + using var reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); + using var jsonTextReader = new JsonTextReader(reader); + + try + { + _token = JToken.Load(jsonTextReader); + IsJson = true; + } + catch (Exception) + { + // Not a json message + IsJson = false; + } + } + + /// + public object Deserialize(Type type) + { + if (!IsJson) + { + var sr = new StreamReader(_stream); + return sr.ReadToEnd(); + } + + return _token!.ToObject(type, _serializer)!; + } + + /// + public NodeType? GetNodeType() + { + if (_token == null) + return null; + + if (_token.Type == JTokenType.Object) + return NodeType.Object; + + if (_token.Type == JTokenType.Array) + return NodeType.Array; + + return NodeType.Value; + } + + /// + public NodeType? GetNodeType(MessagePath path) + { + var node = GetPathNode(path); + if (node == null) + return null; + + if (node.Type == JTokenType.Object) + return NodeType.Object; + + if (node.Type == JTokenType.Array) + return NodeType.Array; + + return NodeType.Value; + } + + /// + public T? GetValue(MessagePath path) + { + var value = GetPathNode(path); + if (value == null) + return default; + + if (value.Type == JTokenType.Object || value.Type == JTokenType.Array) + return default; + + return value!.Value(); + } + + private JToken? GetPathNode(MessagePath path) + { + var currentToken = _token; + foreach (var node in path) + { + if (node.Type) + { + // Int value + var val = (int)node.Value; + if (currentToken!.Type != JTokenType.Array || ((JArray)currentToken).Count <= val) + return null; + + currentToken = currentToken[val]; + } + else + { + // String value + if (currentToken!.Type != JTokenType.Object) + return null; + + currentToken = currentToken[(string)node.Value]; + } + + if (currentToken == null) + return null; + } + + return currentToken; + } + } +} diff --git a/CryptoExchange.Net/Sockets/MessageParsing/MessageNode.cs b/CryptoExchange.Net/Sockets/MessageParsing/MessageNode.cs new file mode 100644 index 0000000..36e3c9a --- /dev/null +++ b/CryptoExchange.Net/Sockets/MessageParsing/MessageNode.cs @@ -0,0 +1,38 @@ +namespace CryptoExchange.Net.Sockets.MessageParsing +{ + /// + /// Node accessor + /// + public struct NodeAccessor + { + /// + /// Value + /// + public object Value { get; } + /// + /// Type (true = int, false = string) + /// + public bool Type { get; } + + private NodeAccessor(object value, bool type) + { + Value = value; + Type = type; + } + + /// + /// Create an int node accessor + /// + /// + /// + public static NodeAccessor Int(int value) { return new NodeAccessor(value, true); } + + /// + /// Create a string node accessor + /// + /// + /// + public static NodeAccessor String(string value) { return new NodeAccessor(value, false); } + } + +} diff --git a/CryptoExchange.Net/Sockets/MessageParsing/MessagePath.cs b/CryptoExchange.Net/Sockets/MessageParsing/MessagePath.cs new file mode 100644 index 0000000..51db132 --- /dev/null +++ b/CryptoExchange.Net/Sockets/MessageParsing/MessagePath.cs @@ -0,0 +1,50 @@ +using System.Collections; +using System.Collections.Generic; + +namespace CryptoExchange.Net.Sockets.MessageParsing +{ + /// + /// Message access definition + /// + public struct MessagePath : IEnumerable + { + private List _path; + + internal void Add(NodeAccessor node) + { + _path.Add(node); + } + + /// + /// ctor + /// + public MessagePath() + { + _path = new List(); + } + + /// + /// Create a new message path + /// + /// + public static MessagePath Get() + { + return new MessagePath(); + } + + /// + /// IEnumerable implementation + /// + /// + public IEnumerator GetEnumerator() + { + for (var i = 0; i < _path.Count; i++) + yield return _path[i]; + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + } +} diff --git a/CryptoExchange.Net/Sockets/MessageParsing/MessagePathExtension.cs b/CryptoExchange.Net/Sockets/MessageParsing/MessagePathExtension.cs new file mode 100644 index 0000000..3a93748 --- /dev/null +++ b/CryptoExchange.Net/Sockets/MessageParsing/MessagePathExtension.cs @@ -0,0 +1,32 @@ +namespace CryptoExchange.Net.Sockets.MessageParsing +{ + /// + /// Message path extension methods + /// + public static class MessagePathExtension + { + /// + /// Add a string node accessor + /// + /// + /// + /// + public static MessagePath Property(this MessagePath path, string propName) + { + path.Add(NodeAccessor.String(propName)); + return path; + } + + /// + /// Add a int node accessor + /// + /// + /// + /// + public static MessagePath Index(this MessagePath path, int index) + { + path.Add(NodeAccessor.Int(index)); + return path; + } + } +} diff --git a/CryptoExchange.Net/Sockets/MessageParsing/NodeType.cs b/CryptoExchange.Net/Sockets/MessageParsing/NodeType.cs new file mode 100644 index 0000000..309db88 --- /dev/null +++ b/CryptoExchange.Net/Sockets/MessageParsing/NodeType.cs @@ -0,0 +1,21 @@ +namespace CryptoExchange.Net.Sockets.MessageParsing +{ + /// + /// Message node type + /// + public enum NodeType + { + /// + /// Array node + /// + Array, + /// + /// Object node + /// + Object, + /// + /// Value node + /// + Value + } +} diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index daa5b54..3a5cbdf 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -11,26 +11,42 @@ namespace CryptoExchange.Net.Sockets /// /// Query /// - public abstract class BaseQuery : IMessageProcessor + public abstract class Query : IMessageProcessor { /// /// Unique identifier /// public int Id { get; } = ExchangeHelpers.NextId(); + /// + /// Has this query been completed + /// public bool Completed { get; set; } - public DateTime RequestTimestamp { get; set; } - public CallResult? Result { get; set; } - public BaseParsedMessage Response { get; set; } - public Action OnFinished { get; set; } - - protected AsyncResetEvent _event; - protected CancellationTokenSource? _cts; /// - /// Strings to identify this subscription with + /// Timestamp of when the request was send /// - public abstract List StreamIdentifiers { get; set; } + public DateTime RequestTimestamp { get; set; } + + /// + /// Result + /// + public CallResult? Result { get; set; } + + /// + /// Response + /// + public object? Response { get; set; } + + /// + /// Action to execute when query is finished + /// + public Action? OnFinished { get; set; } + + /// + /// Strings to match this query to a received message + /// + public abstract HashSet ListenerIdentifiers { get; set; } /// /// The query request object @@ -47,7 +63,22 @@ namespace CryptoExchange.Net.Sockets /// public int Weight { get; } - public abstract Dictionary TypeMapping { get; set; } + /// + /// Get the type the message should be deserialized to + /// + /// + /// + public abstract Type GetMessageType(SocketMessage message); + + /// + /// Wait event for response + /// + protected AsyncResetEvent _event; + + /// + /// Cancellation token + /// + protected CancellationTokenSource? _cts; /// /// ctor @@ -55,7 +86,7 @@ namespace CryptoExchange.Net.Sockets /// /// /// - public BaseQuery(object request, bool authenticated, int weight = 1) + public Query(object request, bool authenticated, int weight = 1) { _event = new AsyncResetEvent(false, false); @@ -97,8 +128,9 @@ namespace CryptoExchange.Net.Sockets /// Handle a response message /// /// + /// /// - public abstract Task HandleMessageAsync(SocketConnection connection, DataEvent message); + public abstract Task HandleAsync(SocketConnection connection, DataEvent message); } @@ -106,20 +138,10 @@ namespace CryptoExchange.Net.Sockets /// Query /// /// Response object type - public abstract class Query : BaseQuery + public abstract class Query : Query { - private Dictionary _typeMapping = new Dictionary - { - { "", typeof(TResponse) } - }; - public override Dictionary TypeMapping - { - get => _typeMapping; - set - { - _typeMapping = value; - } - } + /// + public override Type GetMessageType(SocketMessage message) => typeof(TResponse); /// /// The typed call result @@ -137,13 +159,11 @@ namespace CryptoExchange.Net.Sockets } /// - public override async Task HandleMessageAsync(SocketConnection connection, DataEvent message) + public override async Task HandleAsync(SocketConnection connection, DataEvent message) { Completed = true; Response = message.Data; - Result = await HandleMessageAsync(connection, message.As((ParsedMessage)message.Data)).ConfigureAwait(false); - // Set() gives calling/waiting request the signal to continue and allows the message processing thread to continue with next message. - // However, the processing of the message isn't fully finished yet? + Result = await HandleMessageAsync(connection, message.As((TResponse)message.Data)).ConfigureAwait(false); OnFinished?.Invoke(); _event.Set(); return Result; @@ -152,9 +172,10 @@ namespace CryptoExchange.Net.Sockets /// /// Handle the query response /// + /// /// /// - public virtual Task> HandleMessageAsync(SocketConnection connection, DataEvent> message) => Task.FromResult(new CallResult(message.Data.TypedData!)); + public virtual Task> HandleMessageAsync(SocketConnection connection, DataEvent message) => Task.FromResult(new CallResult(message.Data)); /// public override void Timeout() diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 6242382..25ffa24 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -13,6 +13,7 @@ using System.Text; using System.Diagnostics.CodeAnalysis; using CryptoExchange.Net.Converters; using System.Diagnostics; +using CryptoExchange.Net.Sockets.MessageParsing; namespace CryptoExchange.Net.Sockets { @@ -49,17 +50,24 @@ namespace CryptoExchange.Net.Sockets /// /// Unhandled message event /// - public event Action? UnhandledMessage; + public event Action? UnhandledMessage; /// /// Unparsed message event /// - public event Action? UnparsedMessage; + public event Action? UnparsedMessage; // TODO not linked up /// /// The amount of subscriptions on this connection /// - public int UserSubscriptionCount => _listenerManager.GetSubscriptions().Count(h => h.UserSubscription); + public int UserSubscriptionCount + { + get + { + lock(_listenersLock) + return _listeners.OfType().Count(h => h.UserSubscription); + } + } /// /// Get a copy of the current message subscriptions @@ -68,7 +76,8 @@ namespace CryptoExchange.Net.Sockets { get { - return _listenerManager.GetSubscriptions().Where(h => h.UserSubscription).ToArray(); + lock(_listenersLock) + return _listeners.OfType().Where(h => h.UserSubscription).ToArray(); } } @@ -128,7 +137,7 @@ namespace CryptoExchange.Net.Sockets if (_pausedActivity != value) { _pausedActivity = value; - _logger.Log(LogLevel.Information, $"Socket {SocketId} Paused activity: " + value); + _logger.Log(LogLevel.Information, $"Socket {SocketId} paused activity: " + value); if(_pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke()); else _ = Task.Run(() => ActivityUnpaused?.Invoke()); } @@ -153,7 +162,8 @@ namespace CryptoExchange.Net.Sockets } private bool _pausedActivity; - private readonly SocketListenerManager _listenerManager; + private readonly object _listenersLock; + private readonly List _listeners; private readonly ILogger _logger; private SocketStatus _status; @@ -186,57 +196,67 @@ namespace CryptoExchange.Net.Sockets _socket.OnError += HandleErrorAsync; _socket.GetReconnectionUrl = GetReconnectionUrlAsync; - _listenerManager = new SocketListenerManager(_logger, SocketId); + _listenersLock = new object(); + _listeners = new List(); } /// /// Handler for a socket opening /// - protected virtual async Task HandleOpenAsync() + protected virtual Task HandleOpenAsync() { Status = SocketStatus.Connected; PausedActivity = false; + return Task.CompletedTask; } /// /// Handler for a socket closing without reconnect /// - protected virtual async Task HandleCloseAsync() + protected virtual Task HandleCloseAsync() { Status = SocketStatus.Closed; Authenticated = false; - foreach (var subscription in _listenerManager.GetSubscriptions()) - subscription.Confirmed = false; - - foreach (var query in _listenerManager.GetQueries()) + lock (_listenersLock) { - query.Fail("Connection interupted"); - _listenerManager.Remove(query); + foreach (var subscription in _listeners.OfType()) + subscription.Confirmed = false; + + foreach (var query in _listeners.OfType()) + { + query.Fail("Connection interupted"); + _listeners.Remove(query); + } } - Task.Run(() => ConnectionClosed?.Invoke()); + _ = Task.Run(() => ConnectionClosed?.Invoke()); + return Task.CompletedTask; } /// /// Handler for a socket losing conenction and starting reconnect /// - protected virtual async Task HandleReconnectingAsync() + protected virtual Task HandleReconnectingAsync() { Status = SocketStatus.Reconnecting; DisconnectTime = DateTime.UtcNow; Authenticated = false; - foreach (var subscription in _listenerManager.GetSubscriptions()) - subscription.Confirmed = false; - - foreach (var query in _listenerManager.GetQueries()) + lock (_listenersLock) { - query.Fail("Connection interupted"); - _listenerManager.Remove(query); + foreach (var subscription in _listeners.OfType()) + subscription.Confirmed = false; + + foreach (var query in _listeners.OfType()) + { + query.Fail("Connection interupted"); + _listeners.Remove(query); + } } _ = Task.Run(() => ConnectionLost?.Invoke()); + return Task.CompletedTask; } /// @@ -251,22 +271,26 @@ namespace CryptoExchange.Net.Sockets /// /// Handler for a socket which has reconnected /// - protected virtual async Task HandleReconnectedAsync() + protected virtual Task HandleReconnectedAsync() { Status = SocketStatus.Resubscribing; - foreach (var query in _listenerManager.GetQueries()) + lock (_listenersLock) { - query.Fail("Connection interupted"); - _listenerManager.Remove(query); + foreach (var query in _listeners.OfType()) + { + query.Fail("Connection interupted"); + _listeners.Remove(query); + } } + // Can't wait for this as it would cause a deadlock _ = Task.Run(async () => { var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false); if (!reconnectSuccessful) { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again"); + _logger.Log(LogLevel.Warning, $"Socket {SocketId} failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again"); _ = _socket.ReconnectAsync().ConfigureAwait(false); } else @@ -279,34 +303,44 @@ namespace CryptoExchange.Net.Sockets }); } }); + + return Task.CompletedTask; } /// /// Handler for an error on a websocket /// /// The exception - protected virtual async Task HandleErrorAsync(Exception e) + protected virtual Task HandleErrorAsync(Exception e) { if (e is WebSocketException wse) _logger.Log(LogLevel.Warning, $"Socket {SocketId} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString()); else _logger.Log(LogLevel.Warning, $"Socket {SocketId} error: " + e.ToLogString()); + + return Task.CompletedTask; } /// /// Handler for whenever a request is sent over the websocket /// /// Id of the request sent - protected virtual async Task HandleRequestSentAsync(int requestId) + protected virtual Task HandleRequestSentAsync(int requestId) { - var query = _listenerManager.GetById(requestId); + Query query; + lock (_listenersLock) + { + query = _listeners.OfType().FirstOrDefault(x => x.Id == requestId); + } + if (query == null) { - _logger.Log(LogLevel.Debug, $"Socket {SocketId} - msg {requestId} - message sent, but not pending"); - return; + _logger.Log(LogLevel.Debug, $"Socket {SocketId} msg {requestId} - message sent, but not pending"); + return Task.CompletedTask; } query.IsSend(ApiClient.ClientOptions.RequestTimeout); + return Task.CompletedTask; } /// @@ -317,87 +351,74 @@ namespace CryptoExchange.Net.Sockets /// protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream) { - var result = ReadJson(type, stream); - if (result == null) - { - // Not able to parse at all - var buffer = new byte[stream.Length]; - stream.Position = 0; - stream.Read(buffer, 0, buffer.Length); - _logger.LogDebug($"Socket {SocketId} Failed to parse data: {Encoding.UTF8.GetString(buffer)}"); + // 1. Decrypt/Preprocess if necessary + stream = ApiClient.PreprocessStreamMessage(type, stream); - UnparsedMessage?.Invoke(buffer); + // 2. Read data into accessor + var messageData = new JsonNetMessageData(stream); // TODO if we let the implementation create this we can switch per implementation + var message = new SocketMessage(DateTime.UtcNow, messageData); + if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData) + { + stream.Position = 0; + using var textReader = new StreamReader(stream, Encoding.UTF8, false, 1024, true); + message.RawData = textReader.ReadToEnd(); + + _logger.LogTrace("Socket {SocketId} received {Data}", SocketId, message.RawData); + } + + // 3. Determine the subscription interested in the messsage + var listenId = ApiClient.GetListenerIdentifier(message); + + List processors; + lock(_listenersLock) + processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList(); + + if (!processors.Any()) + { + _logger.LogWarning("Socket {SocketId} received message not matched to any processor", SocketId); + UnhandledMessage?.Invoke(message); return; } - if (result.OriginalData != null) - _logger.LogDebug($"Socket {SocketId} Data received: {result.OriginalData}"); - - if (!result.Parsed) + _logger.LogTrace("Socket {SocketId} {Count} processors matched to message with listener identifier {ListenerId}", SocketId, processors.Count, listenId); + foreach (var processor in processors) { - // Not able to determine the message type for the message - _logger.LogWarning("Message not matched to type"); - return; - } + // 4. Determine the type to deserialize to + var messageType = processor.GetMessageType(message); + if (messageType == null) + { + _logger.LogWarning("Socket {SocketId} received message not recognized by handler {Id}", SocketId, processor.Id); + continue; + } - if (!await _listenerManager.InvokeListenersAsync(this, result.StreamIdentifier, result).ConfigureAwait(false)) - { - // Not able to find a listener for this message - stream.Position = 0; - var unhandledBuffer = new byte[stream.Length]; - stream.Read(unhandledBuffer, 0, unhandledBuffer.Length); - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message unidentified. Id: {result.StreamIdentifier.ToLowerInvariant()}, listening ids: [{string.Join(", ", _listenerManager.GetListenIds())}], Message: {Encoding.UTF8.GetString(unhandledBuffer)} "); - UnhandledMessage?.Invoke(result); - return; + // 5. Deserialize the message + object deserialized; + try + { + deserialized = message.Deserialize(messageType); + } + catch (Exception ex) + { + _logger.LogWarning("Socket {SocketId} failed to deserialize message to type {Type}: {Exception}", SocketId, messageType.Name, ex.ToLogString()); + continue; + } + + // 6. Hand of the message to the subscription + try + { + await processor.HandleAsync(this, new DataEvent(deserialized, null, message.RawData, message.ReceiveTime, null)).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning("Socket {SocketId} user message processing failed: {Exception}", SocketId, ex.ToLogString()); + if (processor is Subscription subscription) + subscription.InvokeExceptionHandler(ex); + } } stream.Dispose(); } - /// - /// Read a message from stream - /// - /// - /// - /// - protected virtual BaseParsedMessage? ReadJson(WebSocketMessageType websocketMessageType, Stream stream) - { - // Start reading the data - // Once we reach the properties that identify the message we save those in a dict - // Once all id properties have been read callback to see what the deserialization type should be - // Deserialize to the correct type - - if (ApiClient.Pipeline.PreProcessCallback != null) - stream = ApiClient.Pipeline.PreProcessCallback(websocketMessageType, stream); - - var accessor = new JTokenAccessor(stream); - if (accessor == null) - return null; - - var streamIdentity = ApiClient.Pipeline.GetStreamIdentifier(accessor); - if (streamIdentity == null) - return null; - - var typeIdentity = ApiClient.Pipeline.GetTypeIdentifier(accessor, streamIdentity); - var typeResult = _listenerManager.IdToType(streamIdentity, typeIdentity); - if (typeResult == null) - return null; - - var idInstance = accessor.Instantiate(typeResult); - if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData) - { - var buffer2 = new byte[stream.Length]; - stream.Position = 0; - stream.Read(buffer2, 0, buffer2.Length); - idInstance.OriginalData = Encoding.UTF8.GetString(buffer2); - } - - idInstance.StreamIdentifier = streamIdentity; - idInstance.TypeIdentifier = typeIdentity; - idInstance.Parsed = true; - return idInstance; - } - /// /// Connect the websocket /// @@ -428,10 +449,13 @@ namespace CryptoExchange.Net.Sockets if (ApiClient.socketConnections.ContainsKey(SocketId)) ApiClient.socketConnections.TryRemove(SocketId, out _); - foreach (var subscription in _listenerManager.GetSubscriptions()) + lock (_listenersLock) { - if (subscription.CancellationTokenRegistration.HasValue) - subscription.CancellationTokenRegistration.Value.Dispose(); + foreach (var subscription in _listeners.OfType()) + { + if (subscription.CancellationTokenRegistration.HasValue) + subscription.CancellationTokenRegistration.Value.Dispose(); + } } await _socket.CloseAsync().ConfigureAwait(false); @@ -446,8 +470,11 @@ namespace CryptoExchange.Net.Sockets /// public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false) { - if (!_listenerManager.Contains(subscription)) - return; + lock (_listenersLock) + { + if (!_listeners.Contains(subscription)) + return; + } subscription.Closed = true; @@ -467,9 +494,13 @@ namespace CryptoExchange.Net.Sockets return; } - var shouldCloseConnection = _listenerManager.GetSubscriptions().All(r => !r.UserSubscription || r.Closed); - if (shouldCloseConnection) - Status = SocketStatus.Closing; + bool shouldCloseConnection; + lock (_listenersLock) + { + shouldCloseConnection = _listeners.OfType().All(r => !r.UserSubscription || r.Closed); + if (shouldCloseConnection) + Status = SocketStatus.Closing; + } if (shouldCloseConnection) { @@ -477,7 +508,8 @@ namespace CryptoExchange.Net.Sockets await CloseAsync().ConfigureAwait(false); } - _listenerManager.Remove(subscription); + lock (_listenersLock) + _listeners.Remove(subscription); } /// @@ -504,7 +536,8 @@ namespace CryptoExchange.Net.Sockets if (Status != SocketStatus.None && Status != SocketStatus.Connected) return false; - _listenerManager.Add(subscription); + lock (_listenersLock) + _listeners.Add(subscription); if (subscription.UserSubscription) _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {UserSubscriptionCount}"); @@ -515,21 +548,29 @@ namespace CryptoExchange.Net.Sockets /// Get a subscription on this connection by id /// /// - public Subscription? GetSubscription(int id) => _listenerManager.GetSubscriptions().SingleOrDefault(s => s.Id == id); + public Subscription? GetSubscription(int id) + { + lock (_listenersLock) + return _listeners.OfType().SingleOrDefault(s => s.Id == id); + } /// /// Get a subscription on this connection by its subscribe request /// /// Filter for a request /// - public Subscription? GetSubscriptionByRequest(Func predicate) => _listenerManager.GetSubscriptions().SingleOrDefault(s => predicate(s)); - + public Subscription? GetSubscriptionByRequest(Func predicate) + { + lock (_listenersLock) + return _listeners.OfType().SingleOrDefault(s => predicate(s)); + } /// /// Send a query request and wait for an answer /// /// Query to send + /// Action to run when query finishes /// - public virtual async Task SendAndWaitQueryAsync(BaseQuery query, Action? onFinished = null) + public virtual async Task SendAndWaitQueryAsync(Query query, Action? onFinished = null) { await SendAndWaitIntAsync(query, onFinished).ConfigureAwait(false); return query.Result ?? new CallResult(new ServerError("Timeout")); @@ -540,6 +581,7 @@ namespace CryptoExchange.Net.Sockets /// /// Query response type /// Query to send + /// Action to run when query finishes /// public virtual async Task> SendAndWaitQueryAsync(Query query, Action? onFinished = null) { @@ -547,9 +589,11 @@ namespace CryptoExchange.Net.Sockets return query.TypedResult ?? new CallResult(new ServerError("Timeout")); } - private async Task SendAndWaitIntAsync(BaseQuery query, Action onFinished) + private async Task SendAndWaitIntAsync(Query query, Action? onFinished) { - _listenerManager.Add(query); + lock(_listenersLock) + _listeners.Add(query); + var sendOk = Send(query.Id, query.Request, query.Weight); if (!sendOk) { @@ -600,7 +644,7 @@ namespace CryptoExchange.Net.Sockets /// The id of the request public virtual bool Send(int requestId, string data, int weight) { - _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {requestId} - sending messsage: {data}"); + _logger.Log(LogLevel.Trace, $"Socket {SocketId} msg {requestId} - sending messsage: {data}"); try { _socket.Send(requestId, data, weight); @@ -617,16 +661,20 @@ namespace CryptoExchange.Net.Sockets if (!_socket.IsOpen) return new CallResult(new WebError("Socket not connected")); - var anySubscriptions = _listenerManager.GetSubscriptions().Any(s => s.UserSubscription); + bool anySubscriptions; + lock (_listenersLock) + anySubscriptions = _listeners.OfType().Any(s => s.UserSubscription); if (!anySubscriptions) { // No need to resubscribe anything - _logger.Log(LogLevel.Debug, $"Socket {SocketId} Nothing to resubscribe, closing connection"); + _logger.Log(LogLevel.Debug, $"Socket {SocketId} nothing to resubscribe, closing connection"); _ = _socket.CloseAsync(); return new CallResult(true); } - var anyAuthenticated = _listenerManager.GetSubscriptions().Any(s => s.Authenticated); + bool anyAuthenticated; + lock (_listenersLock) + anyAuthenticated = _listeners.OfType().Any(s => s.Authenticated); if (anyAuthenticated) { // If we reconnected a authenticated connection we need to re-authenticate @@ -642,7 +690,9 @@ namespace CryptoExchange.Net.Sockets } // Get a list of all subscriptions on the socket - var subList = _listenerManager.GetSubscriptions(); + List subList; + lock (_listenersLock) + subList = _listeners.OfType().ToList(); foreach(var subscription in subList) { @@ -650,7 +700,7 @@ namespace CryptoExchange.Net.Sockets var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false); if (!result) { - _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error); + _logger.Log(LogLevel.Warning, $"Socket {SocketId} failed request revitalization: " + result.Error); return result.As(false); } } @@ -670,8 +720,7 @@ namespace CryptoExchange.Net.Sockets taskList.Add(SendAndWaitQueryAsync(subQuery, () => { - subscription.HandleSubQueryResponse(subQuery.Response); - _listenerManager.Reset(subscription); + subscription.HandleSubQueryResponse(subQuery.Response!); })); } @@ -710,7 +759,7 @@ namespace CryptoExchange.Net.Sockets return new CallResult(null); var result = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false); - subscription.HandleSubQueryResponse(subQuery.Response); + subscription.HandleSubQueryResponse(subQuery.Response!); return result; } diff --git a/CryptoExchange.Net/Sockets/SocketListenerManager.cs b/CryptoExchange.Net/Sockets/SocketListenerManager.cs deleted file mode 100644 index d1ffb2b..0000000 --- a/CryptoExchange.Net/Sockets/SocketListenerManager.cs +++ /dev/null @@ -1,170 +0,0 @@ -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Objects.Sockets; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Threading.Tasks; - -namespace CryptoExchange.Net.Sockets -{ - public class SocketListenerManager - { - private ILogger _logger; - private int _socketId; - private object _lock = new object(); - //private Dictionary _idMap; - //private Dictionary> _typeMap; - private Dictionary> _listeners; - - public SocketListenerManager(ILogger logger, int socketId) - { - //_idMap = new Dictionary(); - _listeners = new Dictionary>(); - //_typeMap = new Dictionary>(); - _logger = logger; - _socketId = socketId; - } - - public Type? IdToType(string streamIdentifier, string typeIdentifier) - { - lock (_lock) - { - _listeners.TryGetValue(streamIdentifier, out var listeners); - if (listeners == null) - return null; - - var result = listeners.SelectMany(l => l.TypeMapping).FirstOrDefault(x => x.Key == (typeIdentifier ?? "")); - return result.Value; - } - } - - public List GetListenIds() - { - lock(_lock) - return _listeners.Keys.ToList(); - } - - public void Add(IMessageProcessor processor) - { - lock (_lock) - { - if (processor.StreamIdentifiers?.Any() == true) - { - foreach (var identifier in processor.StreamIdentifiers) - { - if (!_listeners.TryGetValue(identifier, out var list)) - { - list = new List(); - _listeners.Add(identifier, list); - } - - list.Add(processor); - } - } - - } - } - - public void Reset(IMessageProcessor processor) - { - lock (_lock) - { - //Debug.WriteLine("4 Resetting"); - Remove(processor); - Add(processor); - } - } - - public async Task InvokeListenersAsync(SocketConnection connection, string id, BaseParsedMessage data) - { - List listeners; - lock (_lock) - { - if (!_listeners.TryGetValue(id, out var idListeners)) - return false; - - listeners = idListeners.Where(i => data.TypeIdentifier == null || i.TypeMapping.ContainsKey(data.TypeIdentifier)).ToList(); - } - - foreach (var listener in listeners) - { - _logger.Log(LogLevel.Trace, $"Socket {_socketId} Message mapped to processor {listener.Id} with identifier {data.StreamIdentifier}"); - if (listener is BaseQuery query) - { - Remove(listener); - if (query?.Completed == true) - { - // Answer to a timed out request - _logger.Log(LogLevel.Warning, $"Socket {_socketId} Received after request timeout. Consider increasing the RequestTimeout"); - } - } - - // Matched based on identifier - var userSw = Stopwatch.StartNew(); - var dataEvent = new DataEvent(data, null, data.OriginalData, DateTime.UtcNow, null); - try - { - await listener.HandleMessageAsync(connection, dataEvent).ConfigureAwait(false); - } - catch (Exception ex) - { - // TODO - } - - userSw.Stop(); - if (userSw.ElapsedMilliseconds > 500) - { - _logger.Log(LogLevel.Debug, $"Socket {_socketId} {(listener is Subscription ? "subscription " : "query " + listener!.Id)} message processing slow ({(int)userSw.ElapsedMilliseconds}ms), consider offloading data handling to another thread. " + - "Data from this socket may arrive late or not at all if message processing is continuously slow."); - } - } - - return true; - } - - public T? GetById(int id) where T : BaseQuery - { - lock (_lock) - { - var val = _listeners.Values.SelectMany(x => x).FirstOrDefault(x => x.Id == id); - return (T)val; - } - } - - public List GetSubscriptions() - { - lock (_lock) - return _listeners.Values.SelectMany(v => v.OfType()).Distinct().ToList(); - } - - public List GetQueries() - { - lock (_lock) - return _listeners.Values.SelectMany(v => v.OfType()).ToList(); - } - - public bool Contains(IMessageProcessor processor) - { - lock (_lock) - return _listeners.Any(l => l.Value.Contains(processor)); - } - - public void Remove(IMessageProcessor processor) - { - lock (_lock) - { - if (processor.StreamIdentifiers?.Any() != true) - return; - - foreach(var kv in _listeners) - { - if (kv.Value.Contains(processor)) - kv.Value.Remove(processor); - } - } - } - - } -} diff --git a/CryptoExchange.Net/Sockets/SocketMessage.cs b/CryptoExchange.Net/Sockets/SocketMessage.cs new file mode 100644 index 0000000..75fc7a6 --- /dev/null +++ b/CryptoExchange.Net/Sockets/SocketMessage.cs @@ -0,0 +1,45 @@ +using CryptoExchange.Net.Sockets.MessageParsing.Interfaces; +using System; + +namespace CryptoExchange.Net.Sockets +{ + /// + /// Message received from the websocket + /// + public class SocketMessage + { + /// + /// Message receive time + /// + public DateTime ReceiveTime { get; set; } + /// + /// The message data + /// + public IMessageAccessor Message { get; set; } + /// + /// Raw string data + /// + public string? RawData { get; set; } + + /// + /// ctor + /// + /// + /// + public SocketMessage(DateTime receiveTime, IMessageAccessor message) + { + ReceiveTime = receiveTime; + Message = message; + } + + /// + /// Deserialize the message to a type + /// + /// + /// + public object Deserialize(Type type) + { + return Message.Deserialize(type); + } + } +} diff --git a/CryptoExchange.Net/Sockets/Subscription.cs b/CryptoExchange.Net/Sockets/Subscription.cs index e2185c5..c1b98bb 100644 --- a/CryptoExchange.Net/Sockets/Subscription.cs +++ b/CryptoExchange.Net/Sockets/Subscription.cs @@ -55,9 +55,9 @@ namespace CryptoExchange.Net.Sockets public bool Authenticated { get; } /// - /// Strings to identify this subscription with + /// Strings to match this subscription to a received message /// - public abstract List StreamIdentifiers { get; set; } + public abstract HashSet ListenerIdentifiers { get; set; } /// /// Cancellation token registration @@ -69,7 +69,12 @@ namespace CryptoExchange.Net.Sockets /// public event Action? Exception; - public abstract Dictionary TypeMapping { get; } + /// + /// Get the deserialization type for this message + /// + /// + /// + public abstract Type? GetMessageType(SocketMessage message); /// /// ctor @@ -86,21 +91,36 @@ namespace CryptoExchange.Net.Sockets } /// - /// Get the subscribe object to send when subscribing + /// Get the subscribe query to send when subscribing /// /// - public abstract BaseQuery? GetSubQuery(SocketConnection connection); - - public virtual void HandleSubQueryResponse(BaseParsedMessage message) { } - public virtual void HandleUnsubQueryResponse(BaseParsedMessage message) { } + public abstract Query? GetSubQuery(SocketConnection connection); /// - /// Get the unsubscribe object to send when unsubscribing + /// Handle a subscription query response + /// + /// + public virtual void HandleSubQueryResponse(object message) { } + + /// + /// Handle an unsubscription query response + /// + /// + public virtual void HandleUnsubQueryResponse(object message) { } + + /// + /// Get the unsubscribe query to send when unsubscribing /// /// - public abstract BaseQuery? GetUnsubQuery(); + public abstract Query? GetUnsubQuery(); - public async Task HandleMessageAsync(SocketConnection connection, DataEvent message) + /// + /// Handle an update message + /// + /// + /// + /// + public async Task HandleAsync(SocketConnection connection, DataEvent message) { ConnectionInvocations++; TotalInvocations++; @@ -110,9 +130,10 @@ namespace CryptoExchange.Net.Sockets /// /// Handle the update message /// + /// /// /// - public abstract Task DoHandleMessageAsync(SocketConnection connection, DataEvent message); + public abstract Task DoHandleMessageAsync(SocketConnection connection, DataEvent message); /// /// Invoke the exception event @@ -124,24 +145,9 @@ namespace CryptoExchange.Net.Sockets } } - /// - public abstract class Subscription : Subscription - { - /// - /// ctor - /// - /// - /// - protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated) - { - } - } - /// public abstract class Subscription : Subscription { - //public override Func ExpectedTypeDelegate => (x) => typeof(TEvent); - /// /// ctor /// @@ -152,18 +158,24 @@ namespace CryptoExchange.Net.Sockets } /// - //public override Task DoHandleMessageAsync(SocketConnection connection, DataEvent message) - // => HandleEventAsync(connection, message.As((ParsedMessage)message.Data)); + public override void HandleSubQueryResponse(object message) + => HandleSubQueryResponse((TSubResponse)message); - public override void HandleSubQueryResponse(BaseParsedMessage message) - => HandleSubQueryResponse((ParsedMessage)message); + /// + /// Handle a subscription query response + /// + /// + public virtual void HandleSubQueryResponse(TSubResponse message) { } - public virtual void HandleSubQueryResponse(ParsedMessage message) { } + /// + public override void HandleUnsubQueryResponse(object message) + => HandleUnsubQueryResponse((TUnsubResponse)message); - public override void HandleUnsubQueryResponse(BaseParsedMessage message) - => HandleUnsubQueryResponse((ParsedMessage)message); - - public virtual void HandleUnsubQueryResponse(ParsedMessage message) { } + /// + /// Handle an unsubscription query response + /// + /// + public virtual void HandleUnsubQueryResponse(TUnsubResponse message) { } } } diff --git a/CryptoExchange.Net/Sockets/SystemSubscription.cs b/CryptoExchange.Net/Sockets/SystemSubscription.cs index c409fed..f4e680b 100644 --- a/CryptoExchange.Net/Sockets/SystemSubscription.cs +++ b/CryptoExchange.Net/Sockets/SystemSubscription.cs @@ -22,26 +22,37 @@ namespace CryptoExchange.Net.Sockets } /// - public override BaseQuery? GetSubQuery(SocketConnection connection) => null; + public override Query? GetSubQuery(SocketConnection connection) => null; /// - public override BaseQuery? GetUnsubQuery() => null; + public override Query? GetUnsubQuery() => null; } + /// public abstract class SystemSubscription : SystemSubscription { - public override Dictionary TypeMapping => new Dictionary - { - { "", typeof(T) } - }; + /// + public override Type GetMessageType(SocketMessage message) => typeof(T); - public override Task DoHandleMessageAsync(SocketConnection connection, DataEvent message) - => HandleMessageAsync(connection, message.As((ParsedMessage)message.Data)); + /// + public override Task DoHandleMessageAsync(SocketConnection connection, DataEvent message) + => HandleMessageAsync(connection, message.As((T)message.Data)); + /// + /// ctor + /// + /// + /// protected SystemSubscription(ILogger logger, bool authenticated) : base(logger, authenticated) { } - public abstract Task HandleMessageAsync(SocketConnection connection, DataEvent> message); + /// + /// Handle an update message + /// + /// + /// + /// + public abstract Task HandleMessageAsync(SocketConnection connection, DataEvent message); } }