1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-09 00:46:19 +00:00
This commit is contained in:
JKorf 2023-10-29 17:43:18 +01:00
parent bf854c92af
commit bee2e86c2f
10 changed files with 236 additions and 223 deletions

View File

@ -109,6 +109,9 @@ namespace CryptoExchange.Net
/// <inheritdoc /> /// <inheritdoc />
public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions; public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions;
/// <inheritdoc />
public abstract SocketConverter StreamConverter { get; }
#endregion #endregion
/// <summary> /// <summary>
@ -457,8 +460,6 @@ namespace CryptoExchange.Net
return messageListener; return messageListener;
} }
protected internal abstract SocketConverter GetConverter();
/// <summary> /// <summary>
/// Adds a system subscription. Used for example to reply to ping requests /// Adds a system subscription. Used for example to reply to ping requests
/// </summary> /// </summary>
@ -552,7 +553,7 @@ namespace CryptoExchange.Net
/// Process an unhandled message /// Process an unhandled message
/// </summary> /// </summary>
/// <param name="message">The message that wasn't processed</param> /// <param name="message">The message that wasn't processed</param>
protected virtual void HandleUnhandledMessage(StreamMessage message) protected virtual void HandleUnhandledMessage(ParsedMessage message)
{ {
} }

View File

@ -3,35 +3,31 @@ using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Text; using System.Text;
namespace CryptoExchange.Net.Converters namespace CryptoExchange.Net.Converters
{ {
public class SocketConverter : JsonConverter public abstract class SocketConverter
{ {
private readonly List<string> _idFields; public abstract string[] IdFields { get; }
private readonly Func<Dictionary<string, string>, Type> _typeIdentifier;
public SocketConverter(List<string> idFields, Func<Dictionary<string, string>, Type> typeIdentifier) public abstract Type? GetDeserializationType(Dictionary<string, string> idValues, List<MessageListener> listeners);
{ public abstract List<MessageListener> MatchToListener(ParsedMessage message, List<MessageListener> listeners);
_idFields = idFields;
_typeIdentifier = typeIdentifier;
}
/// <inheritdoc /> /// <inheritdoc />
public override bool CanConvert(Type objectType) => true; public object? ReadJson(Stream stream, List<MessageListener> listeners)
/// <inheritdoc />
public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer)
{ {
// Start reading the data // Start reading the data
// Once we reach the property that identifies the message we save those in a string array // Once we reach the properties that identify the message we save those in a dict
// Once all id properties have been read callback to see what the deserialization type should be // Once all id properties have been read callback to see what the deserialization type should be
// Deserialize to the correct type // Deserialize to the correct type
using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
using var jsonTextReader = new JsonTextReader(sr);
var token = JToken.Load(reader); var token = JToken.Load(jsonTextReader);
var dict = new Dictionary<string, string>(); var dict = new Dictionary<string, string>();
foreach(var idField in _idFields) foreach(var idField in IdFields)
{ {
var splitTokens = idField.Split(new char[] { ':' }); var splitTokens = idField.Split(new char[] { ':' });
var accessToken = token; var accessToken = token;
@ -42,7 +38,7 @@ namespace CryptoExchange.Net.Converters
dict[idField] = accessToken?.ToString(); dict[idField] = accessToken?.ToString();
} }
var resultType = _typeIdentifier(dict); var resultType = GetDeserializationType(dict, listeners);
string idString = ""; string idString = "";
foreach(var item in dict) foreach(var item in dict)
idString += item.Value; idString += item.Value;
@ -53,20 +49,12 @@ namespace CryptoExchange.Net.Converters
Data = resultType == null ? null : token.ToObject(resultType) Data = resultType == null ? null : token.ToObject(resultType)
}; };
} }
/// <inheritdoc />
public override bool CanWrite { get { return false; } }
/// <inheritdoc />
public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer)
{
}
} }
public class ParsedMessage public class ParsedMessage
{ {
public string Identifier { get; set; } public string Identifier { get; set; } = null!;
public object Data { get; set; } public object? Data { get; set; }
} }
} }

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Objects.Sockets;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces namespace CryptoExchange.Net.Interfaces
@ -6,7 +7,7 @@ namespace CryptoExchange.Net.Interfaces
internal interface IStreamMessageListener internal interface IStreamMessageListener
{ {
int Priority { get; } int Priority { get; }
bool MessageMatches(StreamMessage message); bool MessageMatches(ParsedMessage message);
Task ProcessAsync(StreamMessage message); Task ProcessAsync(ParsedMessage message);
} }
} }

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Interfaces;
using System; using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -8,7 +9,7 @@ namespace CryptoExchange.Net.Objects.Sockets
internal class PendingRequest : IStreamMessageListener internal class PendingRequest : IStreamMessageListener
{ {
public int Id { get; set; } public int Id { get; set; }
public Func<StreamMessage, bool> MessageMatchesHandler { get; } public Func<ParsedMessage, bool> MessageMatchesHandler { get; }
public bool Completed { get; private set; } public bool Completed { get; private set; }
public AsyncResetEvent Event { get; } public AsyncResetEvent Event { get; }
public DateTime RequestTimestamp { get; set; } public DateTime RequestTimestamp { get; set; }
@ -19,7 +20,7 @@ namespace CryptoExchange.Net.Objects.Sockets
public int Priority => 100; public int Priority => 100;
public PendingRequest(int id, Func<StreamMessage, bool> messageMatchesHandler, TimeSpan timeout, MessageListener? subscription) public PendingRequest(int id, Func<ParsedMessage, bool> messageMatchesHandler, TimeSpan timeout, MessageListener? subscription)
{ {
Id = id; Id = id;
MessageMatchesHandler = messageMatchesHandler; MessageMatchesHandler = messageMatchesHandler;
@ -42,12 +43,12 @@ namespace CryptoExchange.Net.Objects.Sockets
Event.Set(); Event.Set();
} }
public bool MessageMatches(StreamMessage message) public bool MessageMatches(ParsedMessage message)
{ {
return MessageMatchesHandler(message); return MessageMatchesHandler(message);
} }
public Task ProcessAsync(StreamMessage message) public Task ProcessAsync(ParsedMessage message)
{ {
Completed = true; Completed = true;
Event.Set(); Event.Set();

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Sockets; using CryptoExchange.Net.Sockets;
using System; using System;
using System.Threading; using System.Threading;
@ -84,13 +85,18 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public bool MessageMatches(StreamMessage message) => Subscription.MessageMatchesEvent(message); public bool MessageMatches(ParsedMessage message) => Subscription.MessageMatchesEvent(message);
/// <summary> /// <summary>
/// Process the message /// Process the message
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public Task ProcessAsync(StreamMessage message) => Subscription.HandleEventAsync(message); public Task ProcessAsync(ParsedMessage message)
{
// TODO
var dataEvent = new DataEvent<ParsedMessage>(message, null, null, DateTime.UtcNow, null);
return Subscription.HandleEventAsync(dataEvent);
}
} }
} }

View File

@ -1,70 +1,70 @@
using System; //using System;
using System.Collections.Generic; //using System.Collections.Generic;
using System.Data.Common; //using System.Data.Common;
using System.IO; //using System.IO;
using System.Text; //using System.Text;
using System.Threading.Tasks; //using System.Threading.Tasks;
using CryptoExchange.Net.Sockets; //using CryptoExchange.Net.Sockets;
namespace CryptoExchange.Net.Objects.Sockets //namespace CryptoExchange.Net.Objects.Sockets
{ //{
/// <summary> // /// <summary>
/// A message received from a stream // /// A message received from a stream
/// </summary> // /// </summary>
public class StreamMessage : IDisposable // public class StreamMessage : IDisposable
{ // {
/// <summary> // /// <summary>
/// The connection it was received on // /// The connection it was received on
/// </summary> // /// </summary>
public SocketConnection Connection { get; } // public SocketConnection Connection { get; }
/// <summary> // /// <summary>
/// The data stream // /// The data stream
/// </summary> // /// </summary>
public Stream Stream { get; } // public Stream Stream { get; }
/// <summary> // /// <summary>
/// Receive timestamp // /// Receive timestamp
/// </summary> // /// </summary>
public DateTime Timestamp { get; set; } // public DateTime Timestamp { get; set; }
private Dictionary<Type, object> _casted; // private Dictionary<Type, object> _casted;
/// <summary> // /// <summary>
/// Get the data from the memory in specified type using the converter. If this type has been resolved before it will use that instead // /// Get the data from the memory in specified type using the converter. If this type has been resolved before it will use that instead
/// </summary> // /// </summary>
/// <typeparam name="T"></typeparam> // /// <typeparam name="T"></typeparam>
/// <param name="converter"></param> // /// <param name="converter"></param>
/// <returns></returns> // /// <returns></returns>
public T Get<T>(Func<Stream, T> converter) // public T Get<T>(Func<Stream, T> converter)
{ // {
if (_casted.TryGetValue(typeof(T), out var casted)) // if (_casted.TryGetValue(typeof(T), out var casted))
return (T)casted; // return (T)casted;
var result = converter(Stream); // var result = converter(Stream);
_casted.Add(typeof(T), result!); // _casted.Add(typeof(T), result!);
Stream.Position = 0; // Stream.Position = 0;
return result; // return result;
} // }
/// <summary> // /// <summary>
/// Dispose // /// Dispose
/// </summary> // /// </summary>
public void Dispose() // public void Dispose()
{ // {
Stream.Dispose(); // Stream.Dispose();
} // }
/// <summary> // /// <summary>
/// ctor // /// ctor
/// </summary> // /// </summary>
/// <param name="connection"></param> // /// <param name="connection"></param>
/// <param name="stream"></param> // /// <param name="stream"></param>
/// <param name="timestamp"></param> // /// <param name="timestamp"></param>
public StreamMessage(SocketConnection connection, Stream stream, DateTime timestamp) // public StreamMessage(SocketConnection connection, Stream stream, DateTime timestamp)
{ // {
Connection = connection; // Connection = connection;
Stream = stream; // Stream = stream;
Timestamp = timestamp; // Timestamp = timestamp;
_casted = new Dictionary<Type, object>(); // _casted = new Dictionary<Type, object>();
} // }
} // }
} //}

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
@ -28,13 +29,13 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public abstract bool MessageMatchesQuery(StreamMessage message); public abstract bool MessageMatchesQuery(ParsedMessage message);
/// <summary> /// <summary>
/// Handle the query response /// Handle the query response
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public abstract CallResult HandleResponse(StreamMessage message); public abstract CallResult HandleResponse(ParsedMessage message);
/// <summary> /// <summary>
/// ctor /// ctor

View File

@ -49,7 +49,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Unhandled message event /// Unhandled message event
/// </summary> /// </summary>
public event Action<StreamMessage>? UnhandledMessage; public event Action<ParsedMessage>? UnhandledMessage;
/// <summary> /// <summary>
/// The amount of listeners on this connection /// The amount of listeners on this connection
@ -57,7 +57,7 @@ namespace CryptoExchange.Net.Sockets
public int UserListenerCount public int UserListenerCount
{ {
get { lock (_listenerLock) get { lock (_listenerLock)
return _listeners.Count(h => h.UserListener); } return _messageIdentifierListeners.Count(h => h.UserListener); }
} }
/// <summary> /// <summary>
@ -153,10 +153,11 @@ namespace CryptoExchange.Net.Sockets
} }
private bool _pausedActivity; private bool _pausedActivity;
private readonly List<MessageListener> _listeners; //private readonly List<MessageListener> _listeners;
private readonly List<IStreamMessageListener> _messageListeners; // ? //private readonly List<IStreamMessageListener> _messageListeners; // ?
private readonly Dictionary<string, IStreamMessageListener> _messageIdentifierListeners; private readonly List<PendingRequest> _pendingRequests;
private readonly Dictionary<string, MessageListener> _messageIdentifierListeners;
private readonly object _listenerLock = new(); private readonly object _listenerLock = new();
private readonly ILogger _logger; private readonly ILogger _logger;
@ -167,8 +168,6 @@ namespace CryptoExchange.Net.Sockets
/// The underlying websocket /// The underlying websocket
/// </summary> /// </summary>
private readonly IWebsocket _socket; private readonly IWebsocket _socket;
private readonly JsonSerializerSettings _options;
private readonly JsonSerializer _serializer;
/// <summary> /// <summary>
/// New socket connection /// New socket connection
@ -184,9 +183,8 @@ namespace CryptoExchange.Net.Sockets
Tag = tag; Tag = tag;
Properties = new Dictionary<string, object>(); Properties = new Dictionary<string, object>();
_messageListeners = new List<IStreamMessageListener>(); _pendingRequests = new List<PendingRequest>();
_messageIdentifierListeners = new Dictionary<string, IStreamMessageListener>(); _messageIdentifierListeners = new Dictionary<string, IStreamMessageListener>();
_listeners = new List<MessageListener>();
_socket = socket; _socket = socket;
_socket.OnStreamMessage += HandleStreamMessage; _socket.OnStreamMessage += HandleStreamMessage;
@ -197,11 +195,6 @@ namespace CryptoExchange.Net.Sockets
_socket.OnReconnected += HandleReconnected; _socket.OnReconnected += HandleReconnected;
_socket.OnError += HandleError; _socket.OnError += HandleError;
_socket.GetReconnectionUrl = GetReconnectionUrlAsync; _socket.GetReconnectionUrl = GetReconnectionUrlAsync;
var converter = ApiClient.GetConverter();
_options = SerializerOptions.Default;
_options.Converters.Add(converter);
_serializer = JsonSerializer.Create(_options);
} }
/// <summary> /// <summary>
@ -222,7 +215,7 @@ namespace CryptoExchange.Net.Sockets
Authenticated = false; Authenticated = false;
lock(_listenerLock) lock(_listenerLock)
{ {
foreach (var listener in _listeners) foreach (var listener in _messageIdentifierListeners.Values)
listener.Confirmed = false; listener.Confirmed = false;
} }
Task.Run(() => ConnectionClosed?.Invoke()); Task.Run(() => ConnectionClosed?.Invoke());
@ -325,81 +318,101 @@ namespace CryptoExchange.Net.Sockets
protected virtual async Task HandleStreamMessage(Stream stream) protected virtual async Task HandleStreamMessage(Stream stream)
{ {
var timestamp = DateTime.UtcNow; var timestamp = DateTime.UtcNow;
var streamMessage = new StreamMessage(this, stream, timestamp); //var streamMessage = new StreamMessage(this, stream, timestamp);
var handledResponse = false;
MessageListener? currentSubscription = null;
TimeSpan userCodeDuration = TimeSpan.Zero; TimeSpan userCodeDuration = TimeSpan.Zero;
List<IStreamMessageListener> listeners; List<IStreamMessageListener> listeners;
lock (_listenerLock) lock (_listenerLock)
listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList(); listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList();
var converter = ApiClient.GetConverter(); var result = (ParsedMessage)ApiClient.StreamConverter.ReadJson(stream, listeners.OfType<MessageListener>().ToList()); // TODO
using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
using var jsonTextReader = new JsonTextReader(sr);
var result = (ParsedMessage)converter.ReadJson(jsonTextReader, typeof(object), null, _serializer);
stream.Position = 0; stream.Position = 0;
if (result == null)
{
_logger.LogWarning("Message not matched to type");
return;
}
if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener)) if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener))
{ {
var userSw = Stopwatch.StartNew(); // Matched based on identifier
await idListener.ProcessAsync(streamMessage).ConfigureAwait(false); await idListener.ProcessAsync(result).ConfigureAwait(false);
userSw.Stop(); return;
userCodeDuration = userSw.Elapsed;
handledResponse = true;
}
else
{
foreach (var listener in listeners)
{
if (listener.MessageMatches(streamMessage))
{
if (listener is PendingRequest pendingRequest)
{
lock (_messageListeners)
_messageListeners.Remove(pendingRequest);
if (pendingRequest.Completed)
{
// Answer to a timed out request, unsub if it is a subscription request
if (pendingRequest.MessageListener != null)
{
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
_ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false);
}
}
else
{
_logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request");
await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false);
} }
if (!ApiClient.ContinueOnQueryResponse) foreach (var pendingRequest in _messageListeners.OfType<PendingRequest>())
{
if (pendingRequest.MessageMatchesHandler(result))
{
await pendingRequest.ProcessAsync(result).ConfigureAwait(false);
break;
}
}
_logger.LogWarning("Message not matched"); // TODO
return; return;
handledResponse = true; //if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener))
break; //{
} // var userSw = Stopwatch.StartNew();
else if (listener is MessageListener subscription) // await idListener.ProcessAsync(streamMessage).ConfigureAwait(false);
{ // userSw.Stop();
currentSubscription = subscription; // userCodeDuration = userSw.Elapsed;
handledResponse = true; // handledResponse = true;
var userSw = Stopwatch.StartNew(); //}
await subscription.ProcessAsync(streamMessage).ConfigureAwait(false); //else
userSw.Stop(); //{
userCodeDuration = userSw.Elapsed; // foreach (var listener in listeners)
break; // {
} // if (listener.MessageMatches(streamMessage))
} // {
} // if (listener is PendingRequest pendingRequest)
} // {
// lock (_messageListeners)
// _messageListeners.Remove(pendingRequest);
if (!handledResponse) // if (pendingRequest.Completed)
{ // {
if (!ApiClient.UnhandledMessageExpected) // // Answer to a timed out request, unsub if it is a subscription request
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + streamMessage.Get(ParsingUtils.GetString)); // if (pendingRequest.MessageListener != null)
UnhandledMessage?.Invoke(streamMessage); // {
} // _logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
// _ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false);
// }
// }
// else
// {
// _logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request");
// await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false);
// }
// if (!ApiClient.ContinueOnQueryResponse)
// return;
// handledResponse = true;
// break;
// }
// else if (listener is MessageListener subscription)
// {
// currentSubscription = subscription;
// handledResponse = true;
// var userSw = Stopwatch.StartNew();
// await subscription.ProcessAsync(streamMessage).ConfigureAwait(false);
// userSw.Stop();
// userCodeDuration = userSw.Elapsed;
// break;
// }
// }
// }
//}
//if (!handledResponse)
//{
// if (!ApiClient.UnhandledMessageExpected)
// _logger.Log(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + streamMessage.Get(ParsingUtils.GetString));
// UnhandledMessage?.Invoke(streamMessage);
//}
} }
/// <summary> /// <summary>
@ -434,7 +447,7 @@ namespace CryptoExchange.Net.Sockets
lock (_listenerLock) lock (_listenerLock)
{ {
foreach (var listener in _listeners) foreach (var listener in _messageIdentifierListeners.Values)
{ {
if (listener.CancellationTokenRegistration.HasValue) if (listener.CancellationTokenRegistration.HasValue)
listener.CancellationTokenRegistration.Value.Dispose(); listener.CancellationTokenRegistration.Value.Dispose();
@ -562,7 +575,7 @@ namespace CryptoExchange.Net.Sockets
/// <param name="handler">The response handler</param> /// <param name="handler">The response handler</param>
/// <param name="weight">The weight of the message</param> /// <param name="weight">The weight of the message</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func<StreamMessage, bool> handler) public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func<ParsedMessage, bool> handler)
{ {
var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener); var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener);
lock (_messageListeners) lock (_messageListeners)

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -52,7 +53,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public abstract (bool, CallResult?) MessageMatchesSubRequest(StreamMessage message); public abstract (bool, CallResult?) MessageMatchesSubRequest(ParsedMessage message);
/// <summary> /// <summary>
/// Get the unsubscribe object to send when unsubscribing /// Get the unsubscribe object to send when unsubscribing
@ -64,54 +65,54 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public abstract (bool, CallResult?) MessageMatchesUnsubRequest(StreamMessage message); public abstract (bool, CallResult?) MessageMatchesUnsubRequest(ParsedMessage message);
/// <summary> /// <summary>
/// Check if the message is an update for this subscription /// Check if the message is an update for this subscription
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public abstract bool MessageMatchesEvent(StreamMessage message); public abstract bool MessageMatchesEvent(ParsedMessage message);
/// <summary> /// <summary>
/// Handle the update message /// Handle the update message
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public abstract Task HandleEventAsync(StreamMessage message); public abstract Task HandleEventAsync(DataEvent<ParsedMessage> message);
/// <summary> ///// <summary>
/// Create a data event ///// Create a data event
/// </summary> ///// </summary>
/// <typeparam name="T"></typeparam> ///// <typeparam name="T"></typeparam>
/// <param name="obj"></param> ///// <param name="obj"></param>
/// <param name="message"></param> ///// <param name="message"></param>
/// <param name="topic"></param> ///// <param name="topic"></param>
/// <param name="type"></param> ///// <param name="type"></param>
/// <returns></returns> ///// <returns></returns>
protected virtual DataEvent<T> CreateDataEvent<T>(T obj, StreamMessage message, string? topic = null, SocketUpdateType? type = null) //protected virtual DataEvent<T> CreateDataEvent<T>(T obj, ParsedMessage message, string? topic = null, SocketUpdateType? type = null)
{ //{
string? originalData = null; // string? originalData = null;
if (_outputOriginalData) // if (_outputOriginalData)
originalData = message.Get(ParsingUtils.GetString); // originalData = message.Get(ParsingUtils.GetString);
return new DataEvent<T>(obj, topic, originalData, message.Timestamp, type); // return new DataEvent<T>(obj, topic, originalData, message.Timestamp, type);
} //}
/// <summary> ///// <summary>
/// Deserialize the message to an object using Json.Net ///// Deserialize the message to an object using Json.Net
/// </summary> ///// </summary>
/// <typeparam name="T"></typeparam> ///// <typeparam name="T"></typeparam>
/// <param name="message"></param> ///// <param name="message"></param>
/// <param name="settings"></param> ///// <param name="settings"></param>
/// <returns></returns> ///// <returns></returns>
protected virtual Task<CallResult<T>> DeserializeAsync<T>(StreamMessage message, JsonSerializerSettings settings) //protected virtual Task<CallResult<T>> DeserializeAsync<T>(StreamMessage message, JsonSerializerSettings settings)
{ //{
var serializer = JsonSerializer.Create(settings); // var serializer = JsonSerializer.Create(settings);
using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true); // using var sr = new StreamReader(message.Stream, Encoding.UTF8, false, (int)message.Stream.Length, true);
using var jsonTextReader = new JsonTextReader(sr); // using var jsonTextReader = new JsonTextReader(sr);
var result = serializer.Deserialize<T>(jsonTextReader); // var result = serializer.Deserialize<T>(jsonTextReader);
message.Stream.Position = 0; // message.Stream.Position = 0;
return Task.FromResult(new CallResult<T>(result!)); // return Task.FromResult(new CallResult<T>(result!));
} //}
} }
} }

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -24,11 +25,11 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc /> /// <inheritdoc />
public override object? GetSubRequest() => null; public override object? GetSubRequest() => null;
/// <inheritdoc /> /// <inheritdoc />
public override (bool, CallResult?) MessageMatchesSubRequest(StreamMessage message) => throw new NotImplementedException(); public override (bool, CallResult?) MessageMatchesSubRequest(ParsedMessage message) => throw new NotImplementedException();
/// <inheritdoc /> /// <inheritdoc />
public override object? GetUnsubRequest() => null; public override object? GetUnsubRequest() => null;
/// <inheritdoc /> /// <inheritdoc />
public override (bool, CallResult?) MessageMatchesUnsubRequest(StreamMessage message) => throw new NotImplementedException(); public override (bool, CallResult?) MessageMatchesUnsubRequest(ParsedMessage message) => throw new NotImplementedException();
} }
} }