1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-07-17 06:55:41 +00:00

Fixed issue causing duplicate subscriptions and data in the TradeTracker and KlineTracker when websocket connection was reconnected

This commit is contained in:
Jkorf 2025-07-16 09:27:42 +02:00
parent 2fde9a285e
commit aa06e0eead
2 changed files with 44 additions and 50 deletions

View File

@ -176,18 +176,32 @@ namespace CryptoExchange.Net.Trackers.Klines
Status = SyncStatus.Syncing;
_logger.KlineTrackerStarting(SymbolName);
var startResult = await DoStartAsync().ConfigureAwait(false);
if (!startResult)
var subResult = await _socketClient.SubscribeToKlineUpdatesAsync(new SubscribeKlineRequest(Symbol, _interval),
update =>
{
AddOrUpdate(update.Data);
}).ConfigureAwait(false);
if (!subResult)
{
_logger.KlineTrackerStartFailed(SymbolName, startResult.Error!.Message, startResult.Error.Exception);
_logger.KlineTrackerStartFailed(SymbolName, subResult.Error!.Message, subResult.Error.Exception);
Status = SyncStatus.Disconnected;
return new CallResult(startResult.Error!);
return subResult;
}
_updateSubscription = startResult.Data;
_updateSubscription = subResult.Data;
_updateSubscription.ConnectionLost += HandleConnectionLost;
_updateSubscription.ConnectionClosed += HandleConnectionClosed;
_updateSubscription.ConnectionRestored += HandleConnectionRestored;
var startResult = await DoStartAsync().ConfigureAwait(false);
if (!startResult)
{
_ = subResult.Data.CloseAsync();
Status = SyncStatus.Disconnected;
return new CallResult(startResult.Error!);
}
Status = SyncStatus.Synced;
_logger.KlineTrackerStarted(SymbolName);
return CallResult.SuccessResult;
@ -208,22 +222,10 @@ namespace CryptoExchange.Net.Trackers.Klines
/// The start procedure needed for kline syncing, generally subscribing to an update stream and requesting the snapshot
/// </summary>
/// <returns></returns>
protected virtual async Task<CallResult<UpdateSubscription>> DoStartAsync()
protected virtual async Task<CallResult> DoStartAsync()
{
var subResult = await _socketClient.SubscribeToKlineUpdatesAsync(new SubscribeKlineRequest(Symbol, _interval),
update =>
{
AddOrUpdate(update.Data);
}).ConfigureAwait(false);
if (!subResult)
{
Status = SyncStatus.Disconnected;
return subResult;
}
if (!_startWithSnapshot)
return subResult;
return CallResult.SuccessResult;
var startTime = Period == null ? (DateTime?)null : DateTime.UtcNow.Add(-Period.Value);
if (_restClient.GetKlinesOptions.MaxAge != null && DateTime.UtcNow.Add(-_restClient.GetKlinesOptions.MaxAge.Value) > startTime)
@ -236,11 +238,7 @@ namespace CryptoExchange.Net.Trackers.Klines
await foreach (var result in ExchangeHelpers.ExecutePages(_restClient.GetKlinesAsync, request).ConfigureAwait(false))
{
if (!result)
{
_ = subResult.Data.CloseAsync();
Status = SyncStatus.Disconnected;
return subResult.AsError<UpdateSubscription>(result.Error!);
}
return result;
if (Limit != null && data.Count > Limit)
break;
@ -249,7 +247,7 @@ namespace CryptoExchange.Net.Trackers.Klines
}
SetInitialData(data);
return subResult;
return CallResult.SuccessResult;
}
/// <summary>

View File

@ -199,7 +199,12 @@ namespace CryptoExchange.Net.Trackers.Trades
_startWithSnapshot = startWithSnapshot;
Status = SyncStatus.Syncing;
_logger.TradeTrackerStarting(SymbolName);
var subResult = await DoStartAsync().ConfigureAwait(false);
var subResult = await _socketClient.SubscribeToTradeUpdatesAsync(new SubscribeTradeRequest(Symbol),
update =>
{
AddData(update.Data);
}).ConfigureAwait(false);
if (!subResult)
{
_logger.TradeTrackerStartFailed(SymbolName, subResult.Error!.Message, subResult.Error.Exception);
@ -211,6 +216,15 @@ namespace CryptoExchange.Net.Trackers.Trades
_updateSubscription.ConnectionLost += HandleConnectionLost;
_updateSubscription.ConnectionClosed += HandleConnectionClosed;
_updateSubscription.ConnectionRestored += HandleConnectionRestored;
var result = await DoStartAsync().ConfigureAwait(false);
if (!result)
{
_ = subResult.Data.CloseAsync();
Status = SyncStatus.Disconnected;
return result;
}
SetSyncStatus();
_logger.TradeTrackerStarted(SymbolName);
return CallResult.SuccessResult;
@ -231,22 +245,10 @@ namespace CryptoExchange.Net.Trackers.Trades
/// The start procedure needed for trade syncing, generally subscribing to an update stream and requesting the snapshot
/// </summary>
/// <returns></returns>
protected virtual async Task<CallResult<UpdateSubscription>> DoStartAsync()
protected virtual async Task<CallResult> DoStartAsync()
{
var subResult = await _socketClient.SubscribeToTradeUpdatesAsync(new SubscribeTradeRequest(Symbol),
update =>
{
AddData(update.Data);
}).ConfigureAwait(false);
if (!subResult)
{
Status = SyncStatus.Disconnected;
return subResult;
}
if (!_startWithSnapshot)
return subResult;
return CallResult.SuccessResult;
if (_historyRestClient != null)
{
@ -256,12 +258,8 @@ namespace CryptoExchange.Net.Trackers.Trades
await foreach(var result in ExchangeHelpers.ExecutePages(_historyRestClient.GetTradeHistoryAsync, request).ConfigureAwait(false))
{
if (!result)
{
_ = subResult.Data.CloseAsync();
Status = SyncStatus.Disconnected;
return subResult.AsError<UpdateSubscription>(result.Error!);
}
return result;
if (Limit != null && data.Count > Limit)
break;
@ -279,15 +277,13 @@ namespace CryptoExchange.Net.Trackers.Trades
var snapshot = await _recentRestClient.GetRecentTradesAsync(new GetRecentTradesRequest(Symbol, limit)).ConfigureAwait(false);
if (!snapshot)
{
_ = subResult.Data.CloseAsync();
Status = SyncStatus.Disconnected;
return subResult.AsError<UpdateSubscription>(snapshot.Error!);
return snapshot;
}
SetInitialData(snapshot.Data);
}
return subResult;
return CallResult.SuccessResult;
}
/// <summary>