From aa06e0eead2368c69ad2e37464c1ca00e714d1e7 Mon Sep 17 00:00:00 2001 From: Jkorf Date: Wed, 16 Jul 2025 09:27:42 +0200 Subject: [PATCH] Fixed issue causing duplicate subscriptions and data in the TradeTracker and KlineTracker when websocket connection was reconnected --- .../Trackers/Klines/KlineTracker.cs | 48 +++++++++---------- .../Trackers/Trades/TradeTracker.cs | 46 ++++++++---------- 2 files changed, 44 insertions(+), 50 deletions(-) diff --git a/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs b/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs index 84af551..585a2c9 100644 --- a/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs +++ b/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs @@ -176,18 +176,32 @@ namespace CryptoExchange.Net.Trackers.Klines Status = SyncStatus.Syncing; _logger.KlineTrackerStarting(SymbolName); - var startResult = await DoStartAsync().ConfigureAwait(false); - if (!startResult) + var subResult = await _socketClient.SubscribeToKlineUpdatesAsync(new SubscribeKlineRequest(Symbol, _interval), + update => + { + AddOrUpdate(update.Data); + }).ConfigureAwait(false); + + if (!subResult) { - _logger.KlineTrackerStartFailed(SymbolName, startResult.Error!.Message, startResult.Error.Exception); + _logger.KlineTrackerStartFailed(SymbolName, subResult.Error!.Message, subResult.Error.Exception); Status = SyncStatus.Disconnected; - return new CallResult(startResult.Error!); + return subResult; } - _updateSubscription = startResult.Data; + _updateSubscription = subResult.Data; _updateSubscription.ConnectionLost += HandleConnectionLost; _updateSubscription.ConnectionClosed += HandleConnectionClosed; _updateSubscription.ConnectionRestored += HandleConnectionRestored; + + var startResult = await DoStartAsync().ConfigureAwait(false); + if (!startResult) + { + _ = subResult.Data.CloseAsync(); + Status = SyncStatus.Disconnected; + return new CallResult(startResult.Error!); + } + Status = SyncStatus.Synced; _logger.KlineTrackerStarted(SymbolName); return CallResult.SuccessResult; @@ -208,22 +222,10 @@ namespace CryptoExchange.Net.Trackers.Klines /// The start procedure needed for kline syncing, generally subscribing to an update stream and requesting the snapshot /// /// - protected virtual async Task> DoStartAsync() + protected virtual async Task DoStartAsync() { - var subResult = await _socketClient.SubscribeToKlineUpdatesAsync(new SubscribeKlineRequest(Symbol, _interval), - update => - { - AddOrUpdate(update.Data); - }).ConfigureAwait(false); - - if (!subResult) - { - Status = SyncStatus.Disconnected; - return subResult; - } - if (!_startWithSnapshot) - return subResult; + return CallResult.SuccessResult; var startTime = Period == null ? (DateTime?)null : DateTime.UtcNow.Add(-Period.Value); if (_restClient.GetKlinesOptions.MaxAge != null && DateTime.UtcNow.Add(-_restClient.GetKlinesOptions.MaxAge.Value) > startTime) @@ -236,11 +238,7 @@ namespace CryptoExchange.Net.Trackers.Klines await foreach (var result in ExchangeHelpers.ExecutePages(_restClient.GetKlinesAsync, request).ConfigureAwait(false)) { if (!result) - { - _ = subResult.Data.CloseAsync(); - Status = SyncStatus.Disconnected; - return subResult.AsError(result.Error!); - } + return result; if (Limit != null && data.Count > Limit) break; @@ -249,7 +247,7 @@ namespace CryptoExchange.Net.Trackers.Klines } SetInitialData(data); - return subResult; + return CallResult.SuccessResult; } /// diff --git a/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs b/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs index bdf56db..0b9bf77 100644 --- a/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs +++ b/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs @@ -199,7 +199,12 @@ namespace CryptoExchange.Net.Trackers.Trades _startWithSnapshot = startWithSnapshot; Status = SyncStatus.Syncing; _logger.TradeTrackerStarting(SymbolName); - var subResult = await DoStartAsync().ConfigureAwait(false); + var subResult = await _socketClient.SubscribeToTradeUpdatesAsync(new SubscribeTradeRequest(Symbol), + update => + { + AddData(update.Data); + }).ConfigureAwait(false); + if (!subResult) { _logger.TradeTrackerStartFailed(SymbolName, subResult.Error!.Message, subResult.Error.Exception); @@ -211,6 +216,15 @@ namespace CryptoExchange.Net.Trackers.Trades _updateSubscription.ConnectionLost += HandleConnectionLost; _updateSubscription.ConnectionClosed += HandleConnectionClosed; _updateSubscription.ConnectionRestored += HandleConnectionRestored; + + var result = await DoStartAsync().ConfigureAwait(false); + if (!result) + { + _ = subResult.Data.CloseAsync(); + Status = SyncStatus.Disconnected; + return result; + } + SetSyncStatus(); _logger.TradeTrackerStarted(SymbolName); return CallResult.SuccessResult; @@ -231,22 +245,10 @@ namespace CryptoExchange.Net.Trackers.Trades /// The start procedure needed for trade syncing, generally subscribing to an update stream and requesting the snapshot /// /// - protected virtual async Task> DoStartAsync() + protected virtual async Task DoStartAsync() { - var subResult = await _socketClient.SubscribeToTradeUpdatesAsync(new SubscribeTradeRequest(Symbol), - update => - { - AddData(update.Data); - }).ConfigureAwait(false); - - if (!subResult) - { - Status = SyncStatus.Disconnected; - return subResult; - } - if (!_startWithSnapshot) - return subResult; + return CallResult.SuccessResult; if (_historyRestClient != null) { @@ -256,12 +258,8 @@ namespace CryptoExchange.Net.Trackers.Trades await foreach(var result in ExchangeHelpers.ExecutePages(_historyRestClient.GetTradeHistoryAsync, request).ConfigureAwait(false)) { if (!result) - { - _ = subResult.Data.CloseAsync(); - Status = SyncStatus.Disconnected; - return subResult.AsError(result.Error!); - } - + return result; + if (Limit != null && data.Count > Limit) break; @@ -279,15 +277,13 @@ namespace CryptoExchange.Net.Trackers.Trades var snapshot = await _recentRestClient.GetRecentTradesAsync(new GetRecentTradesRequest(Symbol, limit)).ConfigureAwait(false); if (!snapshot) { - _ = subResult.Data.CloseAsync(); - Status = SyncStatus.Disconnected; - return subResult.AsError(snapshot.Error!); + return snapshot; } SetInitialData(snapshot.Data); } - return subResult; + return CallResult.SuccessResult; } ///