1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 14:13:46 +00:00

Added keep alive for listenkeys to UserDataTracker

This commit is contained in:
JKorf 2026-02-08 17:02:09 +01:00
parent 87dd2d9d40
commit 8e18482781
3 changed files with 77 additions and 0 deletions

View File

@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace CryptoExchange.Net.Trackers.UserData namespace CryptoExchange.Net.Trackers.UserData
@ -22,6 +23,11 @@ namespace CryptoExchange.Net.Trackers.UserData
/// Listen key to use for subscriptions /// Listen key to use for subscriptions
/// </summary> /// </summary>
protected string? _listenKey; protected string? _listenKey;
/// <summary>
/// Cts
/// </summary>
protected CancellationTokenSource? _cts;
/// <summary> /// <summary>
/// List of data trackers /// List of data trackers
/// </summary> /// </summary>
@ -68,6 +74,8 @@ namespace CryptoExchange.Net.Trackers.UserData
/// </summary> /// </summary>
public async Task<CallResult> StartAsync() public async Task<CallResult> StartAsync()
{ {
_cts = new CancellationTokenSource();
foreach(var tracker in DataTrackers) foreach(var tracker in DataTrackers)
tracker.OnConnectedChange += (x) => OnConnectedChange?.Invoke(tracker.DataType, x); tracker.OnConnectedChange += (x) => OnConnectedChange?.Invoke(tracker.DataType, x);
@ -100,12 +108,21 @@ namespace CryptoExchange.Net.Trackers.UserData
public async Task StopAsync() public async Task StopAsync()
{ {
_logger.LogDebug("Stopping UserDataTracker"); _logger.LogDebug("Stopping UserDataTracker");
_cts?.Cancel();
var tasks = new List<Task>(); var tasks = new List<Task>();
foreach (var dataTracker in DataTrackers) foreach (var dataTracker in DataTrackers)
tasks.Add(dataTracker.StopAsync()); tasks.Add(dataTracker.StopAsync());
await DoStopAsync().ConfigureAwait(false);
await Task.WhenAll(tasks).ConfigureAwait(false); await Task.WhenAll(tasks).ConfigureAwait(false);
_logger.LogDebug("Stopped UserDataTracker"); _logger.LogDebug("Stopped UserDataTracker");
} }
/// <summary>
/// Stop implementation
/// </summary>
/// <returns></returns>
protected virtual Task DoStopAsync() => Task.CompletedTask;
} }
} }

View File

@ -20,6 +20,7 @@ namespace CryptoExchange.Net.Trackers.UserData
private readonly IFuturesSymbolRestClient _symbolClient; private readonly IFuturesSymbolRestClient _symbolClient;
private readonly IListenKeyRestClient? _listenKeyClient; private readonly IListenKeyRestClient? _listenKeyClient;
private readonly ExchangeParameters? _exchangeParameters; private readonly ExchangeParameters? _exchangeParameters;
private Task? _lkKeepAliveTask;
/// <inheritdoc /> /// <inheritdoc />
protected override UserDataItemTracker[] DataTrackers { get; } protected override UserDataItemTracker[] DataTrackers { get; }
@ -115,10 +116,38 @@ namespace CryptoExchange.Net.Trackers.UserData
return lkResult; return lkResult;
} }
_lkKeepAliveTask = KeepAliveListenKeyAsync();
_listenKey = lkResult.Data; _listenKey = lkResult.Data;
} }
return CallResult.SuccessResult; return CallResult.SuccessResult;
} }
/// <inheritdoc />
protected override async Task DoStopAsync()
{
if (_lkKeepAliveTask != null)
await _lkKeepAliveTask.ConfigureAwait(false);
}
private async Task KeepAliveListenKeyAsync()
{
var interval = TimeSpan.FromMinutes(30);
while (!_cts!.IsCancellationRequested)
{
try { await Task.Delay(interval, _cts.Token).ConfigureAwait(false); } catch (Exception)
{
break;
}
var result = await _listenKeyClient!.KeepAliveListenKeyAsync(new KeepAliveListenKeyRequest(_listenKey!, TradingMode.Spot)).ConfigureAwait(false);
if (!result)
_logger.LogWarning("Listen key keep alive failed: " + result.Error);
// If failed shorten the delay to allow a couple more retries
interval = result ? TimeSpan.FromMinutes(30) : TimeSpan.FromMinutes(5);
}
}
} }
} }

View File

@ -7,6 +7,7 @@ using System.Linq;
using CryptoExchange.Net.Trackers.UserData.Interfaces; using CryptoExchange.Net.Trackers.UserData.Interfaces;
using CryptoExchange.Net.Trackers.UserData.Objects; using CryptoExchange.Net.Trackers.UserData.Objects;
using CryptoExchange.Net.Trackers.UserData.ItemTrackers; using CryptoExchange.Net.Trackers.UserData.ItemTrackers;
using System;
namespace CryptoExchange.Net.Trackers.UserData namespace CryptoExchange.Net.Trackers.UserData
{ {
@ -18,6 +19,7 @@ namespace CryptoExchange.Net.Trackers.UserData
private readonly ISpotSymbolRestClient _symbolClient; private readonly ISpotSymbolRestClient _symbolClient;
private readonly IListenKeyRestClient? _listenKeyClient; private readonly IListenKeyRestClient? _listenKeyClient;
private readonly ExchangeParameters? _exchangeParameters; private readonly ExchangeParameters? _exchangeParameters;
private Task? _lkKeepAliveTask;
/// <inheritdoc /> /// <inheritdoc />
protected override UserDataItemTracker[] DataTrackers { get; } protected override UserDataItemTracker[] DataTrackers { get; }
@ -91,10 +93,39 @@ namespace CryptoExchange.Net.Trackers.UserData
return lkResult; return lkResult;
} }
_lkKeepAliveTask = KeepAliveListenKeyAsync();
_listenKey = lkResult.Data; _listenKey = lkResult.Data;
} }
return CallResult.SuccessResult; return CallResult.SuccessResult;
} }
/// <inheritdoc />
protected override async Task DoStopAsync()
{
if (_lkKeepAliveTask != null)
await _lkKeepAliveTask.ConfigureAwait(false);
}
private async Task KeepAliveListenKeyAsync()
{
var interval = TimeSpan.FromMinutes(30);
while (!_cts!.IsCancellationRequested)
{
try { await Task.Delay(interval, _cts.Token).ConfigureAwait(false); }
catch (Exception)
{
break;
}
var result = await _listenKeyClient!.KeepAliveListenKeyAsync(new KeepAliveListenKeyRequest(_listenKey!, TradingMode.Spot)).ConfigureAwait(false);
if (!result)
_logger.LogWarning("Listen key keep alive failed: " + result.Error);
// If failed shorten the delay to allow a couple more retries
interval = result ? TimeSpan.FromMinutes(30) : TimeSpan.FromMinutes(5);
}
}
} }
} }