1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-10 09:26:22 +00:00
This commit is contained in:
JKorf 2023-10-28 10:08:04 +02:00
parent 141d5bd956
commit bf854c92af
4 changed files with 147 additions and 34 deletions

View File

@ -1,3 +1,4 @@
using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
@ -450,12 +451,14 @@ namespace CryptoExchange.Net
protected virtual MessageListener? AddSubscription<T>(Subscription subscription, bool userSubscription, SocketConnection connection)
{
var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription);
if (!connection.AddListener(messageListener))
if (!connection.AddListener(messageListener, subscription.Identifiers))
return null;
return messageListener;
}
protected internal abstract SocketConverter GetConverter();
/// <summary>
/// Adds a system subscription. Used for example to reply to ping requests
/// </summary>
@ -465,7 +468,7 @@ namespace CryptoExchange.Net
systemSubscriptions.Add(systemSubscription);
var subscription = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false);
foreach (var connection in socketConnections.Values)
connection.AddListener(subscription);
connection.AddListener(subscription, null);
}
/// <summary>
@ -539,7 +542,7 @@ namespace CryptoExchange.Net
foreach (var systemSubscription in systemSubscriptions)
{
var handler = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false);
socketConnection.AddListener(handler);
socketConnection.AddListener(handler, null);
}
return new CallResult<SocketConnection>(socketConnection);

View File

@ -0,0 +1,72 @@
using CryptoExchange.Net.Objects.Sockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Converters
{
public class SocketConverter : JsonConverter
{
private readonly List<string> _idFields;
private readonly Func<Dictionary<string, string>, Type> _typeIdentifier;
public SocketConverter(List<string> idFields, Func<Dictionary<string, string>, Type> typeIdentifier)
{
_idFields = idFields;
_typeIdentifier = typeIdentifier;
}
/// <inheritdoc />
public override bool CanConvert(Type objectType) => true;
/// <inheritdoc />
public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer)
{
// Start reading the data
// Once we reach the property that identifies the message we save those in a string array
// Once all id properties have been read callback to see what the deserialization type should be
// Deserialize to the correct type
var token = JToken.Load(reader);
var dict = new Dictionary<string, string>();
foreach(var idField in _idFields)
{
var splitTokens = idField.Split(new char[] { ':' });
var accessToken = token;
foreach (var splitToken in splitTokens)
{
accessToken = accessToken[splitToken];
}
dict[idField] = accessToken?.ToString();
}
var resultType = _typeIdentifier(dict);
string idString = "";
foreach(var item in dict)
idString += item.Value;
return new ParsedMessage
{
Identifier = idString,
Data = resultType == null ? null : token.ToObject(resultType)
};
}
/// <inheritdoc />
public override bool CanWrite { get { return false; } }
/// <inheritdoc />
public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer)
{
}
}
public class ParsedMessage
{
public string Identifier { get; set; }
public object Data { get; set; }
}
}

View File

@ -10,6 +10,9 @@ using CryptoExchange.Net.Objects;
using System.Net.WebSockets;
using System.IO;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Converters;
using System.Text;
using System.Runtime;
namespace CryptoExchange.Net.Sockets
{
@ -153,6 +156,8 @@ namespace CryptoExchange.Net.Sockets
private readonly List<MessageListener> _listeners;
private readonly List<IStreamMessageListener> _messageListeners; // ?
private readonly Dictionary<string, IStreamMessageListener> _messageIdentifierListeners;
private readonly object _listenerLock = new();
private readonly ILogger _logger;
@ -162,6 +167,8 @@ namespace CryptoExchange.Net.Sockets
/// The underlying websocket
/// </summary>
private readonly IWebsocket _socket;
private readonly JsonSerializerSettings _options;
private readonly JsonSerializer _serializer;
/// <summary>
/// New socket connection
@ -178,6 +185,7 @@ namespace CryptoExchange.Net.Sockets
Properties = new Dictionary<string, object>();
_messageListeners = new List<IStreamMessageListener>();
_messageIdentifierListeners = new Dictionary<string, IStreamMessageListener>();
_listeners = new List<MessageListener>();
_socket = socket;
@ -189,6 +197,11 @@ namespace CryptoExchange.Net.Sockets
_socket.OnReconnected += HandleReconnected;
_socket.OnError += HandleError;
_socket.GetReconnectionUrl = GetReconnectionUrlAsync;
var converter = ApiClient.GetConverter();
_options = SerializerOptions.Default;
_options.Converters.Add(converter);
_serializer = JsonSerializer.Create(_options);
}
/// <summary>
@ -321,45 +334,62 @@ namespace CryptoExchange.Net.Sockets
lock (_listenerLock)
listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList();
foreach (var listener in listeners)
var converter = ApiClient.GetConverter();
using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
using var jsonTextReader = new JsonTextReader(sr);
var result = (ParsedMessage)converter.ReadJson(jsonTextReader, typeof(object), null, _serializer);
stream.Position = 0;
if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener))
{
if (listener.MessageMatches(streamMessage))
var userSw = Stopwatch.StartNew();
await idListener.ProcessAsync(streamMessage).ConfigureAwait(false);
userSw.Stop();
userCodeDuration = userSw.Elapsed;
handledResponse = true;
}
else
{
foreach (var listener in listeners)
{
if (listener is PendingRequest pendingRequest)
if (listener.MessageMatches(streamMessage))
{
lock (_messageListeners)
_messageListeners.Remove(pendingRequest);
if (pendingRequest.Completed)
if (listener is PendingRequest pendingRequest)
{
// Answer to a timed out request, unsub if it is a subscription request
if (pendingRequest.MessageListener != null)
lock (_messageListeners)
_messageListeners.Remove(pendingRequest);
if (pendingRequest.Completed)
{
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
_ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false);
// Answer to a timed out request, unsub if it is a subscription request
if (pendingRequest.MessageListener != null)
{
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Received subscription info after request timed out; unsubscribing. Consider increasing the RequestTimeout");
_ = UnsubscribeAsync(pendingRequest.MessageListener).ConfigureAwait(false);
}
}
else
{
_logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request");
await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false);
}
if (!ApiClient.ContinueOnQueryResponse)
return;
handledResponse = true;
break;
}
else
else if (listener is MessageListener subscription)
{
_logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {pendingRequest.Id} - received data matched to pending request");
await pendingRequest.ProcessAsync(streamMessage).ConfigureAwait(false);
currentSubscription = subscription;
handledResponse = true;
var userSw = Stopwatch.StartNew();
await subscription.ProcessAsync(streamMessage).ConfigureAwait(false);
userSw.Stop();
userCodeDuration = userSw.Elapsed;
break;
}
if (!ApiClient.ContinueOnQueryResponse)
return;
handledResponse = true;
break;
}
else if (listener is MessageListener subscription)
{
currentSubscription = subscription;
handledResponse = true;
var userSw = Stopwatch.StartNew();
await subscription.ProcessAsync(streamMessage).ConfigureAwait(false);
userSw.Stop();
userCodeDuration = userSw.Elapsed;
break;
}
}
}
@ -480,7 +510,7 @@ namespace CryptoExchange.Net.Sockets
/// Add a listener to this connection
/// </summary>
/// <param name="listener"></param>
public bool AddListener(MessageListener listener)
public bool AddListener(MessageListener listener, List<string>? listenerIdentifiers)
{
lock (_listenerLock)
{
@ -489,6 +519,11 @@ namespace CryptoExchange.Net.Sockets
_listeners.Add(listener);
_messageListeners.Add(listener);
if (listenerIdentifiers != null)
{
foreach (var id in listenerIdentifiers)
_messageIdentifierListeners.Add(id.ToLowerInvariant(), listener);
}
if (listener.UserListener)
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {listener.Id}, total listeners on connection: {_listeners.Count(s => s.UserListener)}");

View File

@ -3,6 +3,7 @@ using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
@ -26,6 +27,8 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public bool Authenticated { get; }
public abstract List<string> Identifiers { get; }
/// <summary>
/// ctor
/// </summary>