1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-12 02:16:23 +00:00
This commit is contained in:
JKorf 2023-12-17 21:17:31 +01:00
parent c931a60cb7
commit 58098edaa6
11 changed files with 72 additions and 50 deletions

View File

@ -62,6 +62,23 @@ namespace CryptoExchange.Net.Converters
}
public bool IsObject(string? key) => _token.Type == JTokenType.Object;
public bool IsArray(IEnumerable<int> indexes)
{
var item = _token;
foreach(var index in indexes)
{
if (item.Type != JTokenType.Array)
return false;
var arr = ((JArray)item);
if (arr.Count <= index)
return false;
item = arr[index];
}
return item.Type == JTokenType.Array;
}
private JToken? GetToken(string key)
{

View File

@ -48,7 +48,8 @@ namespace CryptoExchange.Net.Converters
preInstance.OriginalData = data;
}
preInstance.Identifier = result.Identifier;
preInstance.StreamIdentifier = result.StreamIdentifier;
preInstance.TypeIdentifier = result.TypeIdentifier;
preInstance.Parsed = true;
return preInstance;
}
@ -70,25 +71,28 @@ namespace CryptoExchange.Net.Converters
if (InterpreterPipeline.GetIdentity != null)
{
var identity = InterpreterPipeline.GetIdentity(accessor);
if (identity != null)
var (streamIdentity, typeIdentity) = InterpreterPipeline.GetIdentity(accessor);
if (streamIdentity != null)
{
var result = listenerManager.IdToType(identity);
if (result == null)
var result = listenerManager.IdToType(streamIdentity, typeIdentity);
if (result != null)
{
var idInstance = InterpreterPipeline.ObjectInitializer(token, result!);
if (outputOriginalData)
{
stream.Position = 0;
idInstance.OriginalData = sr.ReadToEnd();
}
idInstance.StreamIdentifier = streamIdentity;
idInstance.TypeIdentifier = typeIdentity;
idInstance.Parsed = true;
return idInstance;
}
else
{
}
var idInstance = InterpreterPipeline.ObjectInitializer(token, result!);
if (outputOriginalData)
{
stream.Position = 0;
idInstance.OriginalData = sr.ReadToEnd();
}
idInstance.Identifier = identity;
idInstance.Parsed = true;
return idInstance;
}
else
{
@ -175,7 +179,8 @@ namespace CryptoExchange.Net.Converters
instance.OriginalData = sr.ReadToEnd();
}
instance.Identifier = inspectResult.Identifier;
instance.StreamIdentifier = inspectResult.StreamIdentifier;
instance.TypeIdentifier = inspectResult.TypeIdentifier;
instance.Parsed = inspectResult.Type != null;
return instance;
}

View File

@ -7,6 +7,7 @@ namespace CryptoExchange.Net.Interfaces
public interface IMessageAccessor
{
bool IsObject(string? key);
bool IsArray(IEnumerable<int> indexes);
string? GetStringValue(string key);
int? GetIntValue(string key);
public int? GetCount(string key);

View File

@ -13,6 +13,6 @@ namespace CryptoExchange.Net.Interfaces
public int Id { get; }
public List<string> Identifiers { get; }
Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message);
public Type ExpectedMessageType { get; }
public Func<string, Type> ExpectedTypeDelegate { get; }
}
}

View File

@ -6,9 +6,13 @@
public abstract class BaseParsedMessage
{
/// <summary>
/// Identifier string
/// Stream identifier string
/// </summary>
public string Identifier { get; set; } = null!;
public string StreamIdentifier { get; set; } = null!;
/// <summary>
/// Type identifier string
/// </summary>
public string TypeIdentifier { get; set; } = null!;
/// <summary>
/// Original data if the option is enabled
/// </summary>

View File

@ -14,8 +14,8 @@ namespace CryptoExchange.Net.Objects.Sockets
{
public Func<WebSocketMessageType, Stream, Stream>? PreProcessCallback { get; set; }
public List<PreInspectCallback> PreInspectCallbacks { get; set; } = new List<PreInspectCallback>();
public Func<IMessageAccessor, string?> GetIdentity { get; set; }
//public List<object> PostInspectCallbacks { get; set; } = new List<object>();
public Func<IMessageAccessor, (string?, string?)> GetIdentity { get; set; }
public List<object> PostInspectCallbacks { get; set; } = new List<object>();
public Func<JToken, Type, BaseParsedMessage> ObjectInitializer { get; set; } = SocketConverter.InstantiateMessageObject;
}
@ -45,18 +45,20 @@ namespace CryptoExchange.Net.Objects.Sockets
public class PostInspectArrayCallback
{
public List<int> TypeFields { get; set; } = new List<int>();
public Func<Dictionary<int, string>, Dictionary<string, Type>, PostInspectResult> Callback { get; set; }
public Func<IMessageAccessor, SocketListenerManager, PostInspectResult> Callback { get; set; }
}
public class PreInspectResult
{
public bool Matched { get; set; }
public string Identifier { get; set; }
public string StreamIdentifier { get; set; }
public string TypeIdentifier { get; set; }
}
public class PostInspectResult
{
public Type? Type { get; set; }
public string Identifier { get; set; }
public string StreamIdentifier { get; set; }
public string TypeIdentifier { get; set; }
}
}

View File

@ -46,7 +46,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public int Weight { get; }
public abstract Type ExpectedMessageType { get; }
public abstract Func<string, Type> ExpectedTypeDelegate { get; }
/// <summary>
/// ctor
@ -107,7 +107,7 @@ namespace CryptoExchange.Net.Sockets
/// <typeparam name="TResponse">Response object type</typeparam>
public abstract class Query<TResponse> : BaseQuery
{
public override Type ExpectedMessageType => typeof(TResponse);
public override Func<string, Type> ExpectedTypeDelegate => x => typeof(TResponse);
/// <summary>
/// The typed call result

View File

@ -335,13 +335,13 @@ namespace CryptoExchange.Net.Sockets
return;
}
if (!await _listenerManager.InvokeListenersAsync(this, result.Identifier, result).ConfigureAwait(false))
if (!await _listenerManager.InvokeListenersAsync(this, result.StreamIdentifier, result).ConfigureAwait(false))
{
// Not able to find a listener for this message
stream.Position = 0;
var unhandledBuffer = new byte[stream.Length];
stream.Read(unhandledBuffer, 0, unhandledBuffer.Length);
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Message unidentified. Id: {result.Identifier.ToLowerInvariant()}, listening ids: [{string.Join(", ", _listenerManager.GetListenIds())}], Message: {Encoding.UTF8.GetString(unhandledBuffer)} ");
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Message unidentified. Id: {result.StreamIdentifier.ToLowerInvariant()}, listening ids: [{string.Join(", ", _listenerManager.GetListenIds())}], Message: {Encoding.UTF8.GetString(unhandledBuffer)} ");
UnhandledMessage?.Invoke(result);
return;
}

View File

@ -15,24 +15,24 @@ namespace CryptoExchange.Net.Sockets
private int _socketId;
private object _lock = new object();
private Dictionary<int, IMessageProcessor> _idMap;
private Dictionary<string, Type> _typeMap;
private Dictionary<string, Func<string, Type>> _typeMap;
private Dictionary<string, List<IMessageProcessor>> _listeners;
public SocketListenerManager(ILogger logger, int socketId)
{
_idMap = new Dictionary<int, IMessageProcessor>();
_listeners = new Dictionary<string, List<IMessageProcessor>>();
_typeMap = new Dictionary<string, Type>();
_typeMap = new Dictionary<string, Func<string, Type>>();
_logger = logger;
_socketId = socketId;
}
public Type? IdToType(string id)
public Type? IdToType(string streamIdentifier, string typeIdentifier)
{
lock (_lock)
{
_typeMap.TryGetValue(id, out var type);
return type;
_typeMap.TryGetValue(streamIdentifier, out var typeDelegate);
return typeDelegate?.Invoke(typeIdentifier);
}
}
@ -78,7 +78,7 @@ namespace CryptoExchange.Net.Sockets
foreach (var listener in listeners)
{
_logger.Log(LogLevel.Trace, $"Socket {_socketId} Message mapped to processor {listener.Id} with identifier {data.Identifier}");
_logger.Log(LogLevel.Trace, $"Socket {_socketId} Message mapped to processor {listener.Id} with identifier {data.StreamIdentifier}");
if (listener is BaseQuery query)
{
Remove(listener);
@ -160,7 +160,7 @@ namespace CryptoExchange.Net.Sockets
private void UpdateMap()
{
_typeMap = _listeners.ToDictionary(x => x.Key, x => x.Value.First().ExpectedMessageType);
_typeMap = _listeners.ToDictionary(x => x.Key, x => x.Value.First().ExpectedTypeDelegate);
}
}
}

View File

@ -69,7 +69,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public event Action<Exception>? Exception;
public abstract Type ExpectedMessageType { get; }
public abstract Func<string, Type> ExpectedTypeDelegate { get; }
/// <summary>
/// ctor
@ -125,7 +125,7 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public abstract class Subscription<TQuery, TEvent> : Subscription<TQuery, TEvent, TQuery>
public abstract class Subscription<TQuery> : Subscription<TQuery, TQuery>
{
/// <summary>
/// ctor
@ -138,9 +138,9 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public abstract class Subscription<TSubResponse, TEvent, TUnsubResponse> : Subscription
public abstract class Subscription<TSubResponse, TUnsubResponse> : Subscription
{
public override Type ExpectedMessageType => typeof(TEvent);
//public override Func<string, Type> ExpectedTypeDelegate => (x) => typeof(TEvent);
/// <summary>
/// ctor
@ -152,15 +152,8 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
=> HandleEventAsync(connection, message.As((ParsedMessage<TEvent>)message.Data));
/// <summary>
/// Handle the update message
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Task<CallResult> HandleEventAsync(SocketConnection connection, DataEvent<ParsedMessage<TEvent>> message);
//public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
// => HandleEventAsync(connection, message.As((ParsedMessage<TEvent>)message.Data));
public override void HandleSubQueryResponse(BaseParsedMessage message)
=> HandleSubQueryResponse((ParsedMessage<TSubResponse>)message);

View File

@ -29,7 +29,7 @@ namespace CryptoExchange.Net.Sockets
public abstract class SystemSubscription<T> : SystemSubscription
{
public override Type ExpectedMessageType => typeof(T);
public override Func<string, Type> ExpectedTypeDelegate => (x) => typeof(T);
public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
=> HandleMessageAsync(connection, message.As((ParsedMessage<T>)message.Data));