1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-10-27 08:27:19 +00:00

Fixed UpdateSubscription still propagating connection events even though the specific listener is unsubscribed

This commit is contained in:
Jkorf 2025-09-29 10:07:28 +02:00
parent aba6b773ce
commit b8b7512b35
3 changed files with 114 additions and 12 deletions

View File

@ -1,5 +1,7 @@
using CryptoExchange.Net.Sockets; using CryptoExchange.Net.Sockets;
using System; using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects.Sockets namespace CryptoExchange.Net.Objects.Sockets
@ -12,13 +14,21 @@ namespace CryptoExchange.Net.Objects.Sockets
private readonly SocketConnection _connection; private readonly SocketConnection _connection;
private readonly Subscription _listener; private readonly Subscription _listener;
private object _eventLock = new object();
private List<Action> _connectionClosedEventHandlers = new List<Action>();
private List<Action> _connectionLostEventHandlers = new List<Action>();
private List<Action<Error>> _resubscribeFailedEventHandlers = new List<Action<Error>>();
private List<Action<TimeSpan>> _connectionRestoredEventHandlers = new List<Action<TimeSpan>>();
private List<Action> _activityPausedEventHandlers = new List<Action>();
private List<Action> _activityUnpausedEventHandlers = new List<Action>();
/// <summary> /// <summary>
/// Event when the connection is lost. The socket will automatically reconnect when possible. /// Event when the connection is lost. The socket will automatically reconnect when possible.
/// </summary> /// </summary>
public event Action ConnectionLost public event Action ConnectionLost
{ {
add => _connection.ConnectionLost += value; add { lock (_eventLock) _connectionLostEventHandlers.Add(value); }
remove => _connection.ConnectionLost -= value; remove { lock (_eventLock) _connectionLostEventHandlers.Remove(value); }
} }
/// <summary> /// <summary>
@ -26,8 +36,8 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public event Action ConnectionClosed public event Action ConnectionClosed
{ {
add => _connection.ConnectionClosed += value; add { lock (_eventLock) _connectionClosedEventHandlers.Add(value); }
remove => _connection.ConnectionClosed -= value; remove { lock (_eventLock) _connectionClosedEventHandlers.Remove(value); }
} }
/// <summary> /// <summary>
@ -35,8 +45,8 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public event Action<Error> ResubscribingFailed public event Action<Error> ResubscribingFailed
{ {
add => _connection.ResubscribingFailed += value; add { lock (_eventLock) _resubscribeFailedEventHandlers.Add(value); }
remove => _connection.ResubscribingFailed -= value; remove { lock (_eventLock) _resubscribeFailedEventHandlers.Remove(value); }
} }
/// <summary> /// <summary>
@ -46,8 +56,8 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public event Action<TimeSpan> ConnectionRestored public event Action<TimeSpan> ConnectionRestored
{ {
add => _connection.ConnectionRestored += value; add { lock (_eventLock) _connectionRestoredEventHandlers.Add(value); }
remove => _connection.ConnectionRestored -= value; remove { lock (_eventLock) _connectionRestoredEventHandlers.Remove(value); }
} }
/// <summary> /// <summary>
@ -55,8 +65,8 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public event Action ActivityPaused public event Action ActivityPaused
{ {
add => _connection.ActivityPaused += value; add { lock (_eventLock) _activityPausedEventHandlers.Add(value); }
remove => _connection.ActivityPaused -= value; remove { lock (_eventLock) _activityPausedEventHandlers.Remove(value); }
} }
/// <summary> /// <summary>
@ -64,8 +74,8 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public event Action ActivityUnpaused public event Action ActivityUnpaused
{ {
add => _connection.ActivityUnpaused += value; add { lock (_eventLock) _activityUnpausedEventHandlers.Add(value); }
remove => _connection.ActivityUnpaused -= value; remove { lock (_eventLock) _activityUnpausedEventHandlers.Remove(value); }
} }
/// <summary> /// <summary>
@ -95,7 +105,85 @@ namespace CryptoExchange.Net.Objects.Sockets
public UpdateSubscription(SocketConnection connection, Subscription subscription) public UpdateSubscription(SocketConnection connection, Subscription subscription)
{ {
_connection = connection; _connection = connection;
_connection.ConnectionClosed += HandleConnectionClosedEvent;
_connection.ConnectionLost += HandleConnectionLostEvent;
_connection.ConnectionRestored += HandleConnectionRestoredEvent;
_connection.ResubscribingFailed += HandleResubscribeFailedEvent;
_connection.ActivityPaused += HandlePausedEvent;
_connection.ActivityUnpaused += HandleUnpausedEvent;
_listener = subscription; _listener = subscription;
_listener.Unsubscribed += HandleUnsubscribed;
}
private void HandleUnsubscribed()
{
_connection.ConnectionClosed -= HandleConnectionClosedEvent;
_connection.ConnectionLost -= HandleConnectionLostEvent;
_connection.ConnectionRestored -= HandleConnectionRestoredEvent;
_connection.ResubscribingFailed -= HandleResubscribeFailedEvent;
_connection.ActivityPaused -= HandlePausedEvent;
_connection.ActivityUnpaused -= HandleUnpausedEvent;
}
private void HandleConnectionClosedEvent()
{
List<Action> handlers;
lock (_eventLock)
handlers = _connectionClosedEventHandlers.ToList();
foreach(var callback in handlers)
callback();
}
private void HandleConnectionLostEvent()
{
List<Action> handlers;
lock (_eventLock)
handlers = _connectionLostEventHandlers.ToList();
foreach (var callback in handlers)
callback();
}
private void HandleConnectionRestoredEvent(TimeSpan period)
{
List<Action<TimeSpan>> handlers;
lock (_eventLock)
handlers = _connectionRestoredEventHandlers.ToList();
foreach (var callback in handlers)
callback(period);
}
private void HandleResubscribeFailedEvent(Error error)
{
List<Action<Error>> handlers;
lock (_eventLock)
handlers = _resubscribeFailedEventHandlers.ToList();
foreach (var callback in handlers)
callback(error);
}
private void HandlePausedEvent()
{
List<Action> handlers;
lock (_eventLock)
handlers = _activityPausedEventHandlers.ToList();
foreach (var callback in handlers)
callback();
}
private void HandleUnpausedEvent()
{
List<Action> handlers;
lock (_eventLock)
handlers = _activityUnpausedEventHandlers.ToList();
foreach (var callback in handlers)
callback();
} }
/// <summary> /// <summary>

View File

@ -706,6 +706,8 @@ namespace CryptoExchange.Net.Sockets
lock (_listenersLock) lock (_listenersLock)
_listeners.Remove(subscription); _listeners.Remove(subscription);
subscription.InvokeUnsubscribedHandler();
} }
/// <summary> /// <summary>

View File

@ -74,6 +74,10 @@ namespace CryptoExchange.Net.Sockets
/// Exception event /// Exception event
/// </summary> /// </summary>
public event Action<Exception>? Exception; public event Action<Exception>? Exception;
/// <summary>
/// Listener unsubscribed event
/// </summary>
public event Action? Unsubscribed;
/// <summary> /// <summary>
/// Subscription topic /// Subscription topic
@ -181,6 +185,14 @@ namespace CryptoExchange.Net.Sockets
Exception?.Invoke(e); Exception?.Invoke(e);
} }
/// <summary>
/// Invoke the unsubscribed event
/// </summary>
public void InvokeUnsubscribedHandler()
{
Unsubscribed?.Invoke();
}
/// <summary> /// <summary>
/// State of this subscription /// State of this subscription
/// </summary> /// </summary>