1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-09 00:46:19 +00:00
This commit is contained in:
JKorf 2023-11-18 14:53:50 +01:00
parent 1ba66be29f
commit c41e128900
14 changed files with 224 additions and 41 deletions

View File

@ -169,6 +169,9 @@ namespace CryptoExchange.Net
if (_disposing)
return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe"));
if (subscription.Authenticated && AuthenticationProvider == null)
return new CallResult<UpdateSubscription>(new NoApiCredentialsError());
SocketConnection socketConnection;
var released = false;
// Wait for a semaphore here, so we only connect 1 socket at a time.
@ -243,6 +246,8 @@ namespace CryptoExchange.Net
return new CallResult<UpdateSubscription>(subResult.Error!);
}
subscription.HandleSubQueryResponse(subQuery.Response);
}
else
{
@ -355,6 +360,9 @@ namespace CryptoExchange.Net
/// <returns></returns>
public virtual async Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnection socket)
{
if (AuthenticationProvider == null)
return new CallResult<bool>(new NoApiCredentialsError());
_logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate");
var authRequest = GetAuthenticationRequest();
var result = await socket.SendAndWaitQueryAsync(authRequest).ConfigureAwait(false);

View File

@ -0,0 +1,83 @@
using CryptoExchange.Net.Interfaces;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace CryptoExchange.Net.Converters
{
internal class JTokenAccessor : IMessageAccessor
{
private readonly JToken _token;
private Dictionary<string, JToken?> _cache = new Dictionary<string, JToken?>();
public JTokenAccessor(JToken token)
{
_token = token;
}
public int? GetArrayIntValue(string? key, int index)
{
var accessToken = key == null ? _token : GetToken(key);
if (accessToken == null || accessToken is not JArray arr)
return null;
return arr[index].Value<int>();
}
public string? GetArrayStringValue(string? key, int index)
{
var accessToken = key == null ? _token : GetToken(key);
if (accessToken == null || accessToken is not JArray arr)
return null;
if (arr.Count <= index)
return null;
return arr[index].Value<string>();
}
public int? GetCount(string key)
{
var accessToken = GetToken(key);
return accessToken.Count();
}
public int? GetIntValue(string key)
{
var accessToken = GetToken(key);
return accessToken?.Value<int>();
}
public string? GetStringValue(string key)
{
var accessToken = GetToken(key);
if (accessToken?.Type == JTokenType.Object)
return ((JObject)accessToken).Properties().First().Name;
return accessToken?.ToString();
}
private JToken? GetToken(string key)
{
if (_cache.TryGetValue(key, out var token))
return token;
var splitTokens = key.Split(new char[] { ':' });
var accessToken = _token;
foreach (var splitToken in splitTokens)
{
if (accessToken.Type == JTokenType.Array)
return null;
accessToken = accessToken[splitToken];
if (accessToken == null)
break;
}
_cache.Add(key, accessToken);
return accessToken;
}
}
}

View File

@ -8,6 +8,7 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
namespace CryptoExchange.Net.Converters
@ -22,13 +23,16 @@ namespace CryptoExchange.Net.Converters
public abstract MessageInterpreterPipeline InterpreterPipeline { get; }
/// <inheritdoc />
public BaseParsedMessage? ReadJson(Stream stream, Dictionary<string, Type> processors, bool outputOriginalData)
public BaseParsedMessage? ReadJson(WebSocketMessageType websocketMessageType, Stream stream, Dictionary<string, Type> processors, bool outputOriginalData)
{
// Start reading the data
// Once we reach the properties that identify the message we save those in a dict
// Once all id properties have been read callback to see what the deserialization type should be
// Deserialize to the correct type
if (InterpreterPipeline.PreProcessCallback != null)
stream = InterpreterPipeline.PreProcessCallback(websocketMessageType, stream);
using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
foreach (var callback in InterpreterPipeline.PreInspectCallbacks)
{
@ -56,14 +60,36 @@ namespace CryptoExchange.Net.Converters
{
token = JToken.Load(jsonTextReader);
}
catch(Exception)
catch(Exception ex)
{
// Not a json message
return null;
}
var accessor = new JTokenAccessor(token);
if (InterpreterPipeline.GetIdentity != null)
{
var identity = InterpreterPipeline.GetIdentity(accessor);
if (identity != null)
{
if (processors.TryGetValue(identity, out var type))
{
var idInstance = InterpreterPipeline.ObjectInitializer(token, type);
if (outputOriginalData)
{
stream.Position = 0;
idInstance.OriginalData = sr.ReadToEnd();
}
idInstance.Identifier = identity;
idInstance.Parsed = true;
return idInstance;
}
}
}
PostInspectResult? inspectResult = null;
Dictionary<string, string?> typeIdDict = new Dictionary<string, string?>();
object? usedParser = null;
if (token.Type == JTokenType.Object)
{
@ -72,22 +98,20 @@ namespace CryptoExchange.Net.Converters
bool allFieldsPresent = true;
foreach (var field in callback.TypeFields)
{
var value = typeIdDict.TryGetValue(field, out var cachedValue) ? cachedValue : GetValueForKey(token, field);
var value = accessor.GetStringValue(field.Key);
if (value == null)
{
if (callback.AllFieldPresentNeeded)
if (field.Required)
{
allFieldsPresent = false;
break;
}
}
typeIdDict[field] = value;
}
if (allFieldsPresent)
{
inspectResult = callback.Callback(typeIdDict, processors);
inspectResult = callback.Callback(accessor, processors);
usedParser = callback;
if (inspectResult.Type != null)
break;
@ -126,7 +150,10 @@ namespace CryptoExchange.Net.Converters
}
if (usedParser == null)
throw new Exception("No parser found for message");
{
//throw new Exception("No parser found for message");
return null;
}
BaseParsedMessage instance;
if (inspectResult.Type != null)
@ -152,6 +179,7 @@ namespace CryptoExchange.Net.Converters
return instance;
}
private string? GetValueForKey(JToken token, string key)
{
var splitTokens = key.Split(new char[] { ':' });
@ -170,6 +198,9 @@ namespace CryptoExchange.Net.Converters
}
}
if (accessToken?.Type == JTokenType.Object)
return ((JObject)accessToken).Properties().First().Name;
return accessToken?.ToString();
}
}

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Interfaces
{
public interface IMessageAccessor
{
string? GetStringValue(string key);
int? GetIntValue(string key);
public int? GetCount(string key);
public int? GetArrayIntValue(string? key, int index);
public string? GetArrayStringValue(string? key, int index);
}
}

View File

@ -1,5 +1,6 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using System;
using System.Collections.Generic;
using System.Text;
@ -11,7 +12,7 @@ namespace CryptoExchange.Net.Interfaces
{
public int Id { get; }
public List<string> Identifiers { get; }
Task<CallResult> HandleMessageAsync(DataEvent<BaseParsedMessage> message);
Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message);
public Type ExpectedMessageType { get; }
}
}

View File

@ -2,6 +2,7 @@
using CryptoExchange.Net.Sockets;
using System;
using System.IO;
using System.Net.WebSockets;
using System.Security.Authentication;
using System.Text;
using System.Threading.Tasks;
@ -20,7 +21,7 @@ namespace CryptoExchange.Net.Interfaces
/// <summary>
/// Websocket message received event
/// </summary>
event Func<Stream, Task> OnStreamMessage;
event Func<WebSocketMessageType, Stream, Task> OnStreamMessage;
/// <summary>
/// Websocket sent event, RequestId as parameter
/// </summary>

View File

@ -5,13 +5,16 @@ using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.WebSockets;
using System.Text;
namespace CryptoExchange.Net.Objects.Sockets
{
public class MessageInterpreterPipeline
{
public Func<WebSocketMessageType, Stream, Stream>? PreProcessCallback { get; set; }
public List<PreInspectCallback> PreInspectCallbacks { get; set; } = new List<PreInspectCallback>();
public Func<IMessageAccessor, string?> GetIdentity { get; set; }
public List<object> PostInspectCallbacks { get; set; } = new List<object>();
public Func<JToken, Type, BaseParsedMessage> ObjectInitializer { get; set; } = SocketConverter.InstantiateMessageObject;
}
@ -23,9 +26,20 @@ namespace CryptoExchange.Net.Objects.Sockets
public class PostInspectCallback
{
public bool AllFieldPresentNeeded { get; set; } = true;
public List<string> TypeFields { get; set; } = new List<string>();
public Func<Dictionary<string, string?>, Dictionary<string, Type>, PostInspectResult> Callback { get; set; }
public List<TypeField> TypeFields { get; set; } = new List<TypeField>();
public Func<IMessageAccessor, Dictionary<string, Type>, PostInspectResult> Callback { get; set; }
}
public class TypeField
{
public string Key { get; set; }
public bool Required { get; set; }
public TypeField(string key, bool required = true)
{
Key = key;
Required = required;
}
}
public class PostInspectArrayCallback

View File

@ -33,7 +33,7 @@ namespace CryptoExchange.Net.Objects.Testing
var bytes = Encoding.UTF8.GetBytes(data);
var stream = new MemoryStream(bytes);
stream.Position = 0;
_ = ProcessData(stream);
_ = ProcessData(System.Net.WebSockets.WebSocketMessageType.Text, stream);
}
}
}

View File

@ -101,7 +101,7 @@ namespace CryptoExchange.Net.Sockets
public event Action? OnClose;
/// <inheritdoc />
public event Func<Stream, Task>? OnStreamMessage;
public event Func<WebSocketMessageType, Stream, Task>? OnStreamMessage;
/// <inheritdoc />
public event Action<int>? OnRequestSent;
@ -521,7 +521,7 @@ namespace CryptoExchange.Net.Sockets
{
// Received a complete message and it's not multi part
_logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
await ProcessData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
await ProcessData(receiveResult.MessageType, new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
}
else
{
@ -555,7 +555,7 @@ namespace CryptoExchange.Net.Sockets
{
// Reassemble complete message from memory stream
_logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
await ProcessData(memoryStream).ConfigureAwait(false);
await ProcessData(receiveResult.MessageType, memoryStream).ConfigureAwait(false);
memoryStream.Dispose();
}
else
@ -580,13 +580,13 @@ namespace CryptoExchange.Net.Sockets
}
}
protected async Task ProcessData(Stream stream)
protected async Task ProcessData(WebSocketMessageType type, Stream stream)
{
stream.Position = 0;
if (Parameters.Interceptor != null)
stream = Parameters.Interceptor.Invoke(stream);
if (OnStreamMessage != null)
await OnStreamMessage.Invoke(stream).ConfigureAwait(false);
await OnStreamMessage.Invoke(type, stream).ConfigureAwait(false);
}
/// <summary>

View File

@ -21,6 +21,7 @@ namespace CryptoExchange.Net.Sockets
public bool Completed { get; set; }
public DateTime RequestTimestamp { get; set; }
public CallResult? Result { get; set; }
public BaseParsedMessage Response { get; set; }
protected AsyncResetEvent _event;
protected CancellationTokenSource? _cts;
@ -96,7 +97,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Task<CallResult> HandleMessageAsync(DataEvent<BaseParsedMessage> message);
public abstract Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message);
}
@ -124,10 +125,11 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public override async Task<CallResult> HandleMessageAsync(DataEvent<BaseParsedMessage> message)
public override async Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
{
Completed = true;
Result = await HandleMessageAsync(message.As((ParsedMessage<TResponse>)message.Data)).ConfigureAwait(false);
Response = message.Data;
Result = await HandleMessageAsync(connection, message.As((ParsedMessage<TResponse>)message.Data)).ConfigureAwait(false);
_event.Set();
return Result;
}
@ -137,7 +139,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public virtual Task<CallResult<TResponse>> HandleMessageAsync(DataEvent<ParsedMessage<TResponse>> message) => Task.FromResult(new CallResult<TResponse>(message.Data.TypedData!));
public virtual Task<CallResult<TResponse>> HandleMessageAsync(SocketConnection connection, DataEvent<ParsedMessage<TResponse>> message) => Task.FromResult(new CallResult<TResponse>(message.Data.TypedData!));
/// <inheritdoc />
public override void Timeout()

View File

@ -308,9 +308,9 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="stream"></param>
/// <returns></returns>
protected virtual async Task HandleStreamMessage(Stream stream)
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream)
{
var result = ApiClient.StreamConverter.ReadJson(stream, _listenerManager.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
var result = ApiClient.StreamConverter.ReadJson(type, stream, _listenerManager.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
if(result == null)
{
// Not able to parse at all
@ -331,7 +331,7 @@ namespace CryptoExchange.Net.Sockets
return;
}
if (!await _listenerManager.InvokeListenersAsync(result.Identifier, result).ConfigureAwait(false))
if (!await _listenerManager.InvokeListenersAsync(this, result.Identifier, result).ConfigureAwait(false))
{
// Not able to find a listener for this message
stream.Position = 0;
@ -607,8 +607,12 @@ namespace CryptoExchange.Net.Sockets
var subQuery = subscription.GetSubQuery(this);
if (subQuery == null)
continue;
taskList.Add(SendAndWaitQueryAsync(subQuery));
taskList.Add(SendAndWaitQueryAsync(subQuery).ContinueWith((x) =>
{
subscription.HandleSubQueryResponse(subQuery.Response);
return x.Result;
}));
}
await Task.WhenAll(taskList).ConfigureAwait(false);
@ -645,7 +649,9 @@ namespace CryptoExchange.Net.Sockets
if (subQuery == null)
return new CallResult(null);
return await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
var result = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
subscription.HandleSubQueryResponse(subQuery.Response);
return result;
}
/// <summary>

View File

@ -64,7 +64,7 @@ namespace CryptoExchange.Net.Sockets
}
}
public async Task<bool> InvokeListenersAsync(string id, BaseParsedMessage data)
public async Task<bool> InvokeListenersAsync(SocketConnection connection, string id, BaseParsedMessage data)
{
List<IMessageProcessor> listeners;
lock (_lock)
@ -91,7 +91,15 @@ namespace CryptoExchange.Net.Sockets
// Matched based on identifier
var userSw = Stopwatch.StartNew();
var dataEvent = new DataEvent<BaseParsedMessage>(data, null, data.OriginalData, DateTime.UtcNow, null);
await listener.HandleMessageAsync(dataEvent).ConfigureAwait(false);
try
{
await listener.HandleMessageAsync(connection, dataEvent).ConfigureAwait(false);
}
catch (Exception ex)
{
// TODO
}
userSw.Stop();
if (userSw.ElapsedMilliseconds > 500)
{

View File

@ -91,17 +91,20 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
public abstract BaseQuery? GetSubQuery(SocketConnection connection);
public virtual void HandleSubQueryResponse(BaseParsedMessage message) { }
public virtual void HandleUnsubQueryResponse(BaseParsedMessage message) { }
/// <summary>
/// Get the unsubscribe object to send when unsubscribing
/// </summary>
/// <returns></returns>
public abstract BaseQuery? GetUnsubQuery();
public async Task<CallResult> HandleMessageAsync(DataEvent<BaseParsedMessage> message)
public async Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
{
ConnectionInvocations++;
TotalInvocations++;
return await DoHandleMessageAsync(message).ConfigureAwait(false);
return await DoHandleMessageAsync(connection, message).ConfigureAwait(false);
}
/// <summary>
@ -109,7 +112,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Task<CallResult> DoHandleMessageAsync(DataEvent<BaseParsedMessage> message);
public abstract Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message);
/// <summary>
/// Invoke the exception event
@ -149,14 +152,25 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public override Task<CallResult> DoHandleMessageAsync(DataEvent<BaseParsedMessage> message)
=> HandleEventAsync(message.As((ParsedMessage<TEvent>)message.Data));
public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
=> HandleEventAsync(connection, message.As((ParsedMessage<TEvent>)message.Data));
/// <summary>
/// Handle the update message
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Task<CallResult> HandleEventAsync(DataEvent<ParsedMessage<TEvent>> message);
public abstract Task<CallResult> HandleEventAsync(SocketConnection connection, DataEvent<ParsedMessage<TEvent>> message);
public override void HandleSubQueryResponse(BaseParsedMessage message)
=> HandleSubQueryResponse((ParsedMessage<TSubResponse>)message);
public virtual void HandleSubQueryResponse(ParsedMessage<TSubResponse> message) { }
public override void HandleUnsubQueryResponse(BaseParsedMessage message)
=> HandleUnsubQueryResponse((ParsedMessage<TUnsubResponse>)message);
public virtual void HandleUnsubQueryResponse(ParsedMessage<TUnsubResponse> message) { }
}
}

View File

@ -30,13 +30,13 @@ namespace CryptoExchange.Net.Sockets
public abstract class SystemSubscription<T> : SystemSubscription
{
public override Type ExpectedMessageType => typeof(T);
public override Task<CallResult> DoHandleMessageAsync(DataEvent<BaseParsedMessage> message)
=> HandleMessageAsync(message.As((ParsedMessage<T>)message.Data));
public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<BaseParsedMessage> message)
=> HandleMessageAsync(connection, message.As((ParsedMessage<T>)message.Data));
protected SystemSubscription(ILogger logger, bool authenticated) : base(logger, authenticated)
{
}
public abstract Task<CallResult> HandleMessageAsync(DataEvent<ParsedMessage<T>> message);
public abstract Task<CallResult> HandleMessageAsync(SocketConnection connection, DataEvent<ParsedMessage<T>> message);
}
}