1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 08:26:20 +00:00
This commit is contained in:
JKorf 2023-11-04 11:16:46 +01:00
parent 312d54cf04
commit 6fa66d819d
8 changed files with 162 additions and 165 deletions

View File

@ -284,7 +284,7 @@ namespace CryptoExchange.Net
/// <returns></returns>
protected virtual Task<CallResult<T>> QueryAsync<T>(Query<T> query)
{
return QueryAsync<T>(BaseAddress, query);
return QueryAsync(BaseAddress, query);
}
/// <summary>
@ -403,7 +403,7 @@ namespace CryptoExchange.Net
/// Should return the request which can be used to authenticate a socket connection
/// </summary>
/// <returns></returns>
protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException();
protected internal virtual BaseQuery GetAuthenticationRequest() => throw new NotImplementedException();
/// <summary>
/// Add a subscription to a connection
@ -418,7 +418,7 @@ namespace CryptoExchange.Net
if (!connection.AddListener(subscription))
return false;
return false;
return true;
}
/// <summary>
@ -511,7 +511,7 @@ namespace CryptoExchange.Net
/// Process an unhandled message
/// </summary>
/// <param name="message">The message that wasn't processed</param>
protected virtual void HandleUnhandledMessage(ParsedMessage message)
protected virtual void HandleUnhandledMessage(BaseParsedMessage message)
{
}

View File

@ -32,18 +32,17 @@ namespace CryptoExchange.Net.Converters
public abstract Type? GetDeserializationType(Dictionary<string, string?> idValues, List<BasePendingRequest> pendingRequests, List<Subscription> listeners);
/// <inheritdoc />
public ParsedMessage? ReadJson(Stream stream, List<BasePendingRequest> pendingRequests, List<Subscription> listeners, bool outputOriginalData)
public BaseParsedMessage? ReadJson(Stream stream, List<BasePendingRequest> pendingRequests, List<Subscription> listeners, bool outputOriginalData)
{
// Start reading the data
// Once we reach the properties that identify the message we save those in a dict
// Once all id properties have been read callback to see what the deserialization type should be
// Deserialize to the correct type
var result = new ParsedMessage();
using var sr = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
if (outputOriginalData)
{
result.OriginalData = sr.ReadToEnd();
//result.OriginalData = sr.ReadToEnd();
stream.Position = 0;
}
@ -81,10 +80,12 @@ namespace CryptoExchange.Net.Converters
idString += GetValueForKey(token, idField);
}
result.Identifier = idString;
var resultType = GetDeserializationType(typeIdDict, pendingRequests, listeners);
result.Data = resultType == null ? null : token.ToObject(resultType);
return result;
var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(resultType);
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType));
instance.Identifier = idString;
instance.Parsed = resultType != null;
return instance;
}
private string? GetValueForKey(JToken token, string key)

View File

@ -3,7 +3,7 @@
/// <summary>
/// Parsed message object
/// </summary>
public class ParsedMessage
public abstract class BaseParsedMessage
{
/// <summary>
/// Identifier string
@ -13,9 +13,22 @@
/// Original data if the option is enabled
/// </summary>
public string? OriginalData { get; set; }
/// <summary>
/// If parsed
/// </summary>
public bool Parsed { get; set; }
}
public class ParsedMessage<T> : BaseParsedMessage
{
/// <summary>
/// Parsed data object
/// </summary>
public object? Data { get; set; }
public T? Data { get; set; }
public ParsedMessage(T? data)
{
Data = data;
}
}
}

View File

@ -11,7 +11,7 @@ namespace CryptoExchange.Net.Objects.Sockets
public abstract class BasePendingRequest
{
public int Id { get; set; }
public Func<ParsedMessage, bool> MessageMatchesHandler { get; }
public Func<BaseParsedMessage, bool> MessageMatchesHandler { get; }
public bool Completed { get; private set; }
public abstract Type ResponseType { get; }
@ -21,10 +21,9 @@ namespace CryptoExchange.Net.Objects.Sockets
public object Request { get; set; }
private CancellationTokenSource? _cts;
public abstract CallResult Result { get; set; }
public int Priority => 100;
protected BasePendingRequest(int id, object request, Func<ParsedMessage, bool> messageMatchesHandler, TimeSpan timeout)
protected BasePendingRequest(int id, object request, Func<BaseParsedMessage, bool> messageMatchesHandler, TimeSpan timeout)
{
Id = id;
MessageMatchesHandler = messageMatchesHandler;
@ -47,12 +46,12 @@ namespace CryptoExchange.Net.Objects.Sockets
Event.Set();
}
public bool MessageMatches(ParsedMessage message)
public bool MessageMatches(BaseParsedMessage message)
{
return MessageMatchesHandler(message);
}
public virtual Task ProcessAsync(ParsedMessage message)
public virtual Task ProcessAsync(BaseParsedMessage message)
{
Completed = true;
Event.Set();
@ -60,59 +59,60 @@ namespace CryptoExchange.Net.Objects.Sockets
}
}
public class PendingRequest : BasePendingRequest
{
public CallResult Result { get; set; }
public Func<ParsedMessage, CallResult> Handler { get; }
public override Type? ResponseType => null;
//public class PendingRequest : BasePendingRequest
//{
// public CallResult Result { get; set; }
// public Func<BaseParsedMessage, CallResult> Handler { get; }
// public override Type? ResponseType => null;
private PendingRequest(int id, object request, Func<ParsedMessage, bool> messageMatchesHandler, Func<ParsedMessage, CallResult> messageHandler, TimeSpan timeout)
: base(id, request, messageMatchesHandler, timeout)
{
Handler = messageHandler;
}
// private PendingRequest(int id, object request, Func<BaseParsedMessage, bool> messageMatchesHandler, Func<BaseParsedMessage, CallResult> messageHandler, TimeSpan timeout)
// : base(id, request, messageMatchesHandler, timeout)
// {
// Handler = messageHandler;
// }
public static PendingRequest CreateForQuery(Query query)
{
return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, query.HandleResult, TimeSpan.FromSeconds(5));
}
// public static PendingRequest CreateForQuery(BaseQuery query)
// {
// return new PendingRequest(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, query.HandleResult, TimeSpan.FromSeconds(5));
// }
public static PendingRequest CreateForSubRequest(Subscription subscription)
{
return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetSubRequest, subscription.MessageMatchesSubRequest, subscription.HandleSubResponse, TimeSpan.FromSeconds(5));
}
// public static PendingRequest CreateForSubRequest(Subscription subscription)
// {
// return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetSubRequest(), subscription.MessageMatchesSubRequest, subscription.HandleSubResponse, TimeSpan.FromSeconds(5));
// }
public static PendingRequest CreateForUnsubRequest(Subscription subscription)
{
return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetUnsubRequest, subscription.MessageMatchesUnsubRequest, subscription.HandleUnsubResponse, TimeSpan.FromSeconds(5));
}
// public static PendingRequest CreateForUnsubRequest(Subscription subscription)
// {
// return new PendingRequest(ExchangeHelpers.NextId(), subscription.GetUnsubRequest(), subscription.MessageMatchesUnsubRequest, subscription.HandleUnsubResponse, TimeSpan.FromSeconds(5));
// }
public override void Fail(string error)
{
Result = new CallResult(new ServerError(error));
base.Fail(error);
}
// public override void Fail(string error)
// {
// Result = new CallResult(new ServerError(error));
// base.Fail(error);
// }
public override Task ProcessAsync(ParsedMessage message)
{
Result = Handler(message);
return base.ProcessAsync(message);
}
}
// public override Task ProcessAsync(BaseParsedMessage message)
// {
// Result = Handler(message);
// return base.ProcessAsync(message);
// }
//}
public class PendingRequest<T> : BasePendingRequest
{
public CallResult<T> Result { get; set; }
public Func<ParsedMessage, CallResult<T>> Handler { get; }
public override CallResult Result { get; set; }
public CallResult<T> TypedResult => (CallResult<T>)Result;
public Func<ParsedMessage<T>, CallResult<T>> Handler { get; }
public override Type? ResponseType => typeof(T);
public PendingRequest(int id, object request, Func<ParsedMessage, bool> messageMatchesHandler, Func<ParsedMessage, CallResult<T>> messageHandler, TimeSpan timeout)
: base(id, request, messageMatchesHandler, timeout)
public PendingRequest(int id, object request, Func<ParsedMessage<T>, bool> messageMatchesHandler, Func<ParsedMessage<T>, CallResult<T>> messageHandler, TimeSpan timeout)
: base(id, request, (x) => messageMatchesHandler((ParsedMessage<T>)x), timeout)
{
Handler = messageHandler;
}
public static PendingRequest<T> CreateForQuery<T>(Query<T> query)
public static PendingRequest<T> CreateForQuery(Query<T> query)
{
return new PendingRequest<T>(ExchangeHelpers.NextId(), query.Request, query.MessageMatchesQuery, x =>
{
@ -127,9 +127,9 @@ namespace CryptoExchange.Net.Objects.Sockets
base.Fail(error);
}
public override Task ProcessAsync(ParsedMessage message)
public override Task ProcessAsync(BaseParsedMessage message)
{
Result = Handler(message);
Result = Handler((ParsedMessage<T>)message);
return base.ProcessAsync(message);
}
}

View File

@ -25,12 +25,9 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public int Weight { get; }
/// <summary>
/// Check if the message is the response to the query
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract bool MessageMatchesQuery(ParsedMessage message);
public abstract bool MessageMatchesQuery(BaseParsedMessage message);
public abstract CallResult HandleResult(BaseParsedMessage message);
/// <summary>
/// ctor
@ -44,20 +41,8 @@ namespace CryptoExchange.Net.Sockets
Request = request;
Weight = weight;
}
}
public abstract class Query : BaseQuery
{
protected Query(object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
{
}
/// <summary>
/// Handle the query response
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult HandleResult(ParsedMessage message);
public abstract BasePendingRequest CreatePendingRequest();
}
public abstract class Query<TResponse> : BaseQuery
@ -66,11 +51,23 @@ namespace CryptoExchange.Net.Sockets
{
}
public override CallResult HandleResult(BaseParsedMessage message) => HandleResponse((ParsedMessage<TResponse>) message);
public override bool MessageMatchesQuery(BaseParsedMessage message) => MessageMatchesQuery((ParsedMessage<TResponse>)message);
/// <summary>
/// Handle the query response
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract CallResult<TResponse> HandleResponse(ParsedMessage message);
public abstract CallResult<TResponse> HandleResponse(ParsedMessage<TResponse> message);
/// <summary>
/// Check if the message is the response to the query
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract bool MessageMatchesQuery(ParsedMessage<TResponse> message);
public override BasePendingRequest CreatePendingRequest() => PendingRequest<TResponse>.CreateForQuery(this);
}
}

View File

@ -49,7 +49,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Unhandled message event
/// </summary>
public event Action<ParsedMessage>? UnhandledMessage;
public event Action<BaseParsedMessage>? UnhandledMessage;
/// <summary>
/// Unparsed message event
@ -341,7 +341,7 @@ namespace CryptoExchange.Net.Sockets
if (result.OriginalData != null)
_logger.LogDebug($"Socket {SocketId} Data received: {result.OriginalData}");
if (result.Data == null)
if (!result.Parsed)
{
_logger.LogWarning("Message not matched to type");
return;
@ -352,7 +352,7 @@ namespace CryptoExchange.Net.Sockets
{
// Matched based on identifier
var userSw = Stopwatch.StartNew();
var dataEvent = new DataEvent<ParsedMessage>(result, null, result.OriginalData, DateTime.UtcNow, null);
var dataEvent = new DataEvent<BaseParsedMessage>(result, null, result.OriginalData, DateTime.UtcNow, null);
await idListener.HandleEventAsync(dataEvent).ConfigureAwait(false);
userSw.Stop();
return;
@ -547,18 +547,18 @@ namespace CryptoExchange.Net.Sockets
return _messageListeners.SingleOrDefault(s => predicate(s));
}
public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query)
public virtual async Task<CallResult> SendAndWaitQueryAsync(BaseQuery query)
{
var pendingRequest = PendingRequest<T>.CreateForQuery(query);
var pendingRequest = query.CreatePendingRequest();
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
return pendingRequest.Result;
}
public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query)
public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query)
{
var pendingRequest = PendingRequest.CreateForQuery(query);
var pendingRequest = PendingRequest<T>.CreateForQuery(query);
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
return pendingRequest.Result;
return pendingRequest.TypedResult;
}
public virtual async Task<CallResult> SendAndWaitSubAsync(Subscription subscription)
@ -607,52 +607,6 @@ namespace CryptoExchange.Net.Sockets
}
}
///// <summary>
///// Send data and wait for an answer
///// </summary>
///// <typeparam name="T">The data type expected in response</typeparam>
///// <param name="obj">The object to send</param>
///// <param name="timeout">The timeout for response</param>
///// <param name="listener">Listener if this is a subscribe request</param>
///// <param name="handler">The response handler</param>
///// <param name="weight">The weight of the message</param>
///// <returns></returns>
//public virtual async Task SendAndWaitAsync<T>(T obj, TimeSpan timeout, MessageListener? listener, int weight, Func<ParsedMessage, bool> handler)
//{
// // TODO either Query<T> or Subscription<T> should be passed here instead of T obj
// // That would allow to track the Query/Subscription on the PendingRequest instead of the listener, which allow us to match the pending request in the Converter
// var pending = new PendingRequest(ExchangeHelpers.NextId(), handler, timeout, listener);
// lock (_messageListeners)
// {
// _pendingRequests.Add(pending);
// }
// var sendOk = Send(pending.Id, obj, weight);
// if (!sendOk)
// {
// pending.Fail();
// return;
// }
// while (true)
// {
// if(!_socket.IsOpen)
// {
// pending.Fail();
// return;
// }
// if (pending.Completed)
// return;
// await pending.Event.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
// if (pending.Completed)
// return;
// }
//}
/// <summary>
/// Send data over the websocket connection
/// </summary>
@ -727,16 +681,7 @@ namespace CryptoExchange.Net.Sockets
// Get a list of all subscriptions on the socket
List<Subscription> listenerList = new List<Subscription>();
lock (_listenerLock)
{
// ?
foreach (var listener in _messageListeners)
{
if (listener != null)
listenerList.Add(listener);
else
listener.Confirmed = true;
}
}
listenerList = _messageListeners.ToList();
foreach(var listener in listenerList)
{
@ -778,10 +723,8 @@ namespace CryptoExchange.Net.Sockets
var unsubscribeRequest = subscription?.GetUnsubRequest();
if (unsubscribeRequest != null)
{
var pendingRequest = PendingRequest.CreateForUnsubRequest(subscription!);
await SendAndWaitAsync(pendingRequest, 1).ConfigureAwait(false);
_logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription.Id} unsubscribed");
await SendAndWaitUnsubAsync(subscription!).ConfigureAwait(false);
_logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription!.Id} unsubscribed");
}
}

View File

@ -57,33 +57,24 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
/// <returns></returns>
public abstract object? GetSubRequest();
/// <summary>
/// Check if the message is the response to the subscribe request
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract bool MessageMatchesSubRequest(ParsedMessage message);
public abstract CallResult HandleSubResponse(ParsedMessage message);
/// <summary>
/// Get the unsubscribe object to send when unsubscribing
/// </summary>
/// <returns></returns>
public abstract object? GetUnsubRequest();
/// <summary>
/// Check if the message is the response to the unsubscribe request
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract bool MessageMatchesUnsubRequest(ParsedMessage message);
public abstract CallResult HandleUnsubResponse(ParsedMessage message);
/// <summary>
/// Handle the update message
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Task HandleEventAsync(DataEvent<ParsedMessage> message);
public abstract Task HandleEventAsync(DataEvent<BaseParsedMessage> message);
public abstract CallResult HandleSubResponse(BaseParsedMessage message);
public abstract CallResult HandleUnsubResponse(BaseParsedMessage message);
public abstract bool MessageMatchesUnsubRequest(BaseParsedMessage message);
public abstract bool MessageMatchesSubRequest(BaseParsedMessage message);
/// <summary>
/// Invoke the exception event
@ -94,4 +85,56 @@ namespace CryptoExchange.Net.Sockets
Exception?.Invoke(e);
}
}
public abstract class Subscription<TQuery, TEvent> : Subscription<TQuery, TEvent, TQuery>
{
protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated)
{
}
}
public abstract class Subscription<TSubResponse, TEvent, TUnsubResponse> : Subscription
{
protected Subscription(ILogger logger, bool authenticated) : base(logger, authenticated)
{
}
public override CallResult HandleUnsubResponse(BaseParsedMessage message)
=> HandleUnsubResponse((ParsedMessage<TUnsubResponse>)message);
public override CallResult HandleSubResponse(BaseParsedMessage message)
=> HandleSubResponse((ParsedMessage<TSubResponse>)message);
public override Task HandleEventAsync(DataEvent<BaseParsedMessage> message)
=> HandleEventAsync(message.As((ParsedMessage<TEvent>)message.Data));
public override bool MessageMatchesSubRequest(BaseParsedMessage message)
=> MessageMatchesSubRequest((ParsedMessage<TSubResponse>)message);
public override bool MessageMatchesUnsubRequest(BaseParsedMessage message)
=> MessageMatchesUnsubRequest((ParsedMessage<TUnsubResponse>)message);
/// <summary>
/// Handle the update message
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract Task HandleEventAsync(DataEvent<ParsedMessage<TEvent>> message);
/// <summary>
/// Check if the message is the response to the subscribe request
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract bool MessageMatchesSubRequest(ParsedMessage<TSubResponse> message);
public abstract CallResult HandleSubResponse(ParsedMessage<TSubResponse> message);
/// <summary>
/// Check if the message is the response to the unsubscribe request
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public abstract bool MessageMatchesUnsubRequest(ParsedMessage<TUnsubResponse> message);
public abstract CallResult HandleUnsubResponse(ParsedMessage<TUnsubResponse> message);
}
}

View File

@ -24,15 +24,15 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc />
public override object? GetSubRequest() => null;
/// <inheritdoc />
public override bool MessageMatchesSubRequest(ParsedMessage message) => throw new NotImplementedException();
public override bool MessageMatchesSubRequest(BaseParsedMessage message) => throw new NotImplementedException();
/// <inheritdoc />
public override CallResult HandleSubResponse(ParsedMessage message) => throw new NotImplementedException();
public override CallResult HandleSubResponse(BaseParsedMessage message) => throw new NotImplementedException();
/// <inheritdoc />
public override object? GetUnsubRequest() => null;
/// <inheritdoc />
public override bool MessageMatchesUnsubRequest(ParsedMessage message) => throw new NotImplementedException();
public override bool MessageMatchesUnsubRequest(BaseParsedMessage message) => throw new NotImplementedException();
/// <inheritdoc />
public override CallResult HandleUnsubResponse(ParsedMessage message) => throw new NotImplementedException();
public override CallResult HandleUnsubResponse(BaseParsedMessage message) => throw new NotImplementedException();
}
}