1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-12 02:16:23 +00:00
This commit is contained in:
JKorf 2023-10-29 20:05:35 +01:00
parent bee2e86c2f
commit 9f92d86855
7 changed files with 82 additions and 59 deletions

View File

@ -454,7 +454,7 @@ namespace CryptoExchange.Net
protected virtual MessageListener? AddSubscription<T>(Subscription subscription, bool userSubscription, SocketConnection connection) protected virtual MessageListener? AddSubscription<T>(Subscription subscription, bool userSubscription, SocketConnection connection)
{ {
var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription); var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription);
if (!connection.AddListener(messageListener, subscription.Identifiers)) if (!connection.AddListener(messageListener))
return null; return null;
return messageListener; return messageListener;
@ -469,7 +469,7 @@ namespace CryptoExchange.Net
systemSubscriptions.Add(systemSubscription); systemSubscriptions.Add(systemSubscription);
var subscription = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false); var subscription = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false);
foreach (var connection in socketConnections.Values) foreach (var connection in socketConnections.Values)
connection.AddListener(subscription, null); connection.AddListener(subscription);
} }
/// <summary> /// <summary>
@ -539,11 +539,12 @@ namespace CryptoExchange.Net
var socket = CreateSocket(connectionAddress.Data!); var socket = CreateSocket(connectionAddress.Data!);
var socketConnection = new SocketConnection(_logger, this, socket, address); var socketConnection = new SocketConnection(_logger, this, socket, address);
socketConnection.UnhandledMessage += HandleUnhandledMessage; socketConnection.UnhandledMessage += HandleUnhandledMessage;
socketConnection.UnparsedMessage += HandleUnparsedMessage;
foreach (var systemSubscription in systemSubscriptions) foreach (var systemSubscription in systemSubscriptions)
{ {
var handler = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false); var handler = new MessageListener(ExchangeHelpers.NextId(), systemSubscription, false);
socketConnection.AddListener(handler, null); socketConnection.AddListener(handler);
} }
return new CallResult<SocketConnection>(socketConnection); return new CallResult<SocketConnection>(socketConnection);
@ -557,6 +558,14 @@ namespace CryptoExchange.Net
{ {
} }
/// <summary>
/// Process an unparsed message
/// </summary>
/// <param name="message">The message that wasn't parsed</param>
protected virtual void HandleUnparsedMessage(byte[] message)
{
}
/// <summary> /// <summary>
/// Connect a socket /// Connect a socket
/// </summary> /// </summary>

View File

@ -13,7 +13,6 @@ namespace CryptoExchange.Net.Converters
public abstract string[] IdFields { get; } public abstract string[] IdFields { get; }
public abstract Type? GetDeserializationType(Dictionary<string, string> idValues, List<MessageListener> listeners); public abstract Type? GetDeserializationType(Dictionary<string, string> idValues, List<MessageListener> listeners);
public abstract List<MessageListener> MatchToListener(ParsedMessage message, List<MessageListener> listeners);
/// <inheritdoc /> /// <inheritdoc />
public object? ReadJson(Stream stream, List<MessageListener> listeners) public object? ReadJson(Stream stream, List<MessageListener> listeners)
@ -24,8 +23,16 @@ namespace CryptoExchange.Net.Converters
// Deserialize to the correct type // Deserialize to the correct type
using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true); using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
using var jsonTextReader = new JsonTextReader(sr); using var jsonTextReader = new JsonTextReader(sr);
JToken token;
try
{
token = JToken.Load(jsonTextReader);
}
catch(Exception ex)
{
return null;
}
var token = JToken.Load(jsonTextReader);
var dict = new Dictionary<string, string>(); var dict = new Dictionary<string, string>();
foreach(var idField in IdFields) foreach(var idField in IdFields)
{ {

View File

@ -1,13 +1,13 @@
using CryptoExchange.Net.Converters; //using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Objects.Sockets; //using CryptoExchange.Net.Objects.Sockets;
using System.Threading.Tasks; //using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces //namespace CryptoExchange.Net.Interfaces
{ //{
internal interface IStreamMessageListener // internal interface IStreamMessageListener
{ // {
int Priority { get; } // int Priority { get; }
bool MessageMatches(ParsedMessage message); // bool MessageMatches(ParsedMessage message);
Task ProcessAsync(ParsedMessage message); // Task ProcessAsync(ParsedMessage message);
} // }
} //}

View File

@ -6,7 +6,7 @@ using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects.Sockets namespace CryptoExchange.Net.Objects.Sockets
{ {
internal class PendingRequest : IStreamMessageListener internal class PendingRequest
{ {
public int Id { get; set; } public int Id { get; set; }
public Func<ParsedMessage, bool> MessageMatchesHandler { get; } public Func<ParsedMessage, bool> MessageMatchesHandler { get; }

View File

@ -10,7 +10,7 @@ namespace CryptoExchange.Net.Objects.Sockets
/// <summary> /// <summary>
/// Socket listener /// Socket listener
/// </summary> /// </summary>
public class MessageListener : IStreamMessageListener public class MessageListener
{ {
/// <summary> /// <summary>
/// Unique listener id /// Unique listener id
@ -80,13 +80,6 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public int Priority => Subscription is SystemSubscription ? 50 : 1; public int Priority => Subscription is SystemSubscription ? 50 : 1;
/// <summary>
/// Check if message matches the subscription
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public bool MessageMatches(ParsedMessage message) => Subscription.MessageMatchesEvent(message);
/// <summary> /// <summary>
/// Process the message /// Process the message
/// </summary> /// </summary>

View File

@ -51,13 +51,18 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public event Action<ParsedMessage>? UnhandledMessage; public event Action<ParsedMessage>? UnhandledMessage;
/// <summary>
/// Unparsed message event
/// </summary>
public event Action<byte[]>? UnparsedMessage;
/// <summary> /// <summary>
/// The amount of listeners on this connection /// The amount of listeners on this connection
/// </summary> /// </summary>
public int UserListenerCount public int UserListenerCount
{ {
get { lock (_listenerLock) get { lock (_listenerLock)
return _messageIdentifierListeners.Count(h => h.UserListener); } return _messageIdentifierListeners.Values.Count(h => h.UserListener); }
} }
/// <summary> /// <summary>
@ -68,7 +73,7 @@ namespace CryptoExchange.Net.Sockets
get get
{ {
lock (_listenerLock) lock (_listenerLock)
return _listeners.Where(h => h.UserListener).ToArray(); return _messageIdentifierListeners.Values.Where(h => h.UserListener).ToArray();
} }
} }
@ -153,10 +158,8 @@ namespace CryptoExchange.Net.Sockets
} }
private bool _pausedActivity; private bool _pausedActivity;
//private readonly List<MessageListener> _listeners;
//private readonly List<IStreamMessageListener> _messageListeners; // ?
private readonly List<PendingRequest> _pendingRequests; private readonly List<PendingRequest> _pendingRequests;
private readonly List<MessageListener> _messageListeners;
private readonly Dictionary<string, MessageListener> _messageIdentifierListeners; private readonly Dictionary<string, MessageListener> _messageIdentifierListeners;
private readonly object _listenerLock = new(); private readonly object _listenerLock = new();
@ -184,7 +187,8 @@ namespace CryptoExchange.Net.Sockets
Properties = new Dictionary<string, object>(); Properties = new Dictionary<string, object>();
_pendingRequests = new List<PendingRequest>(); _pendingRequests = new List<PendingRequest>();
_messageIdentifierListeners = new Dictionary<string, IStreamMessageListener>(); _messageListeners = new List<MessageListener>();
_messageIdentifierListeners = new Dictionary<string, MessageListener>();
_socket = socket; _socket = socket;
_socket.OnStreamMessage += HandleStreamMessage; _socket.OnStreamMessage += HandleStreamMessage;
@ -215,7 +219,7 @@ namespace CryptoExchange.Net.Sockets
Authenticated = false; Authenticated = false;
lock(_listenerLock) lock(_listenerLock)
{ {
foreach (var listener in _messageIdentifierListeners.Values) foreach (var listener in _messageListeners)
listener.Confirmed = false; listener.Confirmed = false;
} }
Task.Run(() => ConnectionClosed?.Invoke()); Task.Run(() => ConnectionClosed?.Invoke());
@ -231,7 +235,7 @@ namespace CryptoExchange.Net.Sockets
Authenticated = false; Authenticated = false;
lock (_listenerLock) lock (_listenerLock)
{ {
foreach (var listener in _listeners) foreach (var listener in _messageListeners)
listener.Confirmed = false; listener.Confirmed = false;
} }
@ -255,10 +259,10 @@ namespace CryptoExchange.Net.Sockets
Status = SocketStatus.Resubscribing; Status = SocketStatus.Resubscribing;
lock (_messageListeners) lock (_messageListeners)
{ {
foreach (var pendingRequest in _messageListeners.OfType<PendingRequest>().ToList()) foreach (var pendingRequest in _pendingRequests.ToList())
{ {
pendingRequest.Fail(); pendingRequest.Fail();
_messageListeners.Remove(pendingRequest); // Remove?
} }
} }
@ -298,8 +302,8 @@ namespace CryptoExchange.Net.Sockets
protected virtual void HandleRequestSent(int requestId) protected virtual void HandleRequestSent(int requestId)
{ {
PendingRequest pendingRequest; PendingRequest pendingRequest;
lock (_messageListeners) lock (_pendingRequests)
pendingRequest = _messageListeners.OfType<PendingRequest>().SingleOrDefault(p => p.Id == requestId); pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId);
if (pendingRequest == null) if (pendingRequest == null)
{ {
@ -321,32 +325,42 @@ namespace CryptoExchange.Net.Sockets
//var streamMessage = new StreamMessage(this, stream, timestamp); //var streamMessage = new StreamMessage(this, stream, timestamp);
TimeSpan userCodeDuration = TimeSpan.Zero; TimeSpan userCodeDuration = TimeSpan.Zero;
List<IStreamMessageListener> listeners; List<MessageListener> listeners;
lock (_listenerLock) lock (_listenerLock)
listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList(); listeners = _messageListeners.OrderByDescending(x => x.Priority).ToList();
var result = (ParsedMessage)ApiClient.StreamConverter.ReadJson(stream, listeners.OfType<MessageListener>().ToList()); // TODO var result = (ParsedMessage)ApiClient.StreamConverter.ReadJson(stream, listeners.OfType<MessageListener>().ToList()); // TODO
stream.Position = 0; if(result == null)
{
stream.Position = 0;
var buffer = new byte[stream.Length];
stream.Read(buffer, 0, buffer.Length);
UnparsedMessage?.Invoke(buffer);
return;
}
if (result == null) if (result.Data == null)
{ {
_logger.LogWarning("Message not matched to type"); _logger.LogWarning("Message not matched to type");
return; return;
} }
// TODO lock
if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener)) if (_messageIdentifierListeners.TryGetValue(result.Identifier.ToLowerInvariant(), out var idListener))
{ {
// Matched based on identifier // Matched based on identifier
var userSw = Stopwatch.StartNew();
await idListener.ProcessAsync(result).ConfigureAwait(false); await idListener.ProcessAsync(result).ConfigureAwait(false);
userSw.Stop();
return; return;
} }
foreach (var pendingRequest in _messageListeners.OfType<PendingRequest>()) foreach (var pendingRequest in _pendingRequests)
{ {
if (pendingRequest.MessageMatchesHandler(result)) if (pendingRequest.MessageMatchesHandler(result))
{ {
await pendingRequest.ProcessAsync(result).ConfigureAwait(false); await pendingRequest.ProcessAsync(result).ConfigureAwait(false);
break; return;
} }
} }
@ -447,7 +461,7 @@ namespace CryptoExchange.Net.Sockets
lock (_listenerLock) lock (_listenerLock)
{ {
foreach (var listener in _messageIdentifierListeners.Values) foreach (var listener in _messageListeners)
{ {
if (listener.CancellationTokenRegistration.HasValue) if (listener.CancellationTokenRegistration.HasValue)
listener.CancellationTokenRegistration.Value.Dispose(); listener.CancellationTokenRegistration.Value.Dispose();
@ -467,7 +481,7 @@ namespace CryptoExchange.Net.Sockets
{ {
lock (_listenerLock) lock (_listenerLock)
{ {
if (!_listeners.Contains(listener)) if (!_messageListeners.Contains(listener))
return; return;
listener.Closed = true; listener.Closed = true;
@ -492,7 +506,7 @@ namespace CryptoExchange.Net.Sockets
return; return;
} }
shouldCloseConnection = _listeners.All(r => !r.UserListener || r.Closed); shouldCloseConnection = _messageIdentifierListeners.All(r => !r.Value.UserListener || r.Value.Closed);
if (shouldCloseConnection) if (shouldCloseConnection)
Status = SocketStatus.Closing; Status = SocketStatus.Closing;
} }
@ -506,7 +520,8 @@ namespace CryptoExchange.Net.Sockets
lock (_listenerLock) lock (_listenerLock)
{ {
_messageListeners.Remove(listener); _messageListeners.Remove(listener);
_listeners.Remove(listener); foreach (var id in listener.Subscription.Identifiers)
_messageIdentifierListeners.Remove(id);
} }
} }
@ -523,23 +538,22 @@ namespace CryptoExchange.Net.Sockets
/// Add a listener to this connection /// Add a listener to this connection
/// </summary> /// </summary>
/// <param name="listener"></param> /// <param name="listener"></param>
public bool AddListener(MessageListener listener, List<string>? listenerIdentifiers) public bool AddListener(MessageListener listener)
{ {
lock (_listenerLock) lock (_listenerLock)
{ {
if (Status != SocketStatus.None && Status != SocketStatus.Connected) if (Status != SocketStatus.None && Status != SocketStatus.Connected)
return false; return false;
_listeners.Add(listener);
_messageListeners.Add(listener); _messageListeners.Add(listener);
if (listenerIdentifiers != null) if (listener.Subscription.Identifiers != null)
{ {
foreach (var id in listenerIdentifiers) foreach (var id in listener.Subscription.Identifiers)
_messageIdentifierListeners.Add(id.ToLowerInvariant(), listener); _messageIdentifierListeners.Add(id.ToLowerInvariant(), listener);
} }
if (listener.UserListener) if (listener.UserListener)
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {listener.Id}, total listeners on connection: {_listeners.Count(s => s.UserListener)}"); _logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new listener with id {listener.Id}, total listeners on connection: {_messageListeners.Count(s => s.UserListener)}");
return true; return true;
} }
} }
@ -551,7 +565,7 @@ namespace CryptoExchange.Net.Sockets
public MessageListener? GetListener(int id) public MessageListener? GetListener(int id)
{ {
lock (_listenerLock) lock (_listenerLock)
return _listeners.SingleOrDefault(s => s.Id == id); return _messageListeners.SingleOrDefault(s => s.Id == id);
} }
/// <summary> /// <summary>
@ -562,7 +576,7 @@ namespace CryptoExchange.Net.Sockets
public MessageListener? GetListenerByRequest(Func<object?, bool> predicate) public MessageListener? GetListenerByRequest(Func<object?, bool> predicate)
{ {
lock(_listenerLock) lock(_listenerLock)
return _listeners.SingleOrDefault(s => predicate(s.Subscription)); return _messageListeners.SingleOrDefault(s => predicate(s.Subscription));
} }
/// <summary> /// <summary>
@ -580,7 +594,7 @@ namespace CryptoExchange.Net.Sockets
var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener); var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener);
lock (_messageListeners) lock (_messageListeners)
{ {
_messageListeners.Add(pending); _pendingRequests.Add(pending);
} }
var sendOk = Send(pending.Id, obj, weight); var sendOk = Send(pending.Id, obj, weight);
@ -651,7 +665,7 @@ namespace CryptoExchange.Net.Sockets
bool anySubscriptions = false; bool anySubscriptions = false;
lock (_listenerLock) lock (_listenerLock)
anySubscriptions = _listeners.Any(s => s.UserListener); anySubscriptions = _messageListeners.Any(s => s.UserListener);
if (!anySubscriptions) if (!anySubscriptions)
{ {
@ -663,7 +677,7 @@ namespace CryptoExchange.Net.Sockets
bool anyAuthenticated = false; bool anyAuthenticated = false;
lock (_listenerLock) lock (_listenerLock)
anyAuthenticated = _listeners.Any(s => s.Authenticated); anyAuthenticated = _messageListeners.Any(s => s.Authenticated);
if (anyAuthenticated) if (anyAuthenticated)
{ {
@ -683,7 +697,7 @@ namespace CryptoExchange.Net.Sockets
List<MessageListener> listenerList = new List<MessageListener>(); List<MessageListener> listenerList = new List<MessageListener>();
lock (_listenerLock) lock (_listenerLock)
{ {
foreach (var listener in _listeners) foreach (var listener in _messageListeners)
{ {
if (listener.Subscription != null) if (listener.Subscription != null)
listenerList.Add(listener); listenerList.Add(listener);

View File

@ -72,7 +72,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
public abstract bool MessageMatchesEvent(ParsedMessage message); //public abstract bool MessageMatchesEvent(ParsedMessage message);
/// <summary> /// <summary>
/// Handle the update message /// Handle the update message
/// </summary> /// </summary>