1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00
This commit is contained in:
Jan Korf 2022-04-24 11:31:13 +02:00
parent f514e172d7
commit 11c48b3341
6 changed files with 156 additions and 120 deletions

View File

@ -443,6 +443,9 @@ namespace CryptoExchange.Net
/// <param name="message"></param> /// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
protected internal virtual JToken ProcessTokenData(JToken message) protected internal virtual JToken ProcessTokenData(JToken message)
{ {
return message; return message;
} }
@ -605,27 +608,27 @@ namespace CryptoExchange.Net
if (disposing) if (disposing)
break; break;
foreach (var socket in socketConnections.Values) foreach (var socketConnection in socketConnections.Values)
{ {
if (disposing) if (disposing)
break; break;
if (!socket.Connected) if (!socketConnection.Connected)
continue; continue;
var obj = objGetter(socket); var obj = objGetter(socketConnection);
if (obj == null) if (obj == null)
continue; continue;
log.Write(LogLevel.Trace, $"Socket {socket.SocketId} sending periodic {identifier}"); log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}");
try try
{ {
socket.Send(obj); socketConnection.Send(obj);
} }
catch (Exception ex) catch (Exception ex)
{ {
log.Write(LogLevel.Warning, $"Socket {socket.SocketId} Periodic send {identifier} failed: " + ex.ToLogString()); log.Write(LogLevel.Warning, $"Socket {socketConnection.SocketId} Periodic send {identifier} failed: " + ex.ToLogString());
} }
} }
} }
@ -680,18 +683,14 @@ namespace CryptoExchange.Net
public virtual async Task UnsubscribeAllAsync() public virtual async Task UnsubscribeAllAsync()
{ {
log.Write(LogLevel.Information, $"Closing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions"); log.Write(LogLevel.Information, $"Closing all {socketConnections.Sum(s => s.Value.SubscriptionCount)} subscriptions");
var tasks = new List<Task>();
await Task.Run(async () =>
{ {
var tasks = new List<Task>(); var socketList = socketConnections.Values;
{ foreach (var sub in socketList)
var socketList = socketConnections.Values; tasks.Add(sub.CloseAsync());
foreach (var sub in socketList) }
tasks.Add(sub.CloseAsync());
}
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
}).ConfigureAwait(false);
} }
/// <summary> /// <summary>
@ -703,7 +702,7 @@ namespace CryptoExchange.Net
periodicEvent?.Set(); periodicEvent?.Set();
periodicEvent?.Dispose(); periodicEvent?.Dispose();
log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions"); log.Write(LogLevel.Debug, "Disposing socket client, closing all subscriptions");
Task.Run(UnsubscribeAllAsync).ConfigureAwait(false).GetAwaiter().GetResult(); _ = UnsubscribeAllAsync();
semaphoreSlim?.Dispose(); semaphoreSlim?.Dispose();
base.Dispose(); base.Dispose();
} }

View File

@ -85,7 +85,11 @@ namespace CryptoExchange.Net.Interfaces
/// Connect the socket /// Connect the socket
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task<bool> ConnectAsync(); Task<bool> ConnectAsync();
/// <summary>
/// Receive and send messages over the connection. Resulting task should complete when closing the socket.
/// </summary>
/// <returns></returns>
Task ProcessAsync(); Task ProcessAsync();
/// <summary> /// <summary>
/// Send data /// Send data

View File

@ -23,7 +23,7 @@ namespace CryptoExchange.Net.Sockets
public class CryptoExchangeWebSocketClient : IWebsocket public class CryptoExchangeWebSocketClient : IWebsocket
{ {
internal static int lastStreamId; internal static int lastStreamId;
private static readonly object streamIdLock = new object(); private static readonly object streamIdLock = new();
private ClientWebSocket _socket; private ClientWebSocket _socket;
private readonly AsyncResetEvent _sendEvent; private readonly AsyncResetEvent _sendEvent;
@ -39,6 +39,7 @@ namespace CryptoExchange.Net.Sockets
/// Received messages, the size and the timstamp /// Received messages, the size and the timstamp
/// </summary> /// </summary>
protected readonly List<ReceiveItem> _receivedMessages; protected readonly List<ReceiveItem> _receivedMessages;
/// <summary> /// <summary>
/// Received messages lock /// Received messages lock
/// </summary> /// </summary>
@ -52,19 +53,19 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Handlers for when an error happens on the socket /// Handlers for when an error happens on the socket
/// </summary> /// </summary>
protected readonly List<Action<Exception>> errorHandlers = new List<Action<Exception>>(); protected readonly List<Action<Exception>> errorHandlers = new();
/// <summary> /// <summary>
/// Handlers for when the socket connection is opened /// Handlers for when the socket connection is opened
/// </summary> /// </summary>
protected readonly List<Action> openHandlers = new List<Action>(); protected readonly List<Action> openHandlers = new();
/// <summary> /// <summary>
/// Handlers for when the connection is closed /// Handlers for when the connection is closed
/// </summary> /// </summary>
protected readonly List<Action> closeHandlers = new List<Action>(); protected readonly List<Action> closeHandlers = new();
/// <summary> /// <summary>
/// Handlers for when a message is received /// Handlers for when a message is received
/// </summary> /// </summary>
protected readonly List<Action<string>> messageHandlers = new List<Action<string>>(); protected readonly List<Action<string>> messageHandlers = new();
/// <inheritdoc /> /// <inheritdoc />
public int Id { get; } public int Id { get; }
@ -223,7 +224,7 @@ namespace CryptoExchange.Net.Sockets
log.Write(LogLevel.Debug, $"Socket {Id} connecting"); log.Write(LogLevel.Debug, $"Socket {Id} connecting");
try try
{ {
using CancellationTokenSource tcs = new CancellationTokenSource(TimeSpan.FromSeconds(10)); using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false); await _socket.ConnectAsync(Uri, tcs.Token).ConfigureAwait(false);
Handle(openHandlers); Handle(openHandlers);

View File

@ -0,0 +1,47 @@
using CryptoExchange.Net.Objects;
using Newtonsoft.Json.Linq;
using System;
using System.Threading;
namespace CryptoExchange.Net.Sockets
{
internal class PendingRequest
{
public Func<JToken, bool> Handler { get; }
public JToken? Result { get; private set; }
public bool Completed { get; private set; }
public AsyncResetEvent Event { get; }
public TimeSpan Timeout { get; }
private CancellationTokenSource cts;
public PendingRequest(Func<JToken, bool> handler, TimeSpan timeout)
{
Handler = handler;
Event = new AsyncResetEvent(false, false);
Timeout = timeout;
cts = new CancellationTokenSource(timeout);
cts.Token.Register(Fail, false);
}
public bool CheckData(JToken data)
{
if (Handler(data))
{
Result = data;
Completed = true;
Event.Set();
return true;
}
return false;
}
public void Fail()
{
Completed = true;
Event.Set();
}
}
}

View File

@ -65,16 +65,22 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// If connection is made /// If connection is made
/// </summary> /// </summary>
public bool Connected => Socket.IsOpen; public bool Connected => _socket.IsOpen;
/// <summary> /// <summary>
/// The underlying websocket /// The unique ID of the socket
/// </summary> /// </summary>
private IWebsocket Socket { get; set; } public int SocketId => _socket.Id;
public int SocketId => Socket.Id;
public double IncomingKbps => Socket.IncomingKbps; /// <summary>
public Uri Uri => Socket.Uri; /// The current kilobytes per second of data being received, averaged over the last 3 seconds
/// </summary>
public double IncomingKbps => _socket.IncomingKbps;
/// <summary>
/// The connection uri
/// </summary>
public Uri Uri => _socket.Uri;
/// <summary> /// <summary>
/// The API client the connection is for /// The API client the connection is for
@ -117,7 +123,7 @@ namespace CryptoExchange.Net.Sockets
if (pausedActivity != value) if (pausedActivity != value)
{ {
pausedActivity = value; pausedActivity = value;
log.Write(LogLevel.Information, $"Socket {Socket.Id} Paused activity: " + value); log.Write(LogLevel.Information, $"Socket {SocketId} Paused activity: " + value);
if(pausedActivity) ActivityPaused?.Invoke(); if(pausedActivity) ActivityPaused?.Invoke();
else ActivityUnpaused?.Invoke(); else ActivityUnpaused?.Invoke();
} }
@ -126,14 +132,19 @@ namespace CryptoExchange.Net.Sockets
private bool pausedActivity; private bool pausedActivity;
private readonly List<SocketSubscription> subscriptions; private readonly List<SocketSubscription> subscriptions;
private readonly object subscriptionLock = new object(); private readonly object subscriptionLock = new();
private bool lostTriggered; private bool lostTriggered;
private readonly Log log; private readonly Log log;
private readonly BaseSocketClient socketClient; private readonly BaseSocketClient socketClient;
private readonly List<PendingRequest> pendingRequests; private readonly List<PendingRequest> pendingRequests;
private Task _socketProcessReconnectTask; private Task? _socketProcessReconnectTask;
/// <summary>
/// The underlying websocket
/// </summary>
private readonly IWebsocket _socket;
/// <summary> /// <summary>
/// New socket connection /// New socket connection
@ -150,25 +161,33 @@ namespace CryptoExchange.Net.Sockets
pendingRequests = new List<PendingRequest>(); pendingRequests = new List<PendingRequest>();
subscriptions = new List<SocketSubscription>(); subscriptions = new List<SocketSubscription>();
Socket = socket; _socket = socket;
Socket.Timeout = client.ClientOptions.SocketNoDataTimeout; _socket.Timeout = client.ClientOptions.SocketNoDataTimeout;
Socket.OnMessage += ProcessMessage; _socket.OnMessage += ProcessMessage;
Socket.OnOpen += SocketOnOpen; _socket.OnOpen += SocketOnOpen;
} }
/// <summary>
/// Connect the websocket and start processing
/// </summary>
/// <returns></returns>
public async Task<bool> ConnectAsync() public async Task<bool> ConnectAsync()
{ {
var connected = await Socket.ConnectAsync().ConfigureAwait(false); var connected = await _socket.ConnectAsync().ConfigureAwait(false);
if (connected) if (connected)
StartProcessingTask(); StartProcessingTask();
return connected; return connected;
} }
/// <summary>
/// Trigger a reconnect of the socket connection
/// </summary>
/// <returns></returns>
public async Task TriggerReconnectAsync() public async Task TriggerReconnectAsync()
{ {
await Socket.CloseAsync().ConfigureAwait(false); await _socket.CloseAsync().ConfigureAwait(false);
} }
/// <summary> /// <summary>
@ -178,8 +197,8 @@ namespace CryptoExchange.Net.Sockets
public async Task CloseAsync() public async Task CloseAsync()
{ {
ShouldReconnect = false; ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(Socket.Id)) if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(Socket.Id, out _); socketClient.socketConnections.TryRemove(SocketId, out _);
lock (subscriptionLock) lock (subscriptionLock)
{ {
@ -189,9 +208,12 @@ namespace CryptoExchange.Net.Sockets
subscription.CancellationTokenRegistration.Value.Dispose(); subscription.CancellationTokenRegistration.Value.Dispose();
} }
} }
await Socket.CloseAsync().ConfigureAwait(false); await _socket.CloseAsync().ConfigureAwait(false);
Socket.Dispose();
await _socketProcessReconnectTask.ConfigureAwait(false); if (_socketProcessReconnectTask != null)
await _socketProcessReconnectTask.ConfigureAwait(false);
_socket.Dispose();
} }
/// <summary> /// <summary>
@ -201,7 +223,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
public async Task CloseAsync(SocketSubscription subscription) public async Task CloseAsync(SocketSubscription subscription)
{ {
if (!Socket.IsOpen) if (!_socket.IsOpen)
return; return;
if (subscription.CancellationTokenRegistration.HasValue) if (subscription.CancellationTokenRegistration.HasValue)
@ -225,7 +247,7 @@ namespace CryptoExchange.Net.Sockets
{ {
_socketProcessReconnectTask = Task.Run(async () => _socketProcessReconnectTask = Task.Run(async () =>
{ {
await Socket.ProcessAsync().ConfigureAwait(false); await _socket.ProcessAsync().ConfigureAwait(false);
await ReconnectAsync().ConfigureAwait(false); await ReconnectAsync().ConfigureAwait(false);
}); });
} }
@ -246,7 +268,7 @@ namespace CryptoExchange.Net.Sockets
{ {
// Should reconnect // Should reconnect
DisconnectTime = DateTime.UtcNow; DisconnectTime = DateTime.UtcNow;
log.Write(LogLevel.Warning, $"Socket {Socket.Id} Connection lost, will try to reconnect"); log.Write(LogLevel.Warning, $"Socket {SocketId} Connection lost, will try to reconnect");
if (!lostTriggered) if (!lostTriggered)
{ {
lostTriggered = true; lostTriggered = true;
@ -267,8 +289,8 @@ namespace CryptoExchange.Net.Sockets
return; return;
} }
Socket.Reset(); _socket.Reset();
if (!await Socket.ConnectAsync().ConfigureAwait(false)) if (!await _socket.ConnectAsync().ConfigureAwait(false))
{ {
// Reconnect failed // Reconnect failed
ReconnectTry++; ReconnectTry++;
@ -276,11 +298,11 @@ namespace CryptoExchange.Net.Sockets
if (socketClient.ClientOptions.MaxReconnectTries != null if (socketClient.ClientOptions.MaxReconnectTries != null
&& ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries) && ReconnectTry >= socketClient.ClientOptions.MaxReconnectTries)
{ {
log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to reconnect after {ReconnectTry} tries, closing"); log.Write(LogLevel.Warning, $"Socket {SocketId} failed to reconnect after {ReconnectTry} tries, closing");
ShouldReconnect = false; ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(Socket.Id)) if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(Socket.Id, out _); socketClient.socketConnections.TryRemove(SocketId, out _);
_ = Task.Run(() => ConnectionClosed?.Invoke()); _ = Task.Run(() => ConnectionClosed?.Invoke());
// Reached max tries, break loop and leave connection closed // Reached max tries, break loop and leave connection closed
@ -288,7 +310,7 @@ namespace CryptoExchange.Net.Sockets
} }
// Continue to try again // Continue to try again
log.Write(LogLevel.Debug, $"Socket {Socket.Id} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}"); log.Write(LogLevel.Debug, $"Socket {SocketId} failed to reconnect{(socketClient.ClientOptions.MaxReconnectTries != null ? $", try {ReconnectTry}/{socketClient.ClientOptions.MaxReconnectTries}" : "")}, will try again in {socketClient.ClientOptions.ReconnectInterval}");
continue; continue;
} }
@ -299,7 +321,7 @@ namespace CryptoExchange.Net.Sockets
var time = DisconnectTime; var time = DisconnectTime;
DisconnectTime = null; DisconnectTime = null;
log.Write(LogLevel.Information, $"Socket {Socket.Id} reconnected after {DateTime.UtcNow - time}"); log.Write(LogLevel.Information, $"Socket {SocketId} reconnected after {DateTime.UtcNow - time}");
var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false); var reconnectResult = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectResult) if (!reconnectResult)
@ -311,20 +333,20 @@ namespace CryptoExchange.Net.Sockets
if (socketClient.ClientOptions.MaxResubscribeTries != null && if (socketClient.ClientOptions.MaxResubscribeTries != null &&
ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries) ResubscribeTry >= socketClient.ClientOptions.MaxResubscribeTries)
{ {
log.Write(LogLevel.Warning, $"Socket {Socket.Id} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}"); log.Write(LogLevel.Warning, $"Socket {SocketId} failed to resubscribe after {ResubscribeTry} tries, closing. Last resubscription error: {reconnectResult.Error}");
ShouldReconnect = false; ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(Socket.Id)) if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(Socket.Id, out _); socketClient.socketConnections.TryRemove(SocketId, out _);
_ = Task.Run(() => ConnectionClosed?.Invoke()); _ = Task.Run(() => ConnectionClosed?.Invoke());
} }
else else
log.Write(LogLevel.Debug, $"Socket {Socket.Id} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting."); log.Write(LogLevel.Debug, $"Socket {SocketId} resubscribing all subscriptions failed on reconnected socket{(socketClient.ClientOptions.MaxResubscribeTries != null ? $", try {ResubscribeTry}/{socketClient.ClientOptions.MaxResubscribeTries}" : "")}. Error: {reconnectResult.Error}. Disconnecting and reconnecting.");
// Failed resubscribe, close socket if it is still open // Failed resubscribe, close socket if it is still open
if (Socket.IsOpen) if (_socket.IsOpen)
await Socket.CloseAsync().ConfigureAwait(false); await _socket.CloseAsync().ConfigureAwait(false);
else else
DisconnectTime = DateTime.UtcNow; DisconnectTime = DateTime.UtcNow;
@ -334,7 +356,7 @@ namespace CryptoExchange.Net.Sockets
else else
{ {
// Succesfully reconnected // Succesfully reconnected
log.Write(LogLevel.Information, $"Socket {Socket.Id} data connection restored."); log.Write(LogLevel.Information, $"Socket {SocketId} data connection restored.");
ResubscribeTry = 0; ResubscribeTry = 0;
if (lostTriggered) if (lostTriggered)
{ {
@ -352,15 +374,18 @@ namespace CryptoExchange.Net.Sockets
_ = Task.Run(() => ConnectionClosed?.Invoke()); _ = Task.Run(() => ConnectionClosed?.Invoke());
// No reconnecting needed // No reconnecting needed
log.Write(LogLevel.Information, $"Socket {Socket.Id} closed"); log.Write(LogLevel.Information, $"Socket {SocketId} closed");
if (socketClient.socketConnections.ContainsKey(Socket.Id)) if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(Socket.Id, out _); socketClient.socketConnections.TryRemove(SocketId, out _);
} }
} }
/// <summary>
/// Dispose the connection
/// </summary>
public void Dispose() public void Dispose()
{ {
Socket.Dispose(); _socket.Dispose();
} }
/// <summary> /// <summary>
@ -370,7 +395,7 @@ namespace CryptoExchange.Net.Sockets
private void ProcessMessage(string data) private void ProcessMessage(string data)
{ {
var timestamp = DateTime.UtcNow; var timestamp = DateTime.UtcNow;
log.Write(LogLevel.Trace, $"Socket {Socket.Id} received data: " + data); log.Write(LogLevel.Trace, $"Socket {SocketId} received data: " + data);
if (string.IsNullOrEmpty(data)) return; if (string.IsNullOrEmpty(data)) return;
var tokenData = data.ToJToken(log); var tokenData = data.ToJToken(log);
@ -416,16 +441,16 @@ namespace CryptoExchange.Net.Sockets
if (!handled && !handledResponse) if (!handled && !handledResponse)
{ {
if (!socketClient.UnhandledMessageExpected) if (!socketClient.UnhandledMessageExpected)
log.Write(LogLevel.Warning, $"Socket {Socket.Id} Message not handled: " + tokenData); log.Write(LogLevel.Warning, $"Socket {SocketId} Message not handled: " + tokenData);
UnhandledMessage?.Invoke(tokenData); UnhandledMessage?.Invoke(tokenData);
} }
var total = DateTime.UtcNow - timestamp; var total = DateTime.UtcNow - timestamp;
if (userProcessTime.TotalMilliseconds > 500) if (userProcessTime.TotalMilliseconds > 500)
log.Write(LogLevel.Debug, $"Socket {Socket.Id} message processing slow ({(int)total.TotalMilliseconds}ms), consider offloading data handling to another thread. " + log.Write(LogLevel.Debug, $"Socket {SocketId} message processing slow ({(int)total.TotalMilliseconds}ms), consider offloading data handling to another thread. " +
"Data from this socket may arrive late or not at all if message processing is continuously slow."); "Data from this socket may arrive late or not at all if message processing is continuously slow.");
log.Write(LogLevel.Trace, $"Socket {Socket.Id} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)"); log.Write(LogLevel.Trace, $"Socket {SocketId} message processed in {(int)total.TotalMilliseconds}ms, ({(int)userProcessTime.TotalMilliseconds}ms user code)");
} }
/// <summary> /// <summary>
@ -509,7 +534,7 @@ namespace CryptoExchange.Net.Sockets
} }
catch (Exception ex) catch (Exception ex)
{ {
log.Write(LogLevel.Error, $"Socket {Socket.Id} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}"); log.Write(LogLevel.Error, $"Socket {SocketId} Exception during message processing\r\nException: {ex.ToLogString()}\r\nData: {messageEvent.JsonData}");
currentSubscription?.InvokeExceptionHandler(ex); currentSubscription?.InvokeExceptionHandler(ex);
return (false, TimeSpan.Zero); return (false, TimeSpan.Zero);
} }
@ -554,8 +579,8 @@ namespace CryptoExchange.Net.Sockets
/// <param name="data">The data to send</param> /// <param name="data">The data to send</param>
public virtual void Send(string data) public virtual void Send(string data)
{ {
log.Write(LogLevel.Trace, $"Socket {Socket.Id} sending data: {data}"); log.Write(LogLevel.Trace, $"Socket {SocketId} sending data: {data}");
Socket.Send(data); _socket.Send(data);
} }
/// <summary> /// <summary>
@ -569,7 +594,7 @@ namespace CryptoExchange.Net.Sockets
private async Task<CallResult<bool>> ProcessReconnectAsync() private async Task<CallResult<bool>> ProcessReconnectAsync()
{ {
if (!Socket.IsOpen) if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected")); return new CallResult<bool>(new WebError("Socket not connected"));
if (Authenticated) if (Authenticated)
@ -578,11 +603,11 @@ namespace CryptoExchange.Net.Sockets
var authResult = await socketClient.AuthenticateSocketAsync(this).ConfigureAwait(false); var authResult = await socketClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
if (!authResult) if (!authResult)
{ {
log.Write(LogLevel.Warning, $"Socket {Socket.Id} authentication failed on reconnected socket. Disconnecting and reconnecting."); log.Write(LogLevel.Warning, $"Socket {SocketId} authentication failed on reconnected socket. Disconnecting and reconnecting.");
return authResult; return authResult;
} }
log.Write(LogLevel.Debug, $"Socket {Socket.Id} authentication succeeded on reconnected socket."); log.Write(LogLevel.Debug, $"Socket {SocketId} authentication succeeded on reconnected socket.");
} }
// Get a list of all subscriptions on the socket // Get a list of all subscriptions on the socket
@ -593,7 +618,7 @@ namespace CryptoExchange.Net.Sockets
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
for (var i = 0; i < subscriptionList.Count; i += socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket) for (var i = 0; i < subscriptionList.Count; i += socketClient.ClientOptions.MaxConcurrentResubscriptionsPerSocket)
{ {
if (!Socket.IsOpen) if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected")); return new CallResult<bool>(new WebError("Socket not connected"));
var taskList = new List<Task<CallResult<bool>>>(); var taskList = new List<Task<CallResult<bool>>>();
@ -605,10 +630,10 @@ namespace CryptoExchange.Net.Sockets
return taskList.First(t => !t.Result.Success).Result; return taskList.First(t => !t.Result.Success).Result;
} }
if (!Socket.IsOpen) if (!_socket.IsOpen)
return new CallResult<bool>(new WebError("Socket not connected")); return new CallResult<bool>(new WebError("Socket not connected"));
log.Write(LogLevel.Debug, $"Socket {Socket.Id} all subscription successfully resubscribed on reconnected socket."); log.Write(LogLevel.Debug, $"Socket {SocketId} all subscription successfully resubscribed on reconnected socket.");
return new CallResult<bool>(true); return new CallResult<bool>(true);
} }
@ -619,50 +644,10 @@ namespace CryptoExchange.Net.Sockets
internal async Task<CallResult<bool>> ResubscribeAsync(SocketSubscription socketSubscription) internal async Task<CallResult<bool>> ResubscribeAsync(SocketSubscription socketSubscription)
{ {
if (!Socket.IsOpen) if (!_socket.IsOpen)
return new CallResult<bool>(new UnknownError("Socket is not connected")); return new CallResult<bool>(new UnknownError("Socket is not connected"));
return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false); return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
} }
} }
internal class PendingRequest
{
public Func<JToken, bool> Handler { get; }
public JToken? Result { get; private set; }
public bool Completed { get; private set; }
public AsyncResetEvent Event { get; }
public TimeSpan Timeout { get; }
private CancellationTokenSource cts;
public PendingRequest(Func<JToken, bool> handler, TimeSpan timeout)
{
Handler = handler;
Event = new AsyncResetEvent(false, false);
Timeout = timeout;
cts = new CancellationTokenSource(timeout);
cts.Token.Register(Fail, false);
}
public bool CheckData(JToken data)
{
if (Handler(data))
{
Result = data;
Completed = true;
Event.Set();
return true;
}
return false;
}
public void Fail()
{
Completed = true;
Event.Set();
}
}
} }

View File

@ -6,7 +6,7 @@ using CryptoExchange.Net.Logging;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
{ {
/// <summary> /// <summary>
/// Default weboscket factory implementation /// Default websocket factory implementation
/// </summary> /// </summary>
public class WebsocketFactory : IWebsocketFactory public class WebsocketFactory : IWebsocketFactory
{ {