mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-06-10 09:26:22 +00:00
wip
This commit is contained in:
parent
b057974cd0
commit
e3207033c3
@ -10,6 +10,7 @@ using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net.WebSockets;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
@ -103,15 +104,12 @@ namespace CryptoExchange.Net
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public new SocketExchangeOptions ClientOptions => (SocketExchangeOptions)base.ClientOptions;
|
||||
|
||||
/// <inheritdoc />
|
||||
public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions;
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract MessageInterpreterPipeline Pipeline { get; }
|
||||
#endregion
|
||||
|
||||
/// <summary>
|
||||
@ -135,19 +133,9 @@ namespace CryptoExchange.Net
|
||||
RateLimiters = rateLimiters;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Set a delegate which can manipulate the message stream before it is processed by listeners
|
||||
/// </summary>
|
||||
/// <param name="interceptor">Interceptor</param>
|
||||
protected void SetInterceptor(Func<Stream, Stream> interceptor)
|
||||
{
|
||||
this.interceptor = interceptor;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect to an url and listen for data on the BaseAddress
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the expected data</typeparam>
|
||||
/// <param name="subscription">The subscription</param>
|
||||
/// <param name="ct">Cancellation token for closing this subscription</param>
|
||||
/// <returns></returns>
|
||||
@ -159,7 +147,6 @@ namespace CryptoExchange.Net
|
||||
/// <summary>
|
||||
/// Connect to an url and listen for data
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the expected data</typeparam>
|
||||
/// <param name="url">The URL to connect to</param>
|
||||
/// <param name="subscription">The subscription</param>
|
||||
/// <param name="ct">Cancellation token for closing this subscription</param>
|
||||
@ -247,7 +234,7 @@ namespace CryptoExchange.Net
|
||||
return new CallResult<UpdateSubscription>(subResult.Error!);
|
||||
}
|
||||
|
||||
subscription.HandleSubQueryResponse(subQuery.Response);
|
||||
subscription.HandleSubQueryResponse(subQuery.Response!);
|
||||
}
|
||||
|
||||
subscription.Confirmed = true;
|
||||
@ -382,7 +369,7 @@ namespace CryptoExchange.Net
|
||||
/// Should return the request which can be used to authenticate a socket connection
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
protected internal virtual BaseQuery GetAuthenticationRequest() => throw new NotImplementedException();
|
||||
protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException();
|
||||
|
||||
/// <summary>
|
||||
/// Adds a system subscription. Used for example to reply to ping requests
|
||||
@ -474,7 +461,7 @@ namespace CryptoExchange.Net
|
||||
/// Process an unhandled message
|
||||
/// </summary>
|
||||
/// <param name="message">The message that wasn't processed</param>
|
||||
protected virtual void HandleUnhandledMessage(BaseParsedMessage message)
|
||||
protected virtual void HandleUnhandledMessage(SocketMessage message)
|
||||
{
|
||||
}
|
||||
|
||||
@ -538,7 +525,7 @@ namespace CryptoExchange.Net
|
||||
/// <param name="interval">How often</param>
|
||||
/// <param name="queryDelegate">Method returning the query to send</param>
|
||||
/// <param name="callback">The callback for processing the response</param>
|
||||
protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, BaseQuery> queryDelegate, Action<CallResult>? callback)
|
||||
protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func<SocketConnection, Query> queryDelegate, Action<CallResult>? callback)
|
||||
{
|
||||
if (queryDelegate == null)
|
||||
throw new ArgumentNullException(nameof(queryDelegate));
|
||||
@ -686,7 +673,7 @@ namespace CryptoExchange.Net
|
||||
sb.AppendLine($" Id: {subscription.Id}");
|
||||
sb.AppendLine($" Confirmed: {subscription.Confirmed}");
|
||||
sb.AppendLine($" Invocations: {subscription.TotalInvocations}");
|
||||
sb.AppendLine($" Identifiers: [{string.Join(", ", subscription.StreamIdentifiers)}]");
|
||||
sb.AppendLine($" Identifiers: [{string.Join(", ", subscription.ListenerIdentifiers)}]");
|
||||
}
|
||||
}
|
||||
return sb.ToString();
|
||||
@ -708,5 +695,20 @@ namespace CryptoExchange.Net
|
||||
semaphoreSlim?.Dispose();
|
||||
base.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the listener identifier for the message
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public abstract string GetListenerIdentifier(SocketMessage message);
|
||||
|
||||
/// <summary>
|
||||
/// Preprocess a stream message
|
||||
/// </summary>
|
||||
/// <param name="type"></param>
|
||||
/// <param name="stream"></param>
|
||||
/// <returns></returns>
|
||||
public virtual Stream PreprocessStreamMessage(WebSocketMessageType type, Stream stream) => stream;
|
||||
}
|
||||
}
|
||||
|
@ -1,158 +1,158 @@
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Objects.Sockets;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Runtime.InteropServices.ComTypes;
|
||||
using System.Text;
|
||||
//using CryptoExchange.Net.Interfaces;
|
||||
//using CryptoExchange.Net.Objects.Sockets;
|
||||
//using Newtonsoft.Json;
|
||||
//using Newtonsoft.Json.Linq;
|
||||
//using System;
|
||||
//using System.Collections.Generic;
|
||||
//using System.IO;
|
||||
//using System.Linq;
|
||||
//using System.Runtime.InteropServices.ComTypes;
|
||||
//using System.Text;
|
||||
|
||||
namespace CryptoExchange.Net.Converters
|
||||
{
|
||||
internal class JTokenAccessor : IMessageAccessor
|
||||
{
|
||||
private readonly JToken _token;
|
||||
private readonly Stream _stream;
|
||||
private readonly StreamReader _reader;
|
||||
private Dictionary<string, JToken?> _cache = new Dictionary<string, JToken?>();
|
||||
private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
|
||||
//namespace CryptoExchange.Net.Converters
|
||||
//{
|
||||
// internal class JTokenAccessor : IMessageAccessor
|
||||
// {
|
||||
// private readonly JToken _token;
|
||||
// private readonly Stream _stream;
|
||||
// private readonly StreamReader _reader;
|
||||
// private Dictionary<string, JToken?> _cache = new Dictionary<string, JToken?>();
|
||||
// private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
|
||||
|
||||
public JTokenAccessor(Stream stream)
|
||||
{
|
||||
_stream = stream;
|
||||
_reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
|
||||
using var jsonTextReader = new JsonTextReader(_reader);
|
||||
JToken token;
|
||||
try
|
||||
{
|
||||
_token = JToken.Load(jsonTextReader);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Not a json message
|
||||
throw;
|
||||
}
|
||||
}
|
||||
// public JTokenAccessor(Stream stream)
|
||||
// {
|
||||
// _stream = stream;
|
||||
// _reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
|
||||
// using var jsonTextReader = new JsonTextReader(_reader);
|
||||
// JToken token;
|
||||
// try
|
||||
// {
|
||||
// _token = JToken.Load(jsonTextReader);
|
||||
// }
|
||||
// catch (Exception ex)
|
||||
// {
|
||||
// // Not a json message
|
||||
// throw;
|
||||
// }
|
||||
// }
|
||||
|
||||
public BaseParsedMessage Instantiate(Type type)
|
||||
{
|
||||
var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(type);
|
||||
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : _token.ToObject(type, _serializer));
|
||||
return instance;
|
||||
}
|
||||
// public BaseParsedMessage Instantiate(Type type)
|
||||
// {
|
||||
// var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(type);
|
||||
// var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : _token.ToObject(type, _serializer));
|
||||
// return instance;
|
||||
// }
|
||||
|
||||
public string GetOriginalString()
|
||||
{
|
||||
_stream.Position = 0;
|
||||
return _reader.ReadToEnd();
|
||||
}
|
||||
// public string GetOriginalString()
|
||||
// {
|
||||
// _stream.Position = 0;
|
||||
// return _reader.ReadToEnd();
|
||||
// }
|
||||
|
||||
public int? GetArrayIntValue(string? key, int index)
|
||||
{
|
||||
var accessToken = key == null ? _token : GetToken(key);
|
||||
if (accessToken == null || accessToken is not JArray arr)
|
||||
return null;
|
||||
return arr[index].Value<int>();
|
||||
}
|
||||
// public int? GetArrayIntValue(string? key, int index)
|
||||
// {
|
||||
// var accessToken = key == null ? _token : GetToken(key);
|
||||
// if (accessToken == null || accessToken is not JArray arr)
|
||||
// return null;
|
||||
// return arr[index].Value<int>();
|
||||
// }
|
||||
|
||||
public string? GetArrayStringValue(string? key, int index)
|
||||
{
|
||||
var accessToken = key == null ? _token : GetToken(key);
|
||||
if (accessToken == null || accessToken is not JArray arr)
|
||||
return null;
|
||||
// public string? GetArrayStringValue(string? key, int index)
|
||||
// {
|
||||
// var accessToken = key == null ? _token : GetToken(key);
|
||||
// if (accessToken == null || accessToken is not JArray arr)
|
||||
// return null;
|
||||
|
||||
if (arr.Count <= index)
|
||||
return null;
|
||||
// if (arr.Count <= index)
|
||||
// return null;
|
||||
|
||||
if (arr[index].Type != JTokenType.String)
|
||||
return null;
|
||||
// if (arr[index].Type != JTokenType.String)
|
||||
// return null;
|
||||
|
||||
return arr[index].Value<string>();
|
||||
}
|
||||
// return arr[index].Value<string>();
|
||||
// }
|
||||
|
||||
public int? GetCount(string key)
|
||||
{
|
||||
var accessToken = GetToken(key);
|
||||
return accessToken.Count();
|
||||
}
|
||||
// public int? GetCount(string key)
|
||||
// {
|
||||
// var accessToken = GetToken(key);
|
||||
// return accessToken.Count();
|
||||
// }
|
||||
|
||||
public int? GetIntValue(string key)
|
||||
{
|
||||
var accessToken = GetToken(key);
|
||||
return accessToken?.Value<int>();
|
||||
}
|
||||
// public int? GetIntValue(string key)
|
||||
// {
|
||||
// var accessToken = GetToken(key);
|
||||
// return accessToken?.Value<int>();
|
||||
// }
|
||||
|
||||
public string? GetStringValue(string key)
|
||||
{
|
||||
var accessToken = GetToken(key);
|
||||
if (accessToken?.Type == JTokenType.Object)
|
||||
return ((JObject)accessToken).Properties().First().Name;
|
||||
// public string? GetStringValue(string key)
|
||||
// {
|
||||
// var accessToken = GetToken(key);
|
||||
// if (accessToken?.Type == JTokenType.Object)
|
||||
// return ((JObject)accessToken).Properties().First().Name;
|
||||
|
||||
return accessToken?.ToString();
|
||||
}
|
||||
// return accessToken?.ToString();
|
||||
// }
|
||||
|
||||
public bool IsObject(string? key) => _token.Type == JTokenType.Object;
|
||||
public bool IsArray(IEnumerable<int> indexes)
|
||||
{
|
||||
var item = _token;
|
||||
foreach(var index in indexes)
|
||||
{
|
||||
if (item.Type != JTokenType.Array)
|
||||
return false;
|
||||
// public bool IsObject(string? key) => _token.Type == JTokenType.Object;
|
||||
// public bool IsArray(IEnumerable<int> indexes)
|
||||
// {
|
||||
// var item = _token;
|
||||
// foreach(var index in indexes)
|
||||
// {
|
||||
// if (item.Type != JTokenType.Array)
|
||||
// return false;
|
||||
|
||||
var arr = ((JArray)item);
|
||||
if (arr.Count <= index)
|
||||
return false;
|
||||
// var arr = ((JArray)item);
|
||||
// if (arr.Count <= index)
|
||||
// return false;
|
||||
|
||||
item = arr[index];
|
||||
}
|
||||
// item = arr[index];
|
||||
// }
|
||||
|
||||
return item.Type == JTokenType.Array;
|
||||
}
|
||||
// return item.Type == JTokenType.Array;
|
||||
// }
|
||||
|
||||
public bool IsEmptyArray(IEnumerable<int> indexes)
|
||||
{
|
||||
var item = _token;
|
||||
foreach (var index in indexes)
|
||||
{
|
||||
if (item.Type != JTokenType.Array)
|
||||
return false;
|
||||
// public bool IsEmptyArray(IEnumerable<int> indexes)
|
||||
// {
|
||||
// var item = _token;
|
||||
// foreach (var index in indexes)
|
||||
// {
|
||||
// if (item.Type != JTokenType.Array)
|
||||
// return false;
|
||||
|
||||
var arr = ((JArray)item);
|
||||
if (arr.Count <= index)
|
||||
return false;
|
||||
// var arr = ((JArray)item);
|
||||
// if (arr.Count <= index)
|
||||
// return false;
|
||||
|
||||
item = arr[index];
|
||||
}
|
||||
// item = arr[index];
|
||||
// }
|
||||
|
||||
return item.Type == JTokenType.Array && !item.HasValues;
|
||||
}
|
||||
// return item.Type == JTokenType.Array && !item.HasValues;
|
||||
// }
|
||||
|
||||
private JToken? GetToken(string key)
|
||||
{
|
||||
if (key == null)
|
||||
return _token;
|
||||
// private JToken? GetToken(string key)
|
||||
// {
|
||||
// if (key == null)
|
||||
// return _token;
|
||||
|
||||
if (_cache.TryGetValue(key, out var token))
|
||||
return token;
|
||||
// if (_cache.TryGetValue(key, out var token))
|
||||
// return token;
|
||||
|
||||
var splitTokens = key.Split(new char[] { ':' });
|
||||
var accessToken = _token;
|
||||
foreach (var splitToken in splitTokens)
|
||||
{
|
||||
if (accessToken.Type == JTokenType.Array)
|
||||
return null;
|
||||
// var splitTokens = key.Split(new char[] { ':' });
|
||||
// var accessToken = _token;
|
||||
// foreach (var splitToken in splitTokens)
|
||||
// {
|
||||
// if (accessToken.Type == JTokenType.Array)
|
||||
// return null;
|
||||
|
||||
accessToken = accessToken[splitToken];
|
||||
// accessToken = accessToken[splitToken];
|
||||
|
||||
if (accessToken == null)
|
||||
break;
|
||||
}
|
||||
// if (accessToken == null)
|
||||
// break;
|
||||
// }
|
||||
|
||||
_cache.Add(key, accessToken);
|
||||
return accessToken;
|
||||
}
|
||||
}
|
||||
}
|
||||
// _cache.Add(key, accessToken);
|
||||
// return accessToken;
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
@ -1,21 +0,0 @@
|
||||
using CryptoExchange.Net.Objects.Sockets;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
|
||||
namespace CryptoExchange.Net.Interfaces
|
||||
{
|
||||
public interface IMessageAccessor
|
||||
{
|
||||
bool IsObject(string? key);
|
||||
bool IsArray(IEnumerable<int> indexes);
|
||||
bool IsEmptyArray(IEnumerable<int> indexes);
|
||||
string? GetStringValue(string key);
|
||||
int? GetIntValue(string key);
|
||||
public int? GetCount(string key);
|
||||
public int? GetArrayIntValue(string? key, int index);
|
||||
public string? GetArrayStringValue(string? key, int index);
|
||||
|
||||
public BaseParsedMessage Instantiate(Type type);
|
||||
}
|
||||
}
|
@ -8,11 +8,31 @@ using System.Threading.Tasks;
|
||||
|
||||
namespace CryptoExchange.Net.Interfaces
|
||||
{
|
||||
/// <summary>
|
||||
/// Message processor
|
||||
/// </summary>
|
||||
public interface IMessageProcessor
|
||||
{
|
||||
/// <summary>
|
||||
/// Id of the processor
|
||||
/// </summary>
|
||||
public int Id { get; }
|
||||
public List<string> StreamIdentifiers { get; }
|
||||
Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message);
|
||||
Dictionary<string, Type> TypeMapping { get; }
|
||||
/// <summary>
|
||||
/// The identifiers for this processor
|
||||
/// </summary>
|
||||
public HashSet<string> ListenerIdentifiers { get; }
|
||||
/// <summary>
|
||||
/// Handle a message
|
||||
/// </summary>
|
||||
/// <param name="connection"></param>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
Task<CallResult> HandleAsync(SocketConnection connection, DataEvent<object> message);
|
||||
/// <summary>
|
||||
/// Get the type the message should be deserialized to
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
Type? GetMessageType(SocketMessage message);
|
||||
}
|
||||
}
|
||||
|
@ -1,13 +0,0 @@
|
||||
//using CryptoExchange.Net.Converters;
|
||||
//using CryptoExchange.Net.Objects.Sockets;
|
||||
//using System.Threading.Tasks;
|
||||
|
||||
//namespace CryptoExchange.Net.Interfaces
|
||||
//{
|
||||
// internal interface IStreamMessageListener
|
||||
// {
|
||||
// int Priority { get; }
|
||||
// bool MessageMatches(ParsedMessage message);
|
||||
// Task ProcessAsync(ParsedMessage message);
|
||||
// }
|
||||
//}
|
@ -1,81 +0,0 @@
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using static System.Collections.Specialized.BitVector32;
|
||||
|
||||
namespace CryptoExchange.Net.Objects
|
||||
{
|
||||
public class ConcurrentList<T> : IEnumerable<T>
|
||||
{
|
||||
private readonly object _lock = new object();
|
||||
private readonly List<T> _collection = new List<T>();
|
||||
|
||||
public void Add(T item)
|
||||
{
|
||||
lock (_lock)
|
||||
_collection.Add(item);
|
||||
}
|
||||
|
||||
|
||||
public void Remove(T item)
|
||||
{
|
||||
lock (_lock)
|
||||
_collection.Remove(item);
|
||||
}
|
||||
|
||||
public T? SingleOrDefault(Func<T, bool> action)
|
||||
{
|
||||
lock (_lock)
|
||||
return _collection.SingleOrDefault(action);
|
||||
}
|
||||
|
||||
public bool All(Func<T, bool> action)
|
||||
{
|
||||
lock (_lock)
|
||||
return _collection.All(action);
|
||||
}
|
||||
|
||||
public bool Any(Func<T, bool> action)
|
||||
{
|
||||
lock (_lock)
|
||||
return _collection.Any(action);
|
||||
}
|
||||
|
||||
public int Count(Func<T, bool> action)
|
||||
{
|
||||
lock (_lock)
|
||||
return _collection.Count(action);
|
||||
}
|
||||
|
||||
public bool Contains(T item)
|
||||
{
|
||||
lock (_lock)
|
||||
return _collection.Contains(item);
|
||||
}
|
||||
|
||||
public T[] ToArray(Func<T, bool> predicate)
|
||||
{
|
||||
lock (_lock)
|
||||
return _collection.Where(predicate).ToArray();
|
||||
}
|
||||
|
||||
public List<T> ToList()
|
||||
{
|
||||
lock (_lock)
|
||||
return _collection.ToList();
|
||||
}
|
||||
|
||||
public IEnumerator<T> GetEnumerator()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
foreach (var item in _collection)
|
||||
yield return item;
|
||||
}
|
||||
}
|
||||
|
||||
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
|
||||
}
|
||||
}
|
@ -1,21 +0,0 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
|
||||
namespace CryptoExchange.Net.Objects.Sockets
|
||||
{
|
||||
public interface IMatchingStrategy
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
internal class IdMatchingStrategy : IMatchingStrategy
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
internal class FieldsMatchingStrategy : IMatchingStrategy
|
||||
{
|
||||
|
||||
}
|
||||
}
|
@ -1,54 +1,54 @@
|
||||
namespace CryptoExchange.Net.Objects.Sockets
|
||||
{
|
||||
/// <summary>
|
||||
/// Parsed message object
|
||||
/// </summary>
|
||||
public abstract class BaseParsedMessage
|
||||
{
|
||||
/// <summary>
|
||||
/// Stream identifier string
|
||||
/// </summary>
|
||||
public string StreamIdentifier { get; set; } = null!;
|
||||
/// <summary>
|
||||
/// Type identifier string
|
||||
/// </summary>
|
||||
public string TypeIdentifier { get; set; } = null!;
|
||||
/// <summary>
|
||||
/// Original data if the option is enabled
|
||||
/// </summary>
|
||||
public string? OriginalData { get; set; }
|
||||
/// <summary>
|
||||
/// If parsed
|
||||
/// </summary>
|
||||
public bool Parsed { get; set; }
|
||||
//namespace CryptoExchange.Net.Objects.Sockets
|
||||
//{
|
||||
// /// <summary>
|
||||
// /// Parsed message object
|
||||
// /// </summary>
|
||||
// public abstract class BaseParsedMessage
|
||||
// {
|
||||
// /// <summary>
|
||||
// /// Stream identifier string
|
||||
// /// </summary>
|
||||
// public string StreamIdentifier { get; set; } = null!;
|
||||
// /// <summary>
|
||||
// /// Type identifier string
|
||||
// /// </summary>
|
||||
// public string TypeIdentifier { get; set; } = null!;
|
||||
// /// <summary>
|
||||
// /// Original data if the option is enabled
|
||||
// /// </summary>
|
||||
// public string? OriginalData { get; set; }
|
||||
// /// <summary>
|
||||
// /// If parsed
|
||||
// /// </summary>
|
||||
// public bool Parsed { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Get the data object
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public abstract object Data { get; }
|
||||
}
|
||||
// /// <summary>
|
||||
// /// Get the data object
|
||||
// /// </summary>
|
||||
// /// <returns></returns>
|
||||
// public abstract object Data { get; }
|
||||
// }
|
||||
|
||||
/// <summary>
|
||||
/// Parsed message object
|
||||
/// </summary>
|
||||
/// <typeparam name="T">Data type</typeparam>
|
||||
public class ParsedMessage<T> : BaseParsedMessage
|
||||
{
|
||||
/// <summary>
|
||||
/// Parsed data object
|
||||
/// </summary>
|
||||
public override object? Data { get; }
|
||||
// /// <summary>
|
||||
// /// Parsed message object
|
||||
// /// </summary>
|
||||
// /// <typeparam name="T">Data type</typeparam>
|
||||
// public class ParsedMessage<T> : BaseParsedMessage
|
||||
// {
|
||||
// /// <summary>
|
||||
// /// Parsed data object
|
||||
// /// </summary>
|
||||
// public override object? Data { get; }
|
||||
|
||||
public T? TypedData => (T)Data;
|
||||
// public T? TypedData => (T)Data;
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="data"></param>
|
||||
public ParsedMessage(T? data)
|
||||
{
|
||||
Data = data;
|
||||
}
|
||||
}
|
||||
}
|
||||
// /// <summary>
|
||||
// /// ctor
|
||||
// /// </summary>
|
||||
// /// <param name="data"></param>
|
||||
// public ParsedMessage(T? data)
|
||||
// {
|
||||
// Data = data;
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
@ -1,19 +1,19 @@
|
||||
using CryptoExchange.Net.Converters;
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Sockets;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Net.WebSockets;
|
||||
using System.Text;
|
||||
//using CryptoExchange.Net.Converters;
|
||||
//using CryptoExchange.Net.Interfaces;
|
||||
//using CryptoExchange.Net.Sockets;
|
||||
//using Newtonsoft.Json.Linq;
|
||||
//using System;
|
||||
//using System.Collections.Generic;
|
||||
//using System.IO;
|
||||
//using System.Net.WebSockets;
|
||||
//using System.Text;
|
||||
|
||||
namespace CryptoExchange.Net.Objects.Sockets
|
||||
{
|
||||
public class MessageInterpreterPipeline
|
||||
{
|
||||
public Func<WebSocketMessageType, Stream, Stream>? PreProcessCallback { get; set; }
|
||||
public Func<IMessageAccessor, string?> GetStreamIdentifier { get; set; }
|
||||
public Func<IMessageAccessor, string, string?> GetTypeIdentifier { get; set; } = (accessor, streamId) => streamId;
|
||||
}
|
||||
}
|
||||
//namespace CryptoExchange.Net.Objects.Sockets
|
||||
//{
|
||||
// public class MessageInterpreterPipeline
|
||||
// {
|
||||
// public Func<WebSocketMessageType, Stream, Stream>? PreProcessCallback { get; set; }
|
||||
// public Func<IMessageAccessor, string?> GetStreamIdentifier { get; set; }
|
||||
// public Func<IMessageAccessor, string, string?> GetTypeIdentifier { get; set; } = (accessor, streamId) => streamId;
|
||||
// }
|
||||
//}
|
||||
|
@ -1,39 +0,0 @@
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Objects.Sockets;
|
||||
using CryptoExchange.Net.Sockets;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CryptoExchange.Net.Objects.Testing
|
||||
{
|
||||
public class TestWebsocket : CryptoExchangeWebSocketClient
|
||||
{
|
||||
public TestWebsocket(ILogger logger, WebSocketParameters websocketParameters) : base(logger, websocketParameters)
|
||||
{
|
||||
}
|
||||
|
||||
public override bool IsClosed => false;
|
||||
public override bool IsOpen => true;
|
||||
|
||||
public override Task<bool> ConnectAsync() => Task.FromResult(true);
|
||||
|
||||
public override Task CloseAsync() => Task.CompletedTask;
|
||||
|
||||
public override Task ReconnectAsync() => Task.CompletedTask;
|
||||
|
||||
public override void Send(int id, string data, int weight) { }
|
||||
|
||||
public void Receive(string data)
|
||||
{
|
||||
var bytes = Encoding.UTF8.GetBytes(data);
|
||||
var stream = new MemoryStream(bytes);
|
||||
stream.Position = 0;
|
||||
_ = ProcessData(System.Net.WebSockets.WebSocketMessageType.Text, stream);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Objects.Sockets;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
|
||||
namespace CryptoExchange.Net.Objects.Testing
|
||||
{
|
||||
public class TestWebsocketFactory : IWebsocketFactory
|
||||
{
|
||||
private readonly Func<ILogger, WebSocketParameters, IWebsocket> _websocketFactory;
|
||||
|
||||
public TestWebsocketFactory(Func<ILogger, WebSocketParameters, IWebsocket> websocketFactory)
|
||||
{
|
||||
_websocketFactory = websocketFactory;
|
||||
}
|
||||
|
||||
public IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters)
|
||||
{
|
||||
return _websocketFactory(logger, parameters);
|
||||
}
|
||||
}
|
||||
}
|
@ -278,7 +278,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
return;
|
||||
|
||||
var bytes = Parameters.Encoding.GetBytes(data);
|
||||
_logger.Log(LogLevel.Trace, $"Socket {Id} - msg {id} - Adding {bytes.Length} bytes to send buffer");
|
||||
_logger.Log(LogLevel.Trace, $"Socket {Id} msg {id} - Adding {bytes.Length} bytes to send buffer");
|
||||
_sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes });
|
||||
_sendEvent.Set();
|
||||
}
|
||||
@ -415,7 +415,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (limitResult.Success)
|
||||
{
|
||||
if (limitResult.Data > 0)
|
||||
_logger.Log(LogLevel.Debug, $"Socket {Id} - msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit");
|
||||
_logger.Log(LogLevel.Debug, $"Socket {Id} msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -424,7 +424,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
await _socket.SendAsync(new ArraySegment<byte>(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
|
||||
await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
|
||||
_logger.Log(LogLevel.Trace, $"Socket {Id} - msg {data.Id} - sent {data.Bytes.Length} bytes");
|
||||
_logger.Log(LogLevel.Trace, $"Socket {Id} msg {data.Id} - sent {data.Bytes.Length} bytes");
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
@ -447,13 +447,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
// Because this is running in a separate task and not awaited until the socket gets closed
|
||||
// any exception here will crash the send processing, but do so silently unless the socket get's stopped.
|
||||
// Make sure we at least let the owner know there was an error
|
||||
_logger.Log(LogLevel.Warning, $"Socket {Id} Send loop stopped with exception");
|
||||
_logger.Log(LogLevel.Warning, $"Socket {Id} send loop stopped with exception");
|
||||
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_logger.Log(LogLevel.Debug, $"Socket {Id} Send loop finished");
|
||||
_logger.Log(LogLevel.Debug, $"Socket {Id} send loop finished");
|
||||
}
|
||||
}
|
||||
|
||||
@ -570,22 +570,30 @@ namespace CryptoExchange.Net.Sockets
|
||||
// Because this is running in a separate task and not awaited until the socket gets closed
|
||||
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
|
||||
// Make sure we at least let the owner know there was an error
|
||||
_logger.Log(LogLevel.Warning, $"Socket {Id} Receive loop stopped with exception");
|
||||
_logger.Log(LogLevel.Warning, $"Socket {Id} receive loop stopped with exception");
|
||||
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_logger.Log(LogLevel.Debug, $"Socket {Id} Receive loop finished");
|
||||
_logger.Log(LogLevel.Debug, $"Socket {Id} receive loop finished");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Proccess a stream message
|
||||
/// </summary>
|
||||
/// <param name="type"></param>
|
||||
/// <param name="stream"></param>
|
||||
/// <returns></returns>
|
||||
protected async Task ProcessData(WebSocketMessageType type, Stream stream)
|
||||
{
|
||||
LastActionTime = DateTime.UtcNow;
|
||||
stream.Position = 0;
|
||||
|
||||
if (Parameters.Interceptor != null)
|
||||
stream = Parameters.Interceptor.Invoke(stream);
|
||||
|
||||
if (OnStreamMessage != null)
|
||||
await OnStreamMessage.Invoke(type, stream).ConfigureAwait(false);
|
||||
}
|
||||
@ -596,7 +604,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <returns></returns>
|
||||
protected async Task CheckTimeoutAsync()
|
||||
{
|
||||
_logger.Log(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Parameters.Timeout}");
|
||||
_logger.Log(LogLevel.Debug, $"Socket {Id} starting task checking for no data received for {Parameters.Timeout}");
|
||||
LastActionTime = DateTime.UtcNow;
|
||||
try
|
||||
{
|
||||
@ -607,7 +615,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
if (DateTime.UtcNow - LastActionTime > Parameters.Timeout)
|
||||
{
|
||||
_logger.Log(LogLevel.Warning, $"Socket {Id} No data received for {Parameters.Timeout}, reconnecting socket");
|
||||
_logger.Log(LogLevel.Warning, $"Socket {Id} no data received for {Parameters.Timeout}, reconnecting socket");
|
||||
_ = ReconnectAsync().ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
@ -0,0 +1,39 @@
|
||||
using System;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces
|
||||
{
|
||||
/// <summary>
|
||||
/// Message accessor
|
||||
/// </summary>
|
||||
public interface IMessageAccessor
|
||||
{
|
||||
/// <summary>
|
||||
/// Is this a json message
|
||||
/// </summary>
|
||||
bool IsJson { get; }
|
||||
/// <summary>
|
||||
/// Get the type of node
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
NodeType? GetNodeType();
|
||||
/// <summary>
|
||||
/// Get the type of node
|
||||
/// </summary>
|
||||
/// <param name="path">Access path</param>
|
||||
/// <returns></returns>
|
||||
NodeType? GetNodeType(MessagePath path);
|
||||
/// <summary>
|
||||
/// Get the value of a path
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="path"></param>
|
||||
/// <returns></returns>
|
||||
T? GetValue<T>(MessagePath path);
|
||||
/// <summary>
|
||||
/// Deserialize the message into this type
|
||||
/// </summary>
|
||||
/// <param name="type"></param>
|
||||
/// <returns></returns>
|
||||
object Deserialize(Type type);
|
||||
}
|
||||
}
|
132
CryptoExchange.Net/Sockets/MessageParsing/JsonNetMessageData.cs
Normal file
132
CryptoExchange.Net/Sockets/MessageParsing/JsonNetMessageData.cs
Normal file
@ -0,0 +1,132 @@
|
||||
using CryptoExchange.Net.Converters;
|
||||
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Text;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets.MessageParsing
|
||||
{
|
||||
/// <summary>
|
||||
/// Json.Net message accessor
|
||||
/// </summary>
|
||||
public class JsonNetMessageData : IMessageAccessor
|
||||
{
|
||||
private readonly JToken? _token;
|
||||
private readonly Stream _stream;
|
||||
private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
|
||||
|
||||
/// <inheritdoc />
|
||||
public bool IsJson { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="stream"></param>
|
||||
public JsonNetMessageData(Stream stream)
|
||||
{
|
||||
_stream = stream;
|
||||
using var reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
|
||||
using var jsonTextReader = new JsonTextReader(reader);
|
||||
|
||||
try
|
||||
{
|
||||
_token = JToken.Load(jsonTextReader);
|
||||
IsJson = true;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// Not a json message
|
||||
IsJson = false;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public object Deserialize(Type type)
|
||||
{
|
||||
if (!IsJson)
|
||||
{
|
||||
var sr = new StreamReader(_stream);
|
||||
return sr.ReadToEnd();
|
||||
}
|
||||
|
||||
return _token!.ToObject(type, _serializer)!;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public NodeType? GetNodeType()
|
||||
{
|
||||
if (_token == null)
|
||||
return null;
|
||||
|
||||
if (_token.Type == JTokenType.Object)
|
||||
return NodeType.Object;
|
||||
|
||||
if (_token.Type == JTokenType.Array)
|
||||
return NodeType.Array;
|
||||
|
||||
return NodeType.Value;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public NodeType? GetNodeType(MessagePath path)
|
||||
{
|
||||
var node = GetPathNode(path);
|
||||
if (node == null)
|
||||
return null;
|
||||
|
||||
if (node.Type == JTokenType.Object)
|
||||
return NodeType.Object;
|
||||
|
||||
if (node.Type == JTokenType.Array)
|
||||
return NodeType.Array;
|
||||
|
||||
return NodeType.Value;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public T? GetValue<T>(MessagePath path)
|
||||
{
|
||||
var value = GetPathNode(path);
|
||||
if (value == null)
|
||||
return default;
|
||||
|
||||
if (value.Type == JTokenType.Object || value.Type == JTokenType.Array)
|
||||
return default;
|
||||
|
||||
return value!.Value<T>();
|
||||
}
|
||||
|
||||
private JToken? GetPathNode(MessagePath path)
|
||||
{
|
||||
var currentToken = _token;
|
||||
foreach (var node in path)
|
||||
{
|
||||
if (node.Type)
|
||||
{
|
||||
// Int value
|
||||
var val = (int)node.Value;
|
||||
if (currentToken!.Type != JTokenType.Array || ((JArray)currentToken).Count <= val)
|
||||
return null;
|
||||
|
||||
currentToken = currentToken[val];
|
||||
}
|
||||
else
|
||||
{
|
||||
// String value
|
||||
if (currentToken!.Type != JTokenType.Object)
|
||||
return null;
|
||||
|
||||
currentToken = currentToken[(string)node.Value];
|
||||
}
|
||||
|
||||
if (currentToken == null)
|
||||
return null;
|
||||
}
|
||||
|
||||
return currentToken;
|
||||
}
|
||||
}
|
||||
}
|
38
CryptoExchange.Net/Sockets/MessageParsing/MessageNode.cs
Normal file
38
CryptoExchange.Net/Sockets/MessageParsing/MessageNode.cs
Normal file
@ -0,0 +1,38 @@
|
||||
namespace CryptoExchange.Net.Sockets.MessageParsing
|
||||
{
|
||||
/// <summary>
|
||||
/// Node accessor
|
||||
/// </summary>
|
||||
public struct NodeAccessor
|
||||
{
|
||||
/// <summary>
|
||||
/// Value
|
||||
/// </summary>
|
||||
public object Value { get; }
|
||||
/// <summary>
|
||||
/// Type (true = int, false = string)
|
||||
/// </summary>
|
||||
public bool Type { get; }
|
||||
|
||||
private NodeAccessor(object value, bool type)
|
||||
{
|
||||
Value = value;
|
||||
Type = type;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create an int node accessor
|
||||
/// </summary>
|
||||
/// <param name="value"></param>
|
||||
/// <returns></returns>
|
||||
public static NodeAccessor Int(int value) { return new NodeAccessor(value, true); }
|
||||
|
||||
/// <summary>
|
||||
/// Create a string node accessor
|
||||
/// </summary>
|
||||
/// <param name="value"></param>
|
||||
/// <returns></returns>
|
||||
public static NodeAccessor String(string value) { return new NodeAccessor(value, false); }
|
||||
}
|
||||
|
||||
}
|
50
CryptoExchange.Net/Sockets/MessageParsing/MessagePath.cs
Normal file
50
CryptoExchange.Net/Sockets/MessageParsing/MessagePath.cs
Normal file
@ -0,0 +1,50 @@
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets.MessageParsing
|
||||
{
|
||||
/// <summary>
|
||||
/// Message access definition
|
||||
/// </summary>
|
||||
public struct MessagePath : IEnumerable<NodeAccessor>
|
||||
{
|
||||
private List<NodeAccessor> _path;
|
||||
|
||||
internal void Add(NodeAccessor node)
|
||||
{
|
||||
_path.Add(node);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public MessagePath()
|
||||
{
|
||||
_path = new List<NodeAccessor>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create a new message path
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public static MessagePath Get()
|
||||
{
|
||||
return new MessagePath();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// IEnumerable implementation
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public IEnumerator<NodeAccessor> GetEnumerator()
|
||||
{
|
||||
for (var i = 0; i < _path.Count; i++)
|
||||
yield return _path[i];
|
||||
}
|
||||
|
||||
IEnumerator IEnumerable.GetEnumerator()
|
||||
{
|
||||
return GetEnumerator();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
namespace CryptoExchange.Net.Sockets.MessageParsing
|
||||
{
|
||||
/// <summary>
|
||||
/// Message path extension methods
|
||||
/// </summary>
|
||||
public static class MessagePathExtension
|
||||
{
|
||||
/// <summary>
|
||||
/// Add a string node accessor
|
||||
/// </summary>
|
||||
/// <param name="path"></param>
|
||||
/// <param name="propName"></param>
|
||||
/// <returns></returns>
|
||||
public static MessagePath Property(this MessagePath path, string propName)
|
||||
{
|
||||
path.Add(NodeAccessor.String(propName));
|
||||
return path;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a int node accessor
|
||||
/// </summary>
|
||||
/// <param name="path"></param>
|
||||
/// <param name="index"></param>
|
||||
/// <returns></returns>
|
||||
public static MessagePath Index(this MessagePath path, int index)
|
||||
{
|
||||
path.Add(NodeAccessor.Int(index));
|
||||
return path;
|
||||
}
|
||||
}
|
||||
}
|
21
CryptoExchange.Net/Sockets/MessageParsing/NodeType.cs
Normal file
21
CryptoExchange.Net/Sockets/MessageParsing/NodeType.cs
Normal file
@ -0,0 +1,21 @@
|
||||
namespace CryptoExchange.Net.Sockets.MessageParsing
|
||||
{
|
||||
/// <summary>
|
||||
/// Message node type
|
||||
/// </summary>
|
||||
public enum NodeType
|
||||
{
|
||||
/// <summary>
|
||||
/// Array node
|
||||
/// </summary>
|
||||
Array,
|
||||
/// <summary>
|
||||
/// Object node
|
||||
/// </summary>
|
||||
Object,
|
||||
/// <summary>
|
||||
/// Value node
|
||||
/// </summary>
|
||||
Value
|
||||
}
|
||||
}
|
@ -11,26 +11,42 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <summary>
|
||||
/// Query
|
||||
/// </summary>
|
||||
public abstract class BaseQuery : IMessageProcessor
|
||||
public abstract class Query : IMessageProcessor
|
||||
{
|
||||
/// <summary>
|
||||
/// Unique identifier
|
||||
/// </summary>
|
||||
public int Id { get; } = ExchangeHelpers.NextId();
|
||||
|
||||
/// <summary>
|
||||
/// Has this query been completed
|
||||
/// </summary>
|
||||
public bool Completed { get; set; }
|
||||
public DateTime RequestTimestamp { get; set; }
|
||||
public CallResult? Result { get; set; }
|
||||
public BaseParsedMessage Response { get; set; }
|
||||
public Action OnFinished { get; set; }
|
||||
|
||||
protected AsyncResetEvent _event;
|
||||
protected CancellationTokenSource? _cts;
|
||||
|
||||
/// <summary>
|
||||
/// Strings to identify this subscription with
|
||||
/// Timestamp of when the request was send
|
||||
/// </summary>
|
||||
public abstract List<string> StreamIdentifiers { get; set; }
|
||||
public DateTime RequestTimestamp { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Result
|
||||
/// </summary>
|
||||
public CallResult? Result { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Response
|
||||
/// </summary>
|
||||
public object? Response { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Action to execute when query is finished
|
||||
/// </summary>
|
||||
public Action? OnFinished { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Strings to match this query to a received message
|
||||
/// </summary>
|
||||
public abstract HashSet<string> ListenerIdentifiers { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The query request object
|
||||
@ -47,7 +63,22 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// </summary>
|
||||
public int Weight { get; }
|
||||
|
||||
public abstract Dictionary<string, Type> TypeMapping { get; set; }
|
||||
/// <summary>
|
||||
/// Get the type the message should be deserialized to
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public abstract Type GetMessageType(SocketMessage message);
|
||||
|
||||
/// <summary>
|
||||
/// Wait event for response
|
||||
/// </summary>
|
||||
protected AsyncResetEvent _event;
|
||||
|
||||
/// <summary>
|
||||
/// Cancellation token
|
||||
/// </summary>
|
||||
protected CancellationTokenSource? _cts;
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
@ -55,7 +86,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <param name="request"></param>
|
||||
/// <param name="authenticated"></param>
|
||||
/// <param name="weight"></param>
|
||||
public BaseQuery(object request, bool authenticated, int weight = 1)
|
||||
public Query(object request, bool authenticated, int weight = 1)
|
||||
{
|
||||
_event = new AsyncResetEvent(false, false);
|
||||
|
||||
@ -97,8 +128,9 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// Handle a response message
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <param name="connection"></param>
|
||||
/// <returns></returns>
|
||||
public abstract Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message);
|
||||
public abstract Task<CallResult> HandleAsync(SocketConnection connection, DataEvent<object> message);
|
||||
|
||||
}
|
||||
|
||||
@ -106,20 +138,10 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// Query
|
||||
/// </summary>
|
||||
/// <typeparam name="TResponse">Response object type</typeparam>
|
||||
public abstract class Query<TResponse> : BaseQuery
|
||||
public abstract class Query<TResponse> : Query
|
||||
{
|
||||
private Dictionary<string, Type> _typeMapping = new Dictionary<string, Type>
|
||||
{
|
||||
{ "", typeof(TResponse) }
|
||||
};
|
||||
public override Dictionary<string, Type> TypeMapping
|
||||
{
|
||||
get => _typeMapping;
|
||||
set
|
||||
{
|
||||
_typeMapping = value;
|
||||
}
|
||||
}
|
||||
/// <inheritdoc />
|
||||
public override Type GetMessageType(SocketMessage message) => typeof(TResponse);
|
||||
|
||||
/// <summary>
|
||||
/// The typed call result
|
||||
@ -137,13 +159,11 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override async Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
|
||||
public override async Task<CallResult> HandleAsync(SocketConnection connection, DataEvent<object> message)
|
||||
{
|
||||
Completed = true;
|
||||
Response = message.Data;
|
||||
Result = await HandleMessageAsync(connection, message.As((ParsedMessage<TResponse>)message.Data)).ConfigureAwait(false);
|
||||
// Set() gives calling/waiting request the signal to continue and allows the message processing thread to continue with next message.
|
||||
// However, the processing of the message isn't fully finished yet?
|
||||
Result = await HandleMessageAsync(connection, message.As((TResponse)message.Data)).ConfigureAwait(false);
|
||||
OnFinished?.Invoke();
|
||||
_event.Set();
|
||||
return Result;
|
||||
@ -152,9 +172,10 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <summary>
|
||||
/// Handle the query response
|
||||
/// </summary>
|
||||
/// <param name="connection"></param>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public virtual Task<CallResult<TResponse>> HandleMessageAsync(SocketConnection connection, DataEvent<ParsedMessage<TResponse>> message) => Task.FromResult(new CallResult<TResponse>(message.Data.TypedData!));
|
||||
public virtual Task<CallResult<TResponse>> HandleMessageAsync(SocketConnection connection, DataEvent<TResponse> message) => Task.FromResult(new CallResult<TResponse>(message.Data));
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Timeout()
|
||||
|
@ -13,6 +13,7 @@ using System.Text;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using CryptoExchange.Net.Converters;
|
||||
using System.Diagnostics;
|
||||
using CryptoExchange.Net.Sockets.MessageParsing;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
@ -49,17 +50,24 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <summary>
|
||||
/// Unhandled message event
|
||||
/// </summary>
|
||||
public event Action<BaseParsedMessage>? UnhandledMessage;
|
||||
public event Action<SocketMessage>? UnhandledMessage;
|
||||
|
||||
/// <summary>
|
||||
/// Unparsed message event
|
||||
/// </summary>
|
||||
public event Action<byte[]>? UnparsedMessage;
|
||||
public event Action<byte[]>? UnparsedMessage; // TODO not linked up
|
||||
|
||||
/// <summary>
|
||||
/// The amount of subscriptions on this connection
|
||||
/// </summary>
|
||||
public int UserSubscriptionCount => _listenerManager.GetSubscriptions().Count(h => h.UserSubscription);
|
||||
public int UserSubscriptionCount
|
||||
{
|
||||
get
|
||||
{
|
||||
lock(_listenersLock)
|
||||
return _listeners.OfType<Subscription>().Count(h => h.UserSubscription);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get a copy of the current message subscriptions
|
||||
@ -68,7 +76,8 @@ namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
get
|
||||
{
|
||||
return _listenerManager.GetSubscriptions().Where(h => h.UserSubscription).ToArray();
|
||||
lock(_listenersLock)
|
||||
return _listeners.OfType<Subscription>().Where(h => h.UserSubscription).ToArray();
|
||||
}
|
||||
}
|
||||
|
||||
@ -128,7 +137,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (_pausedActivity != value)
|
||||
{
|
||||
_pausedActivity = value;
|
||||
_logger.Log(LogLevel.Information, $"Socket {SocketId} Paused activity: " + value);
|
||||
_logger.Log(LogLevel.Information, $"Socket {SocketId} paused activity: " + value);
|
||||
if(_pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke());
|
||||
else _ = Task.Run(() => ActivityUnpaused?.Invoke());
|
||||
}
|
||||
@ -153,7 +162,8 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
private bool _pausedActivity;
|
||||
private readonly SocketListenerManager _listenerManager;
|
||||
private readonly object _listenersLock;
|
||||
private readonly List<IMessageProcessor> _listeners;
|
||||
private readonly ILogger _logger;
|
||||
private SocketStatus _status;
|
||||
|
||||
@ -186,57 +196,67 @@ namespace CryptoExchange.Net.Sockets
|
||||
_socket.OnError += HandleErrorAsync;
|
||||
_socket.GetReconnectionUrl = GetReconnectionUrlAsync;
|
||||
|
||||
_listenerManager = new SocketListenerManager(_logger, SocketId);
|
||||
_listenersLock = new object();
|
||||
_listeners = new List<IMessageProcessor>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handler for a socket opening
|
||||
/// </summary>
|
||||
protected virtual async Task HandleOpenAsync()
|
||||
protected virtual Task HandleOpenAsync()
|
||||
{
|
||||
Status = SocketStatus.Connected;
|
||||
PausedActivity = false;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handler for a socket closing without reconnect
|
||||
/// </summary>
|
||||
protected virtual async Task HandleCloseAsync()
|
||||
protected virtual Task HandleCloseAsync()
|
||||
{
|
||||
Status = SocketStatus.Closed;
|
||||
Authenticated = false;
|
||||
|
||||
foreach (var subscription in _listenerManager.GetSubscriptions())
|
||||
subscription.Confirmed = false;
|
||||
|
||||
foreach (var query in _listenerManager.GetQueries())
|
||||
lock (_listenersLock)
|
||||
{
|
||||
query.Fail("Connection interupted");
|
||||
_listenerManager.Remove(query);
|
||||
foreach (var subscription in _listeners.OfType<Subscription>())
|
||||
subscription.Confirmed = false;
|
||||
|
||||
foreach (var query in _listeners.OfType<Query>())
|
||||
{
|
||||
query.Fail("Connection interupted");
|
||||
_listeners.Remove(query);
|
||||
}
|
||||
}
|
||||
|
||||
Task.Run(() => ConnectionClosed?.Invoke());
|
||||
_ = Task.Run(() => ConnectionClosed?.Invoke());
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handler for a socket losing conenction and starting reconnect
|
||||
/// </summary>
|
||||
protected virtual async Task HandleReconnectingAsync()
|
||||
protected virtual Task HandleReconnectingAsync()
|
||||
{
|
||||
Status = SocketStatus.Reconnecting;
|
||||
DisconnectTime = DateTime.UtcNow;
|
||||
Authenticated = false;
|
||||
|
||||
foreach (var subscription in _listenerManager.GetSubscriptions())
|
||||
subscription.Confirmed = false;
|
||||
|
||||
foreach (var query in _listenerManager.GetQueries())
|
||||
lock (_listenersLock)
|
||||
{
|
||||
query.Fail("Connection interupted");
|
||||
_listenerManager.Remove(query);
|
||||
foreach (var subscription in _listeners.OfType<Subscription>())
|
||||
subscription.Confirmed = false;
|
||||
|
||||
foreach (var query in _listeners.OfType<Query>())
|
||||
{
|
||||
query.Fail("Connection interupted");
|
||||
_listeners.Remove(query);
|
||||
}
|
||||
}
|
||||
|
||||
_ = Task.Run(() => ConnectionLost?.Invoke());
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -251,22 +271,26 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <summary>
|
||||
/// Handler for a socket which has reconnected
|
||||
/// </summary>
|
||||
protected virtual async Task HandleReconnectedAsync()
|
||||
protected virtual Task HandleReconnectedAsync()
|
||||
{
|
||||
Status = SocketStatus.Resubscribing;
|
||||
|
||||
foreach (var query in _listenerManager.GetQueries())
|
||||
lock (_listenersLock)
|
||||
{
|
||||
query.Fail("Connection interupted");
|
||||
_listenerManager.Remove(query);
|
||||
foreach (var query in _listeners.OfType<Query>())
|
||||
{
|
||||
query.Fail("Connection interupted");
|
||||
_listeners.Remove(query);
|
||||
}
|
||||
}
|
||||
|
||||
// Can't wait for this as it would cause a deadlock
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
|
||||
if (!reconnectSuccessful)
|
||||
{
|
||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again");
|
||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again");
|
||||
_ = _socket.ReconnectAsync().ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
@ -279,34 +303,44 @@ namespace CryptoExchange.Net.Sockets
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handler for an error on a websocket
|
||||
/// </summary>
|
||||
/// <param name="e">The exception</param>
|
||||
protected virtual async Task HandleErrorAsync(Exception e)
|
||||
protected virtual Task HandleErrorAsync(Exception e)
|
||||
{
|
||||
if (e is WebSocketException wse)
|
||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
|
||||
else
|
||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} error: " + e.ToLogString());
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handler for whenever a request is sent over the websocket
|
||||
/// </summary>
|
||||
/// <param name="requestId">Id of the request sent</param>
|
||||
protected virtual async Task HandleRequestSentAsync(int requestId)
|
||||
protected virtual Task HandleRequestSentAsync(int requestId)
|
||||
{
|
||||
var query = _listenerManager.GetById<BaseQuery>(requestId);
|
||||
Query query;
|
||||
lock (_listenersLock)
|
||||
{
|
||||
query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId);
|
||||
}
|
||||
|
||||
if (query == null)
|
||||
{
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} - msg {requestId} - message sent, but not pending");
|
||||
return;
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} msg {requestId} - message sent, but not pending");
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
query.IsSend(ApiClient.ClientOptions.RequestTimeout);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -317,87 +351,74 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <returns></returns>
|
||||
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream)
|
||||
{
|
||||
var result = ReadJson(type, stream);
|
||||
if (result == null)
|
||||
{
|
||||
// Not able to parse at all
|
||||
var buffer = new byte[stream.Length];
|
||||
stream.Position = 0;
|
||||
stream.Read(buffer, 0, buffer.Length);
|
||||
_logger.LogDebug($"Socket {SocketId} Failed to parse data: {Encoding.UTF8.GetString(buffer)}");
|
||||
// 1. Decrypt/Preprocess if necessary
|
||||
stream = ApiClient.PreprocessStreamMessage(type, stream);
|
||||
|
||||
UnparsedMessage?.Invoke(buffer);
|
||||
// 2. Read data into accessor
|
||||
var messageData = new JsonNetMessageData(stream); // TODO if we let the implementation create this we can switch per implementation
|
||||
var message = new SocketMessage(DateTime.UtcNow, messageData);
|
||||
if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData)
|
||||
{
|
||||
stream.Position = 0;
|
||||
using var textReader = new StreamReader(stream, Encoding.UTF8, false, 1024, true);
|
||||
message.RawData = textReader.ReadToEnd();
|
||||
|
||||
_logger.LogTrace("Socket {SocketId} received {Data}", SocketId, message.RawData);
|
||||
}
|
||||
|
||||
// 3. Determine the subscription interested in the messsage
|
||||
var listenId = ApiClient.GetListenerIdentifier(message);
|
||||
|
||||
List<IMessageProcessor> processors;
|
||||
lock(_listenersLock)
|
||||
processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList();
|
||||
|
||||
if (!processors.Any())
|
||||
{
|
||||
_logger.LogWarning("Socket {SocketId} received message not matched to any processor", SocketId);
|
||||
UnhandledMessage?.Invoke(message);
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.OriginalData != null)
|
||||
_logger.LogDebug($"Socket {SocketId} Data received: {result.OriginalData}");
|
||||
|
||||
if (!result.Parsed)
|
||||
_logger.LogTrace("Socket {SocketId} {Count} processors matched to message with listener identifier {ListenerId}", SocketId, processors.Count, listenId);
|
||||
foreach (var processor in processors)
|
||||
{
|
||||
// Not able to determine the message type for the message
|
||||
_logger.LogWarning("Message not matched to type");
|
||||
return;
|
||||
}
|
||||
// 4. Determine the type to deserialize to
|
||||
var messageType = processor.GetMessageType(message);
|
||||
if (messageType == null)
|
||||
{
|
||||
_logger.LogWarning("Socket {SocketId} received message not recognized by handler {Id}", SocketId, processor.Id);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!await _listenerManager.InvokeListenersAsync(this, result.StreamIdentifier, result).ConfigureAwait(false))
|
||||
{
|
||||
// Not able to find a listener for this message
|
||||
stream.Position = 0;
|
||||
var unhandledBuffer = new byte[stream.Length];
|
||||
stream.Read(unhandledBuffer, 0, unhandledBuffer.Length);
|
||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Message unidentified. Id: {result.StreamIdentifier.ToLowerInvariant()}, listening ids: [{string.Join(", ", _listenerManager.GetListenIds())}], Message: {Encoding.UTF8.GetString(unhandledBuffer)} ");
|
||||
UnhandledMessage?.Invoke(result);
|
||||
return;
|
||||
// 5. Deserialize the message
|
||||
object deserialized;
|
||||
try
|
||||
{
|
||||
deserialized = message.Deserialize(messageType);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning("Socket {SocketId} failed to deserialize message to type {Type}: {Exception}", SocketId, messageType.Name, ex.ToLogString());
|
||||
continue;
|
||||
}
|
||||
|
||||
// 6. Hand of the message to the subscription
|
||||
try
|
||||
{
|
||||
await processor.HandleAsync(this, new DataEvent<object>(deserialized, null, message.RawData, message.ReceiveTime, null)).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning("Socket {SocketId} user message processing failed: {Exception}", SocketId, ex.ToLogString());
|
||||
if (processor is Subscription subscription)
|
||||
subscription.InvokeExceptionHandler(ex);
|
||||
}
|
||||
}
|
||||
|
||||
stream.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Read a message from stream
|
||||
/// </summary>
|
||||
/// <param name="websocketMessageType"></param>
|
||||
/// <param name="stream"></param>
|
||||
/// <returns></returns>
|
||||
protected virtual BaseParsedMessage? ReadJson(WebSocketMessageType websocketMessageType, Stream stream)
|
||||
{
|
||||
// Start reading the data
|
||||
// Once we reach the properties that identify the message we save those in a dict
|
||||
// Once all id properties have been read callback to see what the deserialization type should be
|
||||
// Deserialize to the correct type
|
||||
|
||||
if (ApiClient.Pipeline.PreProcessCallback != null)
|
||||
stream = ApiClient.Pipeline.PreProcessCallback(websocketMessageType, stream);
|
||||
|
||||
var accessor = new JTokenAccessor(stream);
|
||||
if (accessor == null)
|
||||
return null;
|
||||
|
||||
var streamIdentity = ApiClient.Pipeline.GetStreamIdentifier(accessor);
|
||||
if (streamIdentity == null)
|
||||
return null;
|
||||
|
||||
var typeIdentity = ApiClient.Pipeline.GetTypeIdentifier(accessor, streamIdentity);
|
||||
var typeResult = _listenerManager.IdToType(streamIdentity, typeIdentity);
|
||||
if (typeResult == null)
|
||||
return null;
|
||||
|
||||
var idInstance = accessor.Instantiate(typeResult);
|
||||
if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData)
|
||||
{
|
||||
var buffer2 = new byte[stream.Length];
|
||||
stream.Position = 0;
|
||||
stream.Read(buffer2, 0, buffer2.Length);
|
||||
idInstance.OriginalData = Encoding.UTF8.GetString(buffer2);
|
||||
}
|
||||
|
||||
idInstance.StreamIdentifier = streamIdentity;
|
||||
idInstance.TypeIdentifier = typeIdentity;
|
||||
idInstance.Parsed = true;
|
||||
return idInstance;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect the websocket
|
||||
/// </summary>
|
||||
@ -428,10 +449,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (ApiClient.socketConnections.ContainsKey(SocketId))
|
||||
ApiClient.socketConnections.TryRemove(SocketId, out _);
|
||||
|
||||
foreach (var subscription in _listenerManager.GetSubscriptions())
|
||||
lock (_listenersLock)
|
||||
{
|
||||
if (subscription.CancellationTokenRegistration.HasValue)
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
foreach (var subscription in _listeners.OfType<Subscription>())
|
||||
{
|
||||
if (subscription.CancellationTokenRegistration.HasValue)
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
await _socket.CloseAsync().ConfigureAwait(false);
|
||||
@ -446,8 +470,11 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <returns></returns>
|
||||
public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false)
|
||||
{
|
||||
if (!_listenerManager.Contains(subscription))
|
||||
return;
|
||||
lock (_listenersLock)
|
||||
{
|
||||
if (!_listeners.Contains(subscription))
|
||||
return;
|
||||
}
|
||||
|
||||
subscription.Closed = true;
|
||||
|
||||
@ -467,9 +494,13 @@ namespace CryptoExchange.Net.Sockets
|
||||
return;
|
||||
}
|
||||
|
||||
var shouldCloseConnection = _listenerManager.GetSubscriptions().All(r => !r.UserSubscription || r.Closed);
|
||||
if (shouldCloseConnection)
|
||||
Status = SocketStatus.Closing;
|
||||
bool shouldCloseConnection;
|
||||
lock (_listenersLock)
|
||||
{
|
||||
shouldCloseConnection = _listeners.OfType<Subscription>().All(r => !r.UserSubscription || r.Closed);
|
||||
if (shouldCloseConnection)
|
||||
Status = SocketStatus.Closing;
|
||||
}
|
||||
|
||||
if (shouldCloseConnection)
|
||||
{
|
||||
@ -477,7 +508,8 @@ namespace CryptoExchange.Net.Sockets
|
||||
await CloseAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
_listenerManager.Remove(subscription);
|
||||
lock (_listenersLock)
|
||||
_listeners.Remove(subscription);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -504,7 +536,8 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (Status != SocketStatus.None && Status != SocketStatus.Connected)
|
||||
return false;
|
||||
|
||||
_listenerManager.Add(subscription);
|
||||
lock (_listenersLock)
|
||||
_listeners.Add(subscription);
|
||||
|
||||
if (subscription.UserSubscription)
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {UserSubscriptionCount}");
|
||||
@ -515,21 +548,29 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// Get a subscription on this connection by id
|
||||
/// </summary>
|
||||
/// <param name="id"></param>
|
||||
public Subscription? GetSubscription(int id) => _listenerManager.GetSubscriptions().SingleOrDefault(s => s.Id == id);
|
||||
public Subscription? GetSubscription(int id)
|
||||
{
|
||||
lock (_listenersLock)
|
||||
return _listeners.OfType<Subscription>().SingleOrDefault(s => s.Id == id);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get a subscription on this connection by its subscribe request
|
||||
/// </summary>
|
||||
/// <param name="predicate">Filter for a request</param>
|
||||
/// <returns></returns>
|
||||
public Subscription? GetSubscriptionByRequest(Func<object?, bool> predicate) => _listenerManager.GetSubscriptions().SingleOrDefault(s => predicate(s));
|
||||
|
||||
public Subscription? GetSubscriptionByRequest(Func<object?, bool> predicate)
|
||||
{
|
||||
lock (_listenersLock)
|
||||
return _listeners.OfType<Subscription>().SingleOrDefault(s => predicate(s));
|
||||
}
|
||||
/// <summary>
|
||||
/// Send a query request and wait for an answer
|
||||
/// </summary>
|
||||
/// <param name="query">Query to send</param>
|
||||
/// <param name="onFinished">Action to run when query finishes</param>
|
||||
/// <returns></returns>
|
||||
public virtual async Task<CallResult> SendAndWaitQueryAsync(BaseQuery query, Action? onFinished = null)
|
||||
public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query, Action? onFinished = null)
|
||||
{
|
||||
await SendAndWaitIntAsync(query, onFinished).ConfigureAwait(false);
|
||||
return query.Result ?? new CallResult(new ServerError("Timeout"));
|
||||
@ -540,6 +581,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// </summary>
|
||||
/// <typeparam name="T">Query response type</typeparam>
|
||||
/// <param name="query">Query to send</param>
|
||||
/// <param name="onFinished">Action to run when query finishes</param>
|
||||
/// <returns></returns>
|
||||
public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query, Action? onFinished = null)
|
||||
{
|
||||
@ -547,9 +589,11 @@ namespace CryptoExchange.Net.Sockets
|
||||
return query.TypedResult ?? new CallResult<T>(new ServerError("Timeout"));
|
||||
}
|
||||
|
||||
private async Task SendAndWaitIntAsync(BaseQuery query, Action onFinished)
|
||||
private async Task SendAndWaitIntAsync(Query query, Action? onFinished)
|
||||
{
|
||||
_listenerManager.Add(query);
|
||||
lock(_listenersLock)
|
||||
_listeners.Add(query);
|
||||
|
||||
var sendOk = Send(query.Id, query.Request, query.Weight);
|
||||
if (!sendOk)
|
||||
{
|
||||
@ -600,7 +644,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <param name="requestId">The id of the request</param>
|
||||
public virtual bool Send(int requestId, string data, int weight)
|
||||
{
|
||||
_logger.Log(LogLevel.Trace, $"Socket {SocketId} - msg {requestId} - sending messsage: {data}");
|
||||
_logger.Log(LogLevel.Trace, $"Socket {SocketId} msg {requestId} - sending messsage: {data}");
|
||||
try
|
||||
{
|
||||
_socket.Send(requestId, data, weight);
|
||||
@ -617,16 +661,20 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (!_socket.IsOpen)
|
||||
return new CallResult<bool>(new WebError("Socket not connected"));
|
||||
|
||||
var anySubscriptions = _listenerManager.GetSubscriptions().Any(s => s.UserSubscription);
|
||||
bool anySubscriptions;
|
||||
lock (_listenersLock)
|
||||
anySubscriptions = _listeners.OfType<Subscription>().Any(s => s.UserSubscription);
|
||||
if (!anySubscriptions)
|
||||
{
|
||||
// No need to resubscribe anything
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} Nothing to resubscribe, closing connection");
|
||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} nothing to resubscribe, closing connection");
|
||||
_ = _socket.CloseAsync();
|
||||
return new CallResult<bool>(true);
|
||||
}
|
||||
|
||||
var anyAuthenticated = _listenerManager.GetSubscriptions().Any(s => s.Authenticated);
|
||||
bool anyAuthenticated;
|
||||
lock (_listenersLock)
|
||||
anyAuthenticated = _listeners.OfType<Subscription>().Any(s => s.Authenticated);
|
||||
if (anyAuthenticated)
|
||||
{
|
||||
// If we reconnected a authenticated connection we need to re-authenticate
|
||||
@ -642,7 +690,9 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
// Get a list of all subscriptions on the socket
|
||||
var subList = _listenerManager.GetSubscriptions();
|
||||
List<Subscription> subList;
|
||||
lock (_listenersLock)
|
||||
subList = _listeners.OfType<Subscription>().ToList();
|
||||
|
||||
foreach(var subscription in subList)
|
||||
{
|
||||
@ -650,7 +700,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
|
||||
if (!result)
|
||||
{
|
||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed request revitalization: " + result.Error);
|
||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} failed request revitalization: " + result.Error);
|
||||
return result.As(false);
|
||||
}
|
||||
}
|
||||
@ -670,8 +720,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
taskList.Add(SendAndWaitQueryAsync(subQuery, () =>
|
||||
{
|
||||
subscription.HandleSubQueryResponse(subQuery.Response);
|
||||
_listenerManager.Reset(subscription);
|
||||
subscription.HandleSubQueryResponse(subQuery.Response!);
|
||||
}));
|
||||
}
|
||||
|
||||
@ -710,7 +759,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
return new CallResult(null);
|
||||
|
||||
var result = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
|
||||
subscription.HandleSubQueryResponse(subQuery.Response);
|
||||
subscription.HandleSubQueryResponse(subQuery.Response!);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -1,170 +0,0 @@
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Objects.Sockets;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
public class SocketListenerManager
|
||||
{
|
||||
private ILogger _logger;
|
||||
private int _socketId;
|
||||
private object _lock = new object();
|
||||
//private Dictionary<int, IMessageProcessor> _idMap;
|
||||
//private Dictionary<string, Dictionary<string, Type>> _typeMap;
|
||||
private Dictionary<string, List<IMessageProcessor>> _listeners;
|
||||
|
||||
public SocketListenerManager(ILogger logger, int socketId)
|
||||
{
|
||||
//_idMap = new Dictionary<int, IMessageProcessor>();
|
||||
_listeners = new Dictionary<string, List<IMessageProcessor>>();
|
||||
//_typeMap = new Dictionary<string, Dictionary<string, Type>>();
|
||||
_logger = logger;
|
||||
_socketId = socketId;
|
||||
}
|
||||
|
||||
public Type? IdToType(string streamIdentifier, string typeIdentifier)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
_listeners.TryGetValue(streamIdentifier, out var listeners);
|
||||
if (listeners == null)
|
||||
return null;
|
||||
|
||||
var result = listeners.SelectMany(l => l.TypeMapping).FirstOrDefault(x => x.Key == (typeIdentifier ?? ""));
|
||||
return result.Value;
|
||||
}
|
||||
}
|
||||
|
||||
public List<string> GetListenIds()
|
||||
{
|
||||
lock(_lock)
|
||||
return _listeners.Keys.ToList();
|
||||
}
|
||||
|
||||
public void Add(IMessageProcessor processor)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (processor.StreamIdentifiers?.Any() == true)
|
||||
{
|
||||
foreach (var identifier in processor.StreamIdentifiers)
|
||||
{
|
||||
if (!_listeners.TryGetValue(identifier, out var list))
|
||||
{
|
||||
list = new List<IMessageProcessor>();
|
||||
_listeners.Add(identifier, list);
|
||||
}
|
||||
|
||||
list.Add(processor);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public void Reset(IMessageProcessor processor)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
//Debug.WriteLine("4 Resetting");
|
||||
Remove(processor);
|
||||
Add(processor);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> InvokeListenersAsync(SocketConnection connection, string id, BaseParsedMessage data)
|
||||
{
|
||||
List<IMessageProcessor> listeners;
|
||||
lock (_lock)
|
||||
{
|
||||
if (!_listeners.TryGetValue(id, out var idListeners))
|
||||
return false;
|
||||
|
||||
listeners = idListeners.Where(i => data.TypeIdentifier == null || i.TypeMapping.ContainsKey(data.TypeIdentifier)).ToList();
|
||||
}
|
||||
|
||||
foreach (var listener in listeners)
|
||||
{
|
||||
_logger.Log(LogLevel.Trace, $"Socket {_socketId} Message mapped to processor {listener.Id} with identifier {data.StreamIdentifier}");
|
||||
if (listener is BaseQuery query)
|
||||
{
|
||||
Remove(listener);
|
||||
if (query?.Completed == true)
|
||||
{
|
||||
// Answer to a timed out request
|
||||
_logger.Log(LogLevel.Warning, $"Socket {_socketId} Received after request timeout. Consider increasing the RequestTimeout");
|
||||
}
|
||||
}
|
||||
|
||||
// Matched based on identifier
|
||||
var userSw = Stopwatch.StartNew();
|
||||
var dataEvent = new DataEvent<BaseParsedMessage>(data, null, data.OriginalData, DateTime.UtcNow, null);
|
||||
try
|
||||
{
|
||||
await listener.HandleMessageAsync(connection, dataEvent).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// TODO
|
||||
}
|
||||
|
||||
userSw.Stop();
|
||||
if (userSw.ElapsedMilliseconds > 500)
|
||||
{
|
||||
_logger.Log(LogLevel.Debug, $"Socket {_socketId} {(listener is Subscription ? "subscription " : "query " + listener!.Id)} message processing slow ({(int)userSw.ElapsedMilliseconds}ms), consider offloading data handling to another thread. " +
|
||||
"Data from this socket may arrive late or not at all if message processing is continuously slow.");
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public T? GetById<T>(int id) where T : BaseQuery
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
var val = _listeners.Values.SelectMany(x => x).FirstOrDefault(x => x.Id == id);
|
||||
return (T)val;
|
||||
}
|
||||
}
|
||||
|
||||
public List<Subscription> GetSubscriptions()
|
||||
{
|
||||
lock (_lock)
|
||||
return _listeners.Values.SelectMany(v => v.OfType<Subscription>()).Distinct().ToList();
|
||||
}
|
||||
|
||||
public List<BaseQuery> GetQueries()
|
||||
{
|
||||
lock (_lock)
|
||||
return _listeners.Values.SelectMany(v => v.OfType<BaseQuery>()).ToList();
|
||||
}
|
||||
|
||||
public bool Contains(IMessageProcessor processor)
|
||||
{
|
||||
lock (_lock)
|
||||
return _listeners.Any(l => l.Value.Contains(processor));
|
||||
}
|
||||
|
||||
public void Remove(IMessageProcessor processor)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (processor.StreamIdentifiers?.Any() != true)
|
||||
return;
|
||||
|
||||
foreach(var kv in _listeners)
|
||||
{
|
||||
if (kv.Value.Contains(processor))
|
||||
kv.Value.Remove(processor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
45
CryptoExchange.Net/Sockets/SocketMessage.cs
Normal file
45
CryptoExchange.Net/Sockets/SocketMessage.cs
Normal file
@ -0,0 +1,45 @@
|
||||
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||
using System;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
/// <summary>
|
||||
/// Message received from the websocket
|
||||
/// </summary>
|
||||
public class SocketMessage
|
||||
{
|
||||
/// <summary>
|
||||
/// Message receive time
|
||||
/// </summary>
|
||||
public DateTime ReceiveTime { get; set; }
|
||||
/// <summary>
|
||||
/// The message data
|
||||
/// </summary>
|
||||
public IMessageAccessor Message { get; set; }
|
||||
/// <summary>
|
||||
/// Raw string data
|
||||
/// </summary>
|
||||
public string? RawData { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="receiveTime"></param>
|
||||
/// <param name="message"></param>
|
||||
public SocketMessage(DateTime receiveTime, IMessageAccessor message)
|
||||
{
|
||||
ReceiveTime = receiveTime;
|
||||
Message = message;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Deserialize the message to a type
|
||||
/// </summary>
|
||||
/// <param name="type"></param>
|
||||
/// <returns></returns>
|
||||
public object Deserialize(Type type)
|
||||
{
|
||||
return Message.Deserialize(type);
|
||||
}
|
||||
}
|
||||
}
|
@ -55,9 +55,9 @@ namespace CryptoExchange.Net.Sockets
|
||||
public bool Authenticated { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Strings to identify this subscription with
|
||||
/// Strings to match this subscription to a received message
|
||||
/// </summary>
|
||||
public abstract List<string> StreamIdentifiers { get; set; }
|
||||
public abstract HashSet<string> ListenerIdentifiers { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Cancellation token registration
|
||||
@ -69,7 +69,12 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// </summary>
|
||||
public event Action<Exception>? Exception;
|
||||
|
||||
public abstract Dictionary<string, Type> TypeMapping { get; }
|
||||
/// <summary>
|
||||
/// Get the deserialization type for this message
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public abstract Type? GetMessageType(SocketMessage message);
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
@ -86,21 +91,36 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the subscribe object to send when subscribing
|
||||
/// Get the subscribe query to send when subscribing
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public abstract BaseQuery? GetSubQuery(SocketConnection connection);
|
||||
|
||||
public virtual void HandleSubQueryResponse(BaseParsedMessage message) { }
|
||||
public virtual void HandleUnsubQueryResponse(BaseParsedMessage message) { }
|
||||
public abstract Query? GetSubQuery(SocketConnection connection);
|
||||
|
||||
/// <summary>
|
||||
/// Get the unsubscribe object to send when unsubscribing
|
||||
/// Handle a subscription query response
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
public virtual void HandleSubQueryResponse(object message) { }
|
||||
|
||||
/// <summary>
|
||||
/// Handle an unsubscription query response
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
public virtual void HandleUnsubQueryResponse(object message) { }
|
||||
|
||||
/// <summary>
|
||||
/// Get the unsubscribe query to send when unsubscribing
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public abstract BaseQuery? GetUnsubQuery();
|
||||
public abstract Query? GetUnsubQuery();
|
||||
|
||||
public async Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
|
||||
/// <summary>
|
||||
/// Handle an update message
|
||||
/// </summary>
|
||||
/// <param name="connection"></param>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<CallResult> HandleAsync(SocketConnection connection, DataEvent<object> message)
|
||||
{
|
||||
ConnectionInvocations++;
|
||||
TotalInvocations++;
|
||||
@ -110,9 +130,10 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <summary>
|
||||
/// Handle the update message
|
||||
/// </summary>
|
||||
/// <param name="connection"></param>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public abstract Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message);
|
||||
public abstract Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<object> message);
|
||||
|
||||
/// <summary>
|
||||
/// Invoke the exception event
|
||||
@ -124,24 +145,9 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract class Subscription<TQuery> : Subscription<TQuery, TQuery>
|
||||
{
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="logger"></param>
|
||||
/// <param name="authenticated"></param>
|
||||
protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract class Subscription<TSubResponse, TUnsubResponse> : Subscription
|
||||
{
|
||||
//public override Func<string, Type> ExpectedTypeDelegate => (x) => typeof(TEvent);
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
@ -152,18 +158,24 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
//public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
|
||||
// => HandleEventAsync(connection, message.As((ParsedMessage<TEvent>)message.Data));
|
||||
public override void HandleSubQueryResponse(object message)
|
||||
=> HandleSubQueryResponse((TSubResponse)message);
|
||||
|
||||
public override void HandleSubQueryResponse(BaseParsedMessage message)
|
||||
=> HandleSubQueryResponse((ParsedMessage<TSubResponse>)message);
|
||||
/// <summary>
|
||||
/// Handle a subscription query response
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
public virtual void HandleSubQueryResponse(TSubResponse message) { }
|
||||
|
||||
public virtual void HandleSubQueryResponse(ParsedMessage<TSubResponse> message) { }
|
||||
/// <inheritdoc />
|
||||
public override void HandleUnsubQueryResponse(object message)
|
||||
=> HandleUnsubQueryResponse((TUnsubResponse)message);
|
||||
|
||||
public override void HandleUnsubQueryResponse(BaseParsedMessage message)
|
||||
=> HandleUnsubQueryResponse((ParsedMessage<TUnsubResponse>)message);
|
||||
|
||||
public virtual void HandleUnsubQueryResponse(ParsedMessage<TUnsubResponse> message) { }
|
||||
/// <summary>
|
||||
/// Handle an unsubscription query response
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
public virtual void HandleUnsubQueryResponse(TUnsubResponse message) { }
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -22,26 +22,37 @@ namespace CryptoExchange.Net.Sockets
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override BaseQuery? GetSubQuery(SocketConnection connection) => null;
|
||||
public override Query? GetSubQuery(SocketConnection connection) => null;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override BaseQuery? GetUnsubQuery() => null;
|
||||
public override Query? GetUnsubQuery() => null;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract class SystemSubscription<T> : SystemSubscription
|
||||
{
|
||||
public override Dictionary<string, Type> TypeMapping => new Dictionary<string, Type>
|
||||
{
|
||||
{ "", typeof(T) }
|
||||
};
|
||||
/// <inheritdoc />
|
||||
public override Type GetMessageType(SocketMessage message) => typeof(T);
|
||||
|
||||
public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
|
||||
=> HandleMessageAsync(connection, message.As((ParsedMessage<T>)message.Data));
|
||||
/// <inheritdoc />
|
||||
public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<object> message)
|
||||
=> HandleMessageAsync(connection, message.As((T)message.Data));
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
/// <param name="logger"></param>
|
||||
/// <param name="authenticated"></param>
|
||||
protected SystemSubscription(ILogger logger, bool authenticated) : base(logger, authenticated)
|
||||
{
|
||||
}
|
||||
|
||||
public abstract Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<ParsedMessage<T>> message);
|
||||
/// <summary>
|
||||
/// Handle an update message
|
||||
/// </summary>
|
||||
/// <param name="connection"></param>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
public abstract Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<T> message);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user