1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-11 01:46:12 +00:00
This commit is contained in:
JKorf 2023-12-31 11:29:14 +01:00
parent eee19b28a5
commit c1ee36dd8a
3 changed files with 36 additions and 31 deletions

View File

@ -22,6 +22,7 @@ namespace CryptoExchange.Net.Sockets
public DateTime RequestTimestamp { get; set; } public DateTime RequestTimestamp { get; set; }
public CallResult? Result { get; set; } public CallResult? Result { get; set; }
public BaseParsedMessage Response { get; set; } public BaseParsedMessage Response { get; set; }
public Action OnFinished { get; set; }
protected AsyncResetEvent _event; protected AsyncResetEvent _event;
protected CancellationTokenSource? _cts; protected CancellationTokenSource? _cts;
@ -133,6 +134,9 @@ namespace CryptoExchange.Net.Sockets
Completed = true; Completed = true;
Response = message.Data; Response = message.Data;
Result = await HandleMessageAsync(connection, message.As((ParsedMessage<TResponse>)message.Data)).ConfigureAwait(false); Result = await HandleMessageAsync(connection, message.As((ParsedMessage<TResponse>)message.Data)).ConfigureAwait(false);
// Set() gives calling/waiting request the signal to continue and allows the message processing thread to continue with next message.
// However, the processing of the message isn't fully finished yet?
OnFinished?.Invoke();
_event.Set(); _event.Set();
return Result; return Result;
} }
@ -152,6 +156,7 @@ namespace CryptoExchange.Net.Sockets
Completed = true; Completed = true;
Result = new CallResult<TResponse>(new CancellationRequestedError()); Result = new CallResult<TResponse>(new CancellationRequestedError());
OnFinished?.Invoke();
_event.Set(); _event.Set();
} }
@ -160,6 +165,7 @@ namespace CryptoExchange.Net.Sockets
{ {
Result = new CallResult<TResponse>(new ServerError(error)); Result = new CallResult<TResponse>(new ServerError(error));
Completed = true; Completed = true;
OnFinished?.Invoke();
_event.Set(); _event.Set();
} }
} }

View File

@ -260,23 +260,25 @@ namespace CryptoExchange.Net.Sockets
query.Fail("Connection interupted"); query.Fail("Connection interupted");
_listenerManager.Remove(query); _listenerManager.Remove(query);
} }
// Mark subscription as 'not confirmed', only map updates to them if confirmed. Don't await sub answer here
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false); _ = Task.Run(async () =>
if (!reconnectSuccessful)
{ {
_logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again"); var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
await _socket.ReconnectAsync().ConfigureAwait(false); if (!reconnectSuccessful)
}
else
{
Status = SocketStatus.Connected;
_ = Task.Run(() =>
{ {
ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value); _logger.Log(LogLevel.Warning, $"Socket {SocketId} Failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again");
DisconnectTime = null; _ = _socket.ReconnectAsync().ConfigureAwait(false);
}); }
} else
{
Status = SocketStatus.Connected;
_ = Task.Run(() =>
{
ConnectionRestored?.Invoke(DateTime.UtcNow - DisconnectTime!.Value);
DisconnectTime = null;
});
}
});
} }
/// <summary> /// <summary>
@ -315,11 +317,6 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream) protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream)
{ {
var buffer2 = new byte[stream.Length];
stream.Position = 0;
stream.Read(buffer2, 0, buffer2.Length);
Debug.WriteLine("0 " + Encoding.UTF8.GetString(buffer2));
stream.Position = 0;
var result = ReadJson(type, stream); var result = ReadJson(type, stream);
if (result == null) if (result == null)
{ {
@ -388,7 +385,12 @@ namespace CryptoExchange.Net.Sockets
var idInstance = accessor.Instantiate(typeResult); var idInstance = accessor.Instantiate(typeResult);
if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData) if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData)
idInstance.OriginalData = idInstance.OriginalData; {
var buffer2 = new byte[stream.Length];
stream.Position = 0;
stream.Read(buffer2, 0, buffer2.Length);
idInstance.OriginalData = Encoding.UTF8.GetString(buffer2);
}
idInstance.StreamIdentifier = streamIdentity; idInstance.StreamIdentifier = streamIdentity;
idInstance.TypeIdentifier = typeIdentity; idInstance.TypeIdentifier = typeIdentity;
@ -527,9 +529,9 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
/// <param name="query">Query to send</param> /// <param name="query">Query to send</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<CallResult> SendAndWaitQueryAsync(BaseQuery query) public virtual async Task<CallResult> SendAndWaitQueryAsync(BaseQuery query, Action? onFinished = null)
{ {
await SendAndWaitIntAsync(query).ConfigureAwait(false); await SendAndWaitIntAsync(query, onFinished).ConfigureAwait(false);
return query.Result ?? new CallResult(new ServerError("Timeout")); return query.Result ?? new CallResult(new ServerError("Timeout"));
} }
@ -539,13 +541,13 @@ namespace CryptoExchange.Net.Sockets
/// <typeparam name="T">Query response type</typeparam> /// <typeparam name="T">Query response type</typeparam>
/// <param name="query">Query to send</param> /// <param name="query">Query to send</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query) public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query, Action? onFinished = null)
{ {
await SendAndWaitIntAsync(query).ConfigureAwait(false); await SendAndWaitIntAsync(query, onFinished).ConfigureAwait(false);
return query.TypedResult ?? new CallResult<T>(new ServerError("Timeout")); return query.TypedResult ?? new CallResult<T>(new ServerError("Timeout"));
} }
private async Task SendAndWaitIntAsync(BaseQuery query) private async Task SendAndWaitIntAsync(BaseQuery query, Action onFinished)
{ {
_listenerManager.Add(query); _listenerManager.Add(query);
var sendOk = Send(query.Id, query.Request, query.Weight); var sendOk = Send(query.Id, query.Request, query.Weight);
@ -555,6 +557,7 @@ namespace CryptoExchange.Net.Sockets
return; return;
} }
query.OnFinished = onFinished;
while (true) while (true)
{ {
if (!_socket.IsOpen) if (!_socket.IsOpen)
@ -665,14 +668,10 @@ namespace CryptoExchange.Net.Sockets
if (subQuery == null) if (subQuery == null)
continue; continue;
taskList.Add(SendAndWaitQueryAsync(subQuery).ContinueWith((x) => taskList.Add(SendAndWaitQueryAsync(subQuery, () =>
{ {
Debug.WriteLine("1");
subscription.HandleSubQueryResponse(subQuery.Response); subscription.HandleSubQueryResponse(subQuery.Response);
Debug.WriteLine("2");
_listenerManager.Reset(subscription); _listenerManager.Reset(subscription);
Debug.WriteLine("3");
return x.Result;
})); }));
} }

View File

@ -71,7 +71,7 @@ namespace CryptoExchange.Net.Sockets
{ {
lock (_lock) lock (_lock)
{ {
Debug.WriteLine("4 Resetting"); //Debug.WriteLine("4 Resetting");
Remove(processor); Remove(processor);
Add(processor); Add(processor);
} }