1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Updated socket reconnection

This commit is contained in:
JKorf 2022-06-12 15:10:10 +02:00
parent c2080ef75f
commit c13dfa4461
3 changed files with 169 additions and 63 deletions

View File

@ -92,6 +92,18 @@ namespace CryptoExchange.Net
}
}
public int CurrentConnections => socketConnections.Count;
public int CurrentSubscriptions
{
get
{
if (!socketConnections.Any())
return 0;
return socketConnections.Sum(s => s.Value.SubscriptionCount);
}
}
/// <summary>
/// Client options
/// </summary>
@ -164,7 +176,7 @@ namespace CryptoExchange.Net
return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe"));
SocketConnection socketConnection;
SocketSubscription subscription;
SocketSubscription? subscription;
var released = false;
// Wait for a semaphore here, so we only connect 1 socket at a time.
// This is necessary for being able to see if connections can be combined
@ -179,23 +191,34 @@ namespace CryptoExchange.Net
try
{
// Get a new or existing socket connection
socketConnection = GetSocketConnection(apiClient, url, authenticated);
// Add a subscription on the socket connection
subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler);
if (ClientOptions.SocketSubscriptionsCombineTarget == 1)
while (true)
{
// Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway
semaphoreSlim.Release();
released = true;
// Get a new or existing socket connection
socketConnection = GetSocketConnection(apiClient, url, authenticated);
// Add a subscription on the socket connection
subscription = AddSubscription(request, identifier, true, socketConnection, dataHandler);
if (subscription == null)
{
log.Write(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
continue;
}
if (ClientOptions.SocketSubscriptionsCombineTarget == 1)
{
// Only 1 subscription per connection, so no need to wait for connection since a new subscription will create a new connection anyway
semaphoreSlim.Release();
released = true;
}
var needsConnecting = !socketConnection.Connected;
var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
if (!connectResult)
return new CallResult<UpdateSubscription>(connectResult.Error!);
break;
}
var needsConnecting = !socketConnection.Connected;
var connectResult = await ConnectIfNeededAsync(socketConnection, authenticated).ConfigureAwait(false);
if (!connectResult)
return new CallResult<UpdateSubscription>(connectResult.Error!);
}
finally
{
@ -443,9 +466,6 @@ namespace CryptoExchange.Net
/// <param name="message"></param>
/// <returns></returns>
protected internal virtual JToken ProcessTokenData(JToken message)
{
return message;
}
@ -460,7 +480,7 @@ namespace CryptoExchange.Net
/// <param name="connection">The socket connection the handler is on</param>
/// <param name="dataHandler">The handler of the data received</param>
/// <returns></returns>
protected virtual SocketSubscription AddSubscription<T>(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action<DataEvent<T>> dataHandler)
protected virtual SocketSubscription? AddSubscription<T>(object? request, string? identifier, bool userSubscription, SocketConnection connection, Action<DataEvent<T>> dataHandler)
{
void InternalHandler(MessageEvent messageEvent)
{
@ -484,7 +504,8 @@ namespace CryptoExchange.Net
var subscription = request == null
? SocketSubscription.CreateForIdentifier(NextId(), identifier!, userSubscription, InternalHandler)
: SocketSubscription.CreateForRequest(NextId(), request, userSubscription, InternalHandler);
connection.AddSubscription(subscription);
if (!connection.AddSubscription(subscription))
return null;
return subscription;
}
@ -510,7 +531,8 @@ namespace CryptoExchange.Net
/// <returns></returns>
protected virtual SocketConnection GetSocketConnection(SocketApiClient apiClient, string address, bool authenticated)
{
var socketResult = socketConnections.Where(s => s.Value.Uri.ToString().TrimEnd('/') == address.TrimEnd('/')
var socketResult = socketConnections.Where(s => (s.Value.Status == SocketConnection.SocketStatus.None || s.Value.Status == SocketConnection.SocketStatus.Connected)
&& s.Value.Uri.ToString().TrimEnd('/') == address.TrimEnd('/')
&& (s.Value.ApiClient.GetType() == apiClient.GetType())
&& (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.SubscriptionCount).FirstOrDefault();
var result = socketResult.Equals(default(KeyValuePair<int, SocketConnection>)) ? null : socketResult.Value;

View File

@ -34,6 +34,8 @@ namespace CryptoExchange.Net.Sockets
private readonly List<DateTime> _outgoingMessages;
private DateTime _lastReceivedMessagesUpdate;
private bool _closed;
private bool _disposed;
/// <summary>
/// Received messages, the size and the timstamp
@ -279,6 +281,10 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
private async Task CloseInternalAsync()
{
if (_closed || _disposed)
return;
_closed = true;
_ctsSource.Cancel();
_sendEvent.Set();
@ -291,6 +297,15 @@ namespace CryptoExchange.Net.Sockets
catch(Exception)
{ } // Can sometimes throw an exception when socket is in aborted state due to timing
}
else if(_socket.State == WebSocketState.CloseReceived)
{
try
{
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
catch (Exception)
{ } // Can sometimes throw an exception when socket is in aborted state due to timing
}
log.Write(LogLevel.Debug, $"Socket {Id} closed");
Handle(closeHandlers);
}
@ -300,7 +315,11 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public void Dispose()
{
if (_disposed)
return;
log.Write(LogLevel.Debug, $"Socket {Id} disposing");
_disposed = true;
_socket.Dispose();
_ctsSource.Dispose();
@ -320,6 +339,7 @@ namespace CryptoExchange.Net.Sockets
while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer
_socket = CreateSocket();
_closed = false;
}
/// <summary>

View File

@ -139,10 +139,26 @@ namespace CryptoExchange.Net.Sockets
private readonly BaseSocketClient socketClient;
private readonly List<PendingRequest> pendingRequests;
private Task? _socketProcessReconnectTask;
private Task? _socketProcessTask;
private Task? _socketReconnectTask;
private readonly AsyncResetEvent _reconnectWaitEvent;
private SocketStatus _status;
/// <summary>
/// Status of the socket connection
/// </summary>
public SocketStatus Status
{
get => _status;
private set
{
var oldStatus = _status;
_status = value;
log.Write(LogLevel.Trace, $"Socket {SocketId} status changed from {oldStatus} to {_status}");
}
}
/// <summary>
/// The underlying websocket
/// </summary>
@ -165,9 +181,13 @@ namespace CryptoExchange.Net.Sockets
subscriptions = new List<SocketSubscription>();
_socket = socket;
_reconnectWaitEvent = new AsyncResetEvent(false, true);
_socket.Timeout = client.ClientOptions.SocketNoDataTimeout;
_socket.OnMessage += ProcessMessage;
_socket.OnOpen += SocketOnOpen;
_socket.OnClose += () => _reconnectWaitEvent.Set();
}
/// <summary>
@ -178,7 +198,11 @@ namespace CryptoExchange.Net.Sockets
{
var connected = await _socket.ConnectAsync().ConfigureAwait(false);
if (connected)
StartProcessingTask();
{
Status = SocketStatus.Connected;
_socketReconnectTask = ReconnectWatcherAsync();
_socketProcessTask = _socket.ProcessAsync();
}
return connected;
}
@ -207,6 +231,9 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
public async Task CloseAsync()
{
if (Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
ShouldReconnect = false;
if (socketClient.socketConnections.ContainsKey(SocketId))
socketClient.socketConnections.TryRemove(SocketId, out _);
@ -220,24 +247,13 @@ namespace CryptoExchange.Net.Sockets
}
}
if (_status == SocketStatus.Reconnecting)
{
// Wait for reconnect task to finish
log.Write(LogLevel.Trace, "In reconnecting state, waiting for reconnecting to end");
if (_socketProcessReconnectTask != null)
await _socketProcessReconnectTask.ConfigureAwait(false);
await _socket.CloseAsync().ConfigureAwait(false);
}
else
{
// Close before waiting for process task to finish
await _socket.CloseAsync().ConfigureAwait(false);
if (_socketProcessReconnectTask != null)
await _socketProcessReconnectTask.ConfigureAwait(false);
}
while (Status == SocketStatus.Reconnecting)
// Wait for reconnecting to finish
await Task.Delay(100).ConfigureAwait(false);
await _socket.CloseAsync().ConfigureAwait(false);
if(_socketProcessTask != null)
await _socketProcessTask.ConfigureAwait(false);
_socket.Dispose();
}
@ -248,39 +264,40 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns>
public async Task CloseAsync(SocketSubscription subscription)
{
if (!_socket.IsOpen || _status == SocketStatus.Disposed)
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
return;
log.Write(LogLevel.Trace, $"Socket {SocketId} closing subscription {subscription.Id}");
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
if (subscription.Confirmed)
if (subscription.Confirmed && _socket.IsOpen)
await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
bool shouldCloseConnection;
lock (subscriptionLock)
shouldCloseConnection = !subscriptions.Any(r => r.UserSubscription && subscription != r);
{
if (Status == SocketStatus.Closing)
{
log.Write(LogLevel.Trace, $"Socket {SocketId} already closing");
return;
}
shouldCloseConnection = subscriptions.All(r => !r.UserSubscription || r == subscription);
if (shouldCloseConnection)
Status = SocketStatus.Closing;
}
if (shouldCloseConnection)
{
log.Write(LogLevel.Trace, $"Socket {SocketId} closing as there are no more subscriptions");
await CloseAsync().ConfigureAwait(false);
}
lock (subscriptionLock)
subscriptions.Remove(subscription);
}
private void StartProcessingTask()
{
log.Write(LogLevel.Trace, $"Starting {SocketId} process/reconnect task");
_status = SocketStatus.Processing;
_socketProcessReconnectTask = Task.Run(async () =>
{
await _socket.ProcessAsync().ConfigureAwait(false);
_status = SocketStatus.Reconnecting;
await ReconnectAsync().ConfigureAwait(false);
log.Write(LogLevel.Trace, $"Process/reconnect {SocketId} task finished");
});
}
private async Task ReconnectAsync()
{
// Fail all pending requests
@ -344,7 +361,8 @@ namespace CryptoExchange.Net.Sockets
}
// Successfully reconnected, start processing
StartProcessingTask();
Status = SocketStatus.Connected;
_socketProcessTask = _socket.ProcessAsync();
ReconnectTry = 0;
var time = DisconnectTime;
@ -414,7 +432,7 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public void Dispose()
{
_status = SocketStatus.Disposed;
Status = SocketStatus.Disposed;
_socket.Dispose();
}
@ -487,10 +505,17 @@ namespace CryptoExchange.Net.Sockets
/// Add a subscription to this connection
/// </summary>
/// <param name="subscription"></param>
public void AddSubscription(SocketSubscription subscription)
public bool AddSubscription(SocketSubscription subscription)
{
lock(subscriptionLock)
lock (subscriptionLock)
{
if (Status != SocketStatus.None && Status != SocketStatus.Connected)
return false;
subscriptions.Add(subscription);
log.Write(LogLevel.Trace, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {subscriptions.Count}");
return true;
}
}
/// <summary>
@ -633,6 +658,22 @@ namespace CryptoExchange.Net.Sockets
PausedActivity = false;
}
private async Task ReconnectWatcherAsync()
{
while (true)
{
await _reconnectWaitEvent.WaitAsync().ConfigureAwait(false);
if (!ShouldReconnect)
return;
Status = SocketStatus.Reconnecting;
await ReconnectAsync().ConfigureAwait(false);
if (!ShouldReconnect)
return;
}
}
private async Task<CallResult<bool>> ProcessReconnectAsync()
{
if (!_socket.IsOpen)
@ -691,11 +732,34 @@ namespace CryptoExchange.Net.Sockets
return await socketClient.SubscribeAndWaitAsync(this, socketSubscription.Request!, socketSubscription).ConfigureAwait(false);
}
private enum SocketStatus
/// <summary>
/// Status of the socket connection
/// </summary>
public enum SocketStatus
{
/// <summary>
/// None/Initial
/// </summary>
None,
Processing,
/// <summary>
/// Connected
/// </summary>
Connected,
/// <summary>
/// Reconnecting
/// </summary>
Reconnecting,
/// <summary>
/// Closing
/// </summary>
Closing,
/// <summary>
/// Closed
/// </summary>
Closed,
/// <summary>
/// Disposed
/// </summary>
Disposed
}
}