1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-09 08:56:13 +00:00
This commit is contained in:
JKorf 2023-12-28 10:39:35 +01:00
parent 58098edaa6
commit eee19b28a5
13 changed files with 227 additions and 311 deletions

View File

@ -111,7 +111,7 @@ namespace CryptoExchange.Net
public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions;
/// <inheritdoc />
public abstract SocketConverter StreamConverter { get; }
public abstract MessageInterpreterPipeline Pipeline { get; }
#endregion
/// <summary>
@ -686,7 +686,7 @@ namespace CryptoExchange.Net
sb.AppendLine($" Id: {subscription.Id}");
sb.AppendLine($" Confirmed: {subscription.Confirmed}");
sb.AppendLine($" Invocations: {subscription.TotalInvocations}");
sb.AppendLine($" Identifiers: [{string.Join(", ", subscription.Identifiers)}]");
sb.AppendLine($" Identifiers: [{string.Join(", ", subscription.StreamIdentifiers)}]");
}
}
return sb.ToString();

View File

@ -1,8 +1,12 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices.ComTypes;
using System.Text;
namespace CryptoExchange.Net.Converters
@ -10,11 +14,39 @@ namespace CryptoExchange.Net.Converters
internal class JTokenAccessor : IMessageAccessor
{
private readonly JToken _token;
private readonly Stream _stream;
private readonly StreamReader _reader;
private Dictionary<string, JToken?> _cache = new Dictionary<string, JToken?>();
private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
public JTokenAccessor(JToken token)
public JTokenAccessor(Stream stream)
{
_token = token;
_stream = stream;
_reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
using var jsonTextReader = new JsonTextReader(_reader);
JToken token;
try
{
_token = JToken.Load(jsonTextReader);
}
catch (Exception ex)
{
// Not a json message
throw;
}
}
public BaseParsedMessage Instantiate(Type type)
{
var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(type);
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : _token.ToObject(type, _serializer));
return instance;
}
public string GetOriginalString()
{
_stream.Position = 0;
return _reader.ReadToEnd();
}
public int? GetArrayIntValue(string? key, int index)

View File

@ -1,195 +1,58 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
//using CryptoExchange.Net.Interfaces;
//using CryptoExchange.Net.Objects;
//using CryptoExchange.Net.Objects.Sockets;
//using CryptoExchange.Net.Sockets;
//using Newtonsoft.Json;
//using Newtonsoft.Json.Linq;
//using System;
//using System.Collections.Generic;
//using System.IO;
//using System.Linq;
//using System.Net.WebSockets;
//using System.Text;
namespace CryptoExchange.Net.Converters
{
/// <summary>
/// Socket message converter
/// </summary>
public abstract class SocketConverter
{
private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
//namespace CryptoExchange.Net.Converters
//{
// /// <summary>
// /// Socket message converter
// /// </summary>
// public abstract class SocketConverter
// {
public abstract MessageInterpreterPipeline InterpreterPipeline { get; }
// public abstract MessageInterpreterPipeline InterpreterPipeline { get; }
/// <inheritdoc />
public BaseParsedMessage? ReadJson(WebSocketMessageType websocketMessageType, Stream stream, SocketListenerManager listenerManager, bool outputOriginalData)
{
// Start reading the data
// Once we reach the properties that identify the message we save those in a dict
// Once all id properties have been read callback to see what the deserialization type should be
// Deserialize to the correct type
// /// <inheritdoc />
// public BaseParsedMessage? ReadJson(WebSocketMessageType websocketMessageType, Stream stream, SocketListenerManager listenerManager, bool outputOriginalData)
// {
// // Start reading the data
// // Once we reach the properties that identify the message we save those in a dict
// // Once all id properties have been read callback to see what the deserialization type should be
// // Deserialize to the correct type
if (InterpreterPipeline.PreProcessCallback != null)
stream = InterpreterPipeline.PreProcessCallback(websocketMessageType, stream);
// if (InterpreterPipeline.PreProcessCallback != null)
// stream = InterpreterPipeline.PreProcessCallback(websocketMessageType, stream);
using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
foreach (var callback in InterpreterPipeline.PreInspectCallbacks)
{
var result = callback.Callback(stream);
if (result.Matched)
{
var data = sr.ReadToEnd();
var messageType = typeof(ParsedMessage<>).MakeGenericType(typeof(string));
var preInstance = (BaseParsedMessage)Activator.CreateInstance(messageType, data);
if (outputOriginalData)
{
stream.Position = 0;
preInstance.OriginalData = data;
}
// var accessor = new JTokenAccessor(stream);
// if (accessor == null)
// return null;
preInstance.StreamIdentifier = result.StreamIdentifier;
preInstance.TypeIdentifier = result.TypeIdentifier;
preInstance.Parsed = true;
return preInstance;
}
}
// var streamIdentity = InterpreterPipeline.GetStreamIdentifier(accessor);
// if (streamIdentity == null)
// return null;
using var jsonTextReader = new JsonTextReader(sr);
JToken token;
try
{
token = JToken.Load(jsonTextReader);
}
catch(Exception ex)
{
// Not a json message
return null;
}
// var typeIdentity = InterpreterPipeline.GetTypeIdentifier(accessor);
// var typeResult = listenerManager.IdToType(streamIdentity, typeIdentity);
// if (typeResult == null)
// return null;
var accessor = new JTokenAccessor(token);
// var idInstance = accessor.Instantiate(typeResult);
// if (outputOriginalData)
// idInstance.OriginalData = idInstance.OriginalData;
if (InterpreterPipeline.GetIdentity != null)
{
var (streamIdentity, typeIdentity) = InterpreterPipeline.GetIdentity(accessor);
if (streamIdentity != null)
{
var result = listenerManager.IdToType(streamIdentity, typeIdentity);
if (result != null)
{
var idInstance = InterpreterPipeline.ObjectInitializer(token, result!);
if (outputOriginalData)
{
stream.Position = 0;
idInstance.OriginalData = sr.ReadToEnd();
}
idInstance.StreamIdentifier = streamIdentity;
idInstance.TypeIdentifier = typeIdentity;
idInstance.Parsed = true;
return idInstance;
}
else
{
}
}
else
{
// Message not identified
// TODO return
}
}
PostInspectResult? inspectResult = null;
object? usedParser = null;
//if (token.Type == JTokenType.Object)
//{
// foreach (var callback in InterpreterPipeline.PostInspectCallbacks.OfType<PostInspectCallback>())
// {
// bool allFieldsPresent = true;
// foreach (var field in callback.TypeFields)
// {
// var value = accessor.GetStringValue(field.Key);
// if (value == null)
// {
// if (field.Required)
// {
// allFieldsPresent = false;
// break;
// }
// }
// }
// if (allFieldsPresent)
// {
// inspectResult = callback.Callback(accessor, processors);
// usedParser = callback;
// if (inspectResult.Type != null)
// break;
// }
// }
//}
//else
//{
// foreach (var callback in InterpreterPipeline.PostInspectCallbacks.OfType<PostInspectArrayCallback>())
// {
// var typeIdArrayDict = new Dictionary<int, string>();
// bool allFieldsPresent = true;
// var maxIndex = callback.TypeFields.Max();
// if (((JArray)token).Count <= maxIndex)
// continue;
// foreach (var field in callback.TypeFields)
// {
// var value = token[field];
// if (value == null)
// {
// allFieldsPresent = false;
// break;
// }
// typeIdArrayDict[field] = value.ToString();
// }
// if (allFieldsPresent)
// {
// inspectResult = callback.Callback(typeIdArrayDict, processors);
// usedParser = callback;
// break;
// }
// }
//}
if (usedParser == null)
{
//throw new Exception("No parser found for message");
return null;
}
BaseParsedMessage instance;
if (inspectResult.Type != null)
instance = InterpreterPipeline.ObjectInitializer(token, inspectResult.Type);
else
instance = new ParsedMessage<object>(null);
if (outputOriginalData)
{
stream.Position = 0;
instance.OriginalData = sr.ReadToEnd();
}
instance.StreamIdentifier = inspectResult.StreamIdentifier;
instance.TypeIdentifier = inspectResult.TypeIdentifier;
instance.Parsed = inspectResult.Type != null;
return instance;
}
public static BaseParsedMessage InstantiateMessageObject(JToken token, Type type)
{
var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(type);
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : token.ToObject(type, _serializer));
return instance;
}
}
}
// idInstance.StreamIdentifier = streamIdentity;
// idInstance.TypeIdentifier = typeIdentity;
// idInstance.Parsed = true;
// return idInstance;
// }
// }
//}

View File

@ -1,4 +1,5 @@
using System;
using CryptoExchange.Net.Objects.Sockets;
using System;
using System.Collections.Generic;
using System.Text;
@ -13,5 +14,7 @@ namespace CryptoExchange.Net.Interfaces
public int? GetCount(string key);
public int? GetArrayIntValue(string? key, int index);
public string? GetArrayStringValue(string? key, int index);
public BaseParsedMessage Instantiate(Type type);
}
}

View File

@ -11,8 +11,8 @@ namespace CryptoExchange.Net.Interfaces
public interface IMessageProcessor
{
public int Id { get; }
public List<string> Identifiers { get; }
public List<string> StreamIdentifiers { get; }
Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message);
public Func<string, Type> ExpectedTypeDelegate { get; }
Dictionary<string, Type> TypeMapping { get; }
}
}

View File

@ -17,7 +17,7 @@ namespace CryptoExchange.Net.Interfaces
/// <summary>
/// Websocket closed event
/// </summary>
event Action OnClose;
event Func<Task> OnClose;
/// <summary>
/// Websocket message received event
/// </summary>
@ -25,23 +25,23 @@ namespace CryptoExchange.Net.Interfaces
/// <summary>
/// Websocket sent event, RequestId as parameter
/// </summary>
event Action<int> OnRequestSent;
event Func<int, Task> OnRequestSent;
/// <summary>
/// Websocket error event
/// </summary>
event Action<Exception> OnError;
event Func<Exception, Task> OnError;
/// <summary>
/// Websocket opened event
/// </summary>
event Action OnOpen;
event Func<Task> OnOpen;
/// <summary>
/// Websocket has lost connection to the server and is attempting to reconnect
/// </summary>
event Action OnReconnecting;
event Func<Task> OnReconnecting;
/// <summary>
/// Websocket has reconnected to the server
/// </summary>
event Action OnReconnected;
event Func<Task> OnReconnected;
/// <summary>
/// Get reconntion url
/// </summary>

View File

@ -13,52 +13,7 @@ namespace CryptoExchange.Net.Objects.Sockets
public class MessageInterpreterPipeline
{
public Func<WebSocketMessageType, Stream, Stream>? PreProcessCallback { get; set; }
public List<PreInspectCallback> PreInspectCallbacks { get; set; } = new List<PreInspectCallback>();
public Func<IMessageAccessor, (string?, string?)> GetIdentity { get; set; }
public List<object> PostInspectCallbacks { get; set; } = new List<object>();
public Func<JToken, Type, BaseParsedMessage> ObjectInitializer { get; set; } = SocketConverter.InstantiateMessageObject;
}
public class PreInspectCallback
{
public Func<Stream, PreInspectResult> Callback { get; set; }
}
public class PostInspectCallback
{
public List<TypeField> TypeFields { get; set; } = new List<TypeField>();
public Func<IMessageAccessor, Dictionary<string, Type>, PostInspectResult> Callback { get; set; }
}
public class TypeField
{
public string Key { get; set; }
public bool Required { get; set; }
public TypeField(string key, bool required = true)
{
Key = key;
Required = required;
}
}
public class PostInspectArrayCallback
{
public List<int> TypeFields { get; set; } = new List<int>();
public Func<IMessageAccessor, SocketListenerManager, PostInspectResult> Callback { get; set; }
}
public class PreInspectResult
{
public bool Matched { get; set; }
public string StreamIdentifier { get; set; }
public string TypeIdentifier { get; set; }
}
public class PostInspectResult
{
public Type? Type { get; set; }
public string StreamIdentifier { get; set; }
public string TypeIdentifier { get; set; }
public Func<IMessageAccessor, string?> GetStreamIdentifier { get; set; }
public Func<IMessageAccessor, string?> GetTypeIdentifier { get; set; }
}
}

View File

@ -98,25 +98,25 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public event Action? OnClose;
public event Func<Task>? OnClose;
/// <inheritdoc />
public event Func<WebSocketMessageType, Stream, Task>? OnStreamMessage;
/// <inheritdoc />
public event Action<int>? OnRequestSent;
public event Func<int, Task>? OnRequestSent;
/// <inheritdoc />
public event Action<Exception>? OnError;
public event Func<Exception, Task>? OnError;
/// <inheritdoc />
public event Action? OnOpen;
public event Func<Task>? OnOpen;
/// <inheritdoc />
public event Action? OnReconnecting;
public event Func<Task>? OnReconnecting;
/// <inheritdoc />
public event Action? OnReconnected;
public event Func<Task>? OnReconnected;
/// <inheritdoc />
public Func<Task<Uri?>>? GetReconnectionUrl { get; set; }
@ -147,7 +147,7 @@ namespace CryptoExchange.Net.Sockets
if (!await ConnectInternalAsync().ConfigureAwait(false))
return false;
OnOpen?.Invoke();
await (OnOpen?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
_processTask = ProcessAsync();
return true;
}
@ -222,14 +222,14 @@ namespace CryptoExchange.Net.Sockets
if (!Parameters.AutoReconnect)
{
_processState = ProcessState.Idle;
OnClose?.Invoke();
await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
return;
}
if (!_stopRequested)
{
_processState = ProcessState.Reconnecting;
OnReconnecting?.Invoke();
await (OnReconnecting?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
}
var sinceLastReconnect = DateTime.UtcNow - _lastReconnectTime;
@ -263,7 +263,7 @@ namespace CryptoExchange.Net.Sockets
}
_lastReconnectTime = DateTime.UtcNow;
OnReconnected?.Invoke();
await (OnReconnected?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
break;
}
}
@ -326,7 +326,7 @@ namespace CryptoExchange.Net.Sockets
await _closeTask.ConfigureAwait(false);
if(_processTask != null)
await _processTask.ConfigureAwait(false);
OnClose?.Invoke();
await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
_logger.Log(LogLevel.Debug, $"Socket {Id} closed");
}
@ -423,7 +423,7 @@ namespace CryptoExchange.Net.Sockets
try
{
await _socket.SendAsync(new ArraySegment<byte>(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
OnRequestSent?.Invoke(data.Id);
await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
_logger.Log(LogLevel.Trace, $"Socket {Id} - msg {data.Id} - sent {data.Bytes.Length} bytes");
}
catch (OperationCanceledException)
@ -434,7 +434,7 @@ namespace CryptoExchange.Net.Sockets
catch (Exception ioe)
{
// Connection closed unexpectedly, .NET framework
OnError?.Invoke(ioe);
await (OnError?.Invoke(ioe) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
break;
@ -448,7 +448,7 @@ namespace CryptoExchange.Net.Sockets
// any exception here will crash the send processing, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error
_logger.Log(LogLevel.Warning, $"Socket {Id} Send loop stopped with exception");
OnError?.Invoke(e);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
throw;
}
finally
@ -492,7 +492,7 @@ namespace CryptoExchange.Net.Sockets
catch (Exception wse)
{
// Connection closed unexpectedly
OnError?.Invoke(wse);
await (OnError?.Invoke(wse) ?? Task.CompletedTask).ConfigureAwait(false);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
break;
@ -571,7 +571,7 @@ namespace CryptoExchange.Net.Sockets
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
// Make sure we at least let the owner know there was an error
_logger.Log(LogLevel.Warning, $"Socket {Id} Receive loop stopped with exception");
OnError?.Invoke(e);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
throw;
}
finally
@ -627,7 +627,7 @@ namespace CryptoExchange.Net.Sockets
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will stop the timeout checking, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error
OnError?.Invoke(e);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
throw;
}
}

View File

@ -29,7 +29,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Strings to identify this subscription with
/// </summary>
public abstract List<string> Identifiers { get; }
public abstract List<string> StreamIdentifiers { get; }
/// <summary>
/// The query request object
@ -46,7 +46,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public int Weight { get; }
public abstract Func<string, Type> ExpectedTypeDelegate { get; }
public abstract Dictionary<string, Type> TypeMapping { get; }
/// <summary>
/// ctor
@ -107,7 +107,10 @@ namespace CryptoExchange.Net.Sockets
/// <typeparam name="TResponse">Response object type</typeparam>
public abstract class Query<TResponse> : BaseQuery
{
public override Func<string, Type> ExpectedTypeDelegate => x => typeof(TResponse);
public override Dictionary<string, Type> TypeMapping => new Dictionary<string, Type>
{
{ "", typeof(TResponse) }
};
/// <summary>
/// The typed call result

View File

@ -11,6 +11,8 @@ using System.IO;
using CryptoExchange.Net.Objects.Sockets;
using System.Text;
using System.Diagnostics.CodeAnalysis;
using CryptoExchange.Net.Converters;
using System.Diagnostics;
namespace CryptoExchange.Net.Sockets
{
@ -176,12 +178,12 @@ namespace CryptoExchange.Net.Sockets
_socket = socket;
_socket.OnStreamMessage += HandleStreamMessage;
_socket.OnRequestSent += HandleRequestSent;
_socket.OnOpen += HandleOpen;
_socket.OnClose += HandleClose;
_socket.OnReconnecting += HandleReconnecting;
_socket.OnReconnected += HandleReconnected;
_socket.OnError += HandleError;
_socket.OnRequestSent += HandleRequestSentAsync;
_socket.OnOpen += HandleOpenAsync;
_socket.OnClose += HandleCloseAsync;
_socket.OnReconnecting += HandleReconnectingAsync;
_socket.OnReconnected += HandleReconnectedAsync;
_socket.OnError += HandleErrorAsync;
_socket.GetReconnectionUrl = GetReconnectionUrlAsync;
_listenerManager = new SocketListenerManager(_logger, SocketId);
@ -190,7 +192,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handler for a socket opening
/// </summary>
protected virtual void HandleOpen()
protected virtual async Task HandleOpenAsync()
{
Status = SocketStatus.Connected;
PausedActivity = false;
@ -199,7 +201,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handler for a socket closing without reconnect
/// </summary>
protected virtual void HandleClose()
protected virtual async Task HandleCloseAsync()
{
Status = SocketStatus.Closed;
Authenticated = false;
@ -219,7 +221,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handler for a socket losing conenction and starting reconnect
/// </summary>
protected virtual void HandleReconnecting()
protected virtual async Task HandleReconnectingAsync()
{
Status = SocketStatus.Reconnecting;
DisconnectTime = DateTime.UtcNow;
@ -249,7 +251,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handler for a socket which has reconnected
/// </summary>
protected virtual async void HandleReconnected()
protected virtual async Task HandleReconnectedAsync()
{
Status = SocketStatus.Resubscribing;
@ -258,6 +260,7 @@ namespace CryptoExchange.Net.Sockets
query.Fail("Connection interupted");
_listenerManager.Remove(query);
}
// Mark subscription as 'not confirmed', only map updates to them if confirmed. Don't await sub answer here
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectSuccessful)
@ -280,7 +283,7 @@ namespace CryptoExchange.Net.Sockets
/// Handler for an error on a websocket
/// </summary>
/// <param name="e">The exception</param>
protected virtual void HandleError(Exception e)
protected virtual async Task HandleErrorAsync(Exception e)
{
if (e is WebSocketException wse)
_logger.Log(LogLevel.Warning, $"Socket {SocketId} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
@ -292,7 +295,7 @@ namespace CryptoExchange.Net.Sockets
/// Handler for whenever a request is sent over the websocket
/// </summary>
/// <param name="requestId">Id of the request sent</param>
protected virtual void HandleRequestSent(int requestId)
protected virtual async Task HandleRequestSentAsync(int requestId)
{
var query = _listenerManager.GetById<BaseQuery>(requestId);
if (query == null)
@ -312,8 +315,13 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream)
{
var result = ApiClient.StreamConverter.ReadJson(type, stream, _listenerManager, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
if(result == null)
var buffer2 = new byte[stream.Length];
stream.Position = 0;
stream.Read(buffer2, 0, buffer2.Length);
Debug.WriteLine("0 " + Encoding.UTF8.GetString(buffer2));
stream.Position = 0;
var result = ReadJson(type, stream);
if (result == null)
{
// Not able to parse at all
var buffer = new byte[stream.Length];
@ -347,7 +355,46 @@ namespace CryptoExchange.Net.Sockets
}
stream.Dispose();
}
}
/// <summary>
/// Read a message from stream
/// </summary>
/// <param name="websocketMessageType"></param>
/// <param name="stream"></param>
/// <returns></returns>
protected virtual BaseParsedMessage? ReadJson(WebSocketMessageType websocketMessageType, Stream stream)
{
// Start reading the data
// Once we reach the properties that identify the message we save those in a dict
// Once all id properties have been read callback to see what the deserialization type should be
// Deserialize to the correct type
if (ApiClient.Pipeline.PreProcessCallback != null)
stream = ApiClient.Pipeline.PreProcessCallback(websocketMessageType, stream);
var accessor = new JTokenAccessor(stream);
if (accessor == null)
return null;
var streamIdentity = ApiClient.Pipeline.GetStreamIdentifier(accessor);
if (streamIdentity == null)
return null;
var typeIdentity = ApiClient.Pipeline.GetTypeIdentifier(accessor);
var typeResult = _listenerManager.IdToType(streamIdentity, typeIdentity);
if (typeResult == null)
return null;
var idInstance = accessor.Instantiate(typeResult);
if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData)
idInstance.OriginalData = idInstance.OriginalData;
idInstance.StreamIdentifier = streamIdentity;
idInstance.TypeIdentifier = typeIdentity;
idInstance.Parsed = true;
return idInstance;
}
/// <summary>
/// Connect the websocket
@ -619,8 +666,12 @@ namespace CryptoExchange.Net.Sockets
continue;
taskList.Add(SendAndWaitQueryAsync(subQuery).ContinueWith((x) =>
{
{
Debug.WriteLine("1");
subscription.HandleSubQueryResponse(subQuery.Response);
Debug.WriteLine("2");
_listenerManager.Reset(subscription);
Debug.WriteLine("3");
return x.Result;
}));
}

View File

@ -14,15 +14,15 @@ namespace CryptoExchange.Net.Sockets
private ILogger _logger;
private int _socketId;
private object _lock = new object();
private Dictionary<int, IMessageProcessor> _idMap;
private Dictionary<string, Func<string, Type>> _typeMap;
//private Dictionary<int, IMessageProcessor> _idMap;
//private Dictionary<string, Dictionary<string, Type>> _typeMap;
private Dictionary<string, List<IMessageProcessor>> _listeners;
public SocketListenerManager(ILogger logger, int socketId)
{
_idMap = new Dictionary<int, IMessageProcessor>();
//_idMap = new Dictionary<int, IMessageProcessor>();
_listeners = new Dictionary<string, List<IMessageProcessor>>();
_typeMap = new Dictionary<string, Func<string, Type>>();
//_typeMap = new Dictionary<string, Dictionary<string, Type>>();
_logger = logger;
_socketId = socketId;
}
@ -31,8 +31,12 @@ namespace CryptoExchange.Net.Sockets
{
lock (_lock)
{
_typeMap.TryGetValue(streamIdentifier, out var typeDelegate);
return typeDelegate?.Invoke(typeIdentifier);
_listeners.TryGetValue(streamIdentifier, out var listeners);
if (listeners == null)
return null;
listeners.First().TypeMapping.TryGetValue(typeIdentifier ?? "", out var type);
return type;
}
}
@ -46,10 +50,9 @@ namespace CryptoExchange.Net.Sockets
{
lock (_lock)
{
_idMap.Add(processor.Id, processor);
if (processor.Identifiers?.Any() == true)
if (processor.StreamIdentifiers?.Any() == true)
{
foreach (var identifier in processor.Identifiers)
foreach (var identifier in processor.StreamIdentifiers)
{
if (!_listeners.TryGetValue(identifier, out var list))
{
@ -61,7 +64,16 @@ namespace CryptoExchange.Net.Sockets
}
}
UpdateMap();
}
}
public void Reset(IMessageProcessor processor)
{
lock (_lock)
{
Debug.WriteLine("4 Resetting");
Remove(processor);
Add(processor);
}
}
@ -116,7 +128,7 @@ namespace CryptoExchange.Net.Sockets
{
lock (_lock)
{
_idMap.TryGetValue(id, out var val);
var val = _listeners.Values.SelectMany(x => x).FirstOrDefault(x => x.Id == id);
return (T)val;
}
}
@ -143,24 +155,16 @@ namespace CryptoExchange.Net.Sockets
{
lock (_lock)
{
_idMap.Remove(processor.Id);
if (processor.Identifiers?.Any() == true)
{
foreach (var identifier in processor.Identifiers)
{
_listeners[identifier].Remove(processor);
if (!_listeners[identifier].Any())
_listeners.Remove(identifier);
}
}
if (processor.StreamIdentifiers?.Any() != true)
return;
UpdateMap();
foreach(var kv in _listeners)
{
if (kv.Value.Contains(processor))
kv.Value.Remove(processor);
}
}
}
private void UpdateMap()
{
_typeMap = _listeners.ToDictionary(x => x.Key, x => x.Value.First().ExpectedTypeDelegate);
}
}
}

View File

@ -57,7 +57,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Strings to identify this subscription with
/// </summary>
public abstract List<string> Identifiers { get; }
public abstract List<string> StreamIdentifiers { get; }
/// <summary>
/// Cancellation token registration
@ -69,7 +69,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public event Action<Exception>? Exception;
public abstract Func<string, Type> ExpectedTypeDelegate { get; }
public abstract Dictionary<string, Type> TypeMapping { get; }
/// <summary>
/// ctor

View File

@ -2,6 +2,7 @@
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
@ -29,7 +30,11 @@ namespace CryptoExchange.Net.Sockets
public abstract class SystemSubscription<T> : SystemSubscription
{
public override Func<string, Type> ExpectedTypeDelegate => (x) => typeof(T);
public override Dictionary<string, Type> TypeMapping => new Dictionary<string, Type>
{
{ "", typeof(T) }
};
public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
=> HandleMessageAsync(connection, message.As((ParsedMessage<T>)message.Data));