1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Added support for retrieving a reconnection url when socket connection is lost

This commit is contained in:
JKorf 2022-07-17 12:49:13 +02:00
parent 7c8cbfa4e2
commit 465e9f04f4
6 changed files with 71 additions and 24 deletions

View File

@ -13,11 +13,15 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
public bool Connected { get; set; } public bool Connected { get; set; }
public event Action OnClose; public event Action OnClose;
#pragma warning disable 0067
public event Action OnReconnected; public event Action OnReconnected;
public event Action OnReconnecting; public event Action OnReconnecting;
#pragma warning restore 0067
public event Action<string> OnMessage; public event Action<string> OnMessage;
public event Action<Exception> OnError; public event Action<Exception> OnError;
public event Action OnOpen; public event Action OnOpen;
public Func<Task<Uri>> GetReconnectionUrl { get; set; }
public int Id { get; } public int Id { get; }
public bool ShouldReconnect { get; set; } public bool ShouldReconnect { get; set; }

View File

@ -543,6 +543,17 @@ namespace CryptoExchange.Net
return Task.FromResult(new CallResult<string?>(address)); return Task.FromResult(new CallResult<string?>(address));
} }
/// <summary>
/// Get the url to reconnect to after losing a connection
/// </summary>
/// <param name="apiClient"></param>
/// <param name="connection"></param>
/// <returns></returns>
public virtual Task<Uri?> GetReconnectUriAsync(SocketApiClient apiClient, SocketConnection connection)
{
return Task.FromResult<Uri?>(connection.ConnectionUri);
}
/// <summary> /// <summary>
/// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one. /// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one.
/// </summary> /// </summary>

View File

@ -1,4 +1,5 @@
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets;
using System; using System;
using System.Security.Authentication; using System.Security.Authentication;
using System.Text; using System.Text;
@ -35,6 +36,10 @@ namespace CryptoExchange.Net.Interfaces
/// Websocket has reconnected to the server /// Websocket has reconnected to the server
/// </summary> /// </summary>
event Action OnReconnected; event Action OnReconnected;
/// <summary>
/// Get reconntion url
/// </summary>
Func<Task<Uri?>> GetReconnectionUrl { get; set; }
/// <summary> /// <summary>
/// Unique id for this socket /// Unique id for this socket

View File

@ -33,7 +33,6 @@ namespace CryptoExchange.Net.Sockets
private readonly AsyncResetEvent _sendEvent; private readonly AsyncResetEvent _sendEvent;
private readonly ConcurrentQueue<byte[]> _sendBuffer; private readonly ConcurrentQueue<byte[]> _sendBuffer;
private readonly SemaphoreSlim _closeSem; private readonly SemaphoreSlim _closeSem;
private readonly WebSocketParameters _parameters;
private readonly List<DateTime> _outgoingMessages; private readonly List<DateTime> _outgoingMessages;
private ClientWebSocket _socket; private ClientWebSocket _socket;
@ -64,13 +63,16 @@ namespace CryptoExchange.Net.Sockets
/// <inheritdoc /> /// <inheritdoc />
public int Id { get; } public int Id { get; }
/// <inheritdoc />
public WebSocketParameters Parameters { get; }
/// <summary> /// <summary>
/// The timestamp this socket has been active for the last time /// The timestamp this socket has been active for the last time
/// </summary> /// </summary>
public DateTime LastActionTime { get; private set; } public DateTime LastActionTime { get; private set; }
/// <inheritdoc /> /// <inheritdoc />
public Uri Uri => _parameters.Uri; public Uri Uri => Parameters.Uri;
/// <inheritdoc /> /// <inheritdoc />
public bool IsClosed => _socket.State == WebSocketState.Closed; public bool IsClosed => _socket.State == WebSocketState.Closed;
@ -107,6 +109,8 @@ namespace CryptoExchange.Net.Sockets
public event Action? OnReconnecting; public event Action? OnReconnecting;
/// <inheritdoc /> /// <inheritdoc />
public event Action? OnReconnected; public event Action? OnReconnected;
/// <inheritdoc />
public Func<Task<Uri?>>? GetReconnectionUrl { get; set; }
/// <summary> /// <summary>
/// ctor /// ctor
@ -118,7 +122,7 @@ namespace CryptoExchange.Net.Sockets
Id = NextStreamId(); Id = NextStreamId();
_log = log; _log = log;
_parameters = websocketParameters; Parameters = websocketParameters;
_outgoingMessages = new List<DateTime>(); _outgoingMessages = new List<DateTime>();
_receivedMessages = new List<ReceiveItem>(); _receivedMessages = new List<ReceiveItem>();
_sendEvent = new AsyncResetEvent(); _sendEvent = new AsyncResetEvent();
@ -147,17 +151,17 @@ namespace CryptoExchange.Net.Sockets
private ClientWebSocket CreateSocket() private ClientWebSocket CreateSocket()
{ {
var cookieContainer = new CookieContainer(); var cookieContainer = new CookieContainer();
foreach (var cookie in _parameters.Cookies) foreach (var cookie in Parameters.Cookies)
cookieContainer.Add(new Cookie(cookie.Key, cookie.Value)); cookieContainer.Add(new Cookie(cookie.Key, cookie.Value));
var socket = new ClientWebSocket(); var socket = new ClientWebSocket();
socket.Options.Cookies = cookieContainer; socket.Options.Cookies = cookieContainer;
foreach (var header in _parameters.Headers) foreach (var header in Parameters.Headers)
socket.Options.SetRequestHeader(header.Key, header.Value); socket.Options.SetRequestHeader(header.Key, header.Value);
socket.Options.KeepAliveInterval = _parameters.KeepAliveInterval ?? TimeSpan.Zero; socket.Options.KeepAliveInterval = Parameters.KeepAliveInterval ?? TimeSpan.Zero;
socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
if (_parameters.Proxy != null) if (Parameters.Proxy != null)
SetProxy(_parameters.Proxy); SetProxy(Parameters.Proxy);
return socket; return socket;
} }
@ -188,7 +192,7 @@ namespace CryptoExchange.Net.Sockets
_processState = ProcessState.Processing; _processState = ProcessState.Processing;
var sendTask = SendLoopAsync(); var sendTask = SendLoopAsync();
var receiveTask = ReceiveLoopAsync(); var receiveTask = ReceiveLoopAsync();
var timeoutTask = _parameters.Timeout != null && _parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask; var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
_log.Write(LogLevel.Debug, $"Socket {Id} processing tasks finished"); _log.Write(LogLevel.Debug, $"Socket {Id} processing tasks finished");
@ -199,7 +203,7 @@ namespace CryptoExchange.Net.Sockets
await _closeTask.ConfigureAwait(false); await _closeTask.ConfigureAwait(false);
_closeTask = null; _closeTask = null;
if (!_parameters.AutoReconnect) if (!Parameters.AutoReconnect)
{ {
_processState = ProcessState.Idle; _processState = ProcessState.Idle;
OnClose?.Invoke(); OnClose?.Invoke();
@ -209,12 +213,23 @@ namespace CryptoExchange.Net.Sockets
if (!_stopRequested) if (!_stopRequested)
{ {
_processState = ProcessState.Reconnecting; _processState = ProcessState.Reconnecting;
OnReconnecting?.Invoke(); OnReconnecting?.Invoke();
} }
while (!_stopRequested) while (!_stopRequested)
{ {
_log.Write(LogLevel.Debug, $"Socket {Id} attempting to reconnect"); _log.Write(LogLevel.Debug, $"Socket {Id} attempting to reconnect");
var task = GetReconnectionUrl?.Invoke();
if (task != null)
{
var reconnectUri = await task.ConfigureAwait(false);
if (reconnectUri != null && Parameters.Uri != reconnectUri)
{
_log.Write(LogLevel.Debug, $"Socket {Id} reconnect URI set to {reconnectUri}");
Parameters.Uri = reconnectUri;
}
}
_socket = CreateSocket(); _socket = CreateSocket();
_ctsSource.Dispose(); _ctsSource.Dispose();
_ctsSource = new CancellationTokenSource(); _ctsSource = new CancellationTokenSource();
@ -223,7 +238,7 @@ namespace CryptoExchange.Net.Sockets
var connected = await ConnectInternalAsync().ConfigureAwait(false); var connected = await ConnectInternalAsync().ConfigureAwait(false);
if (!connected) if (!connected)
{ {
await Task.Delay(_parameters.ReconnectInterval).ConfigureAwait(false); await Task.Delay(Parameters.ReconnectInterval).ConfigureAwait(false);
continue; continue;
} }
@ -241,7 +256,7 @@ namespace CryptoExchange.Net.Sockets
if (_ctsSource.IsCancellationRequested) if (_ctsSource.IsCancellationRequested)
return; return;
var bytes = _parameters.Encoding.GetBytes(data); var bytes = Parameters.Encoding.GetBytes(data);
_log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer"); _log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer");
_sendBuffer.Enqueue(bytes); _sendBuffer.Enqueue(bytes);
_sendEvent.Set(); _sendEvent.Set();
@ -370,11 +385,11 @@ namespace CryptoExchange.Net.Sockets
while (_sendBuffer.TryDequeue(out var data)) while (_sendBuffer.TryDequeue(out var data))
{ {
if (_parameters.RatelimitPerSecond != null) if (Parameters.RatelimitPerSecond != null)
{ {
// Wait for rate limit // Wait for rate limit
DateTime? start = null; DateTime? start = null;
while (MessagesSentLastSecond() >= _parameters.RatelimitPerSecond) while (MessagesSentLastSecond() >= Parameters.RatelimitPerSecond)
{ {
start ??= DateTime.UtcNow; start ??= DateTime.UtcNow;
await Task.Delay(50).ConfigureAwait(false); await Task.Delay(50).ConfigureAwait(false);
@ -549,14 +564,14 @@ namespace CryptoExchange.Net.Sockets
string strData; string strData;
if (messageType == WebSocketMessageType.Binary) if (messageType == WebSocketMessageType.Binary)
{ {
if (_parameters.DataInterpreterBytes == null) if (Parameters.DataInterpreterBytes == null)
throw new Exception("Byte interpreter not set while receiving byte data"); throw new Exception("Byte interpreter not set while receiving byte data");
try try
{ {
var relevantData = new byte[count]; var relevantData = new byte[count];
Array.Copy(data, offset, relevantData, 0, count); Array.Copy(data, offset, relevantData, 0, count);
strData = _parameters.DataInterpreterBytes(relevantData); strData = Parameters.DataInterpreterBytes(relevantData);
} }
catch(Exception e) catch(Exception e)
{ {
@ -565,13 +580,13 @@ namespace CryptoExchange.Net.Sockets
} }
} }
else else
strData = _parameters.Encoding.GetString(data, offset, count); strData = Parameters.Encoding.GetString(data, offset, count);
if (_parameters.DataInterpreterString != null) if (Parameters.DataInterpreterString != null)
{ {
try try
{ {
strData = _parameters.DataInterpreterString(strData); strData = Parameters.DataInterpreterString(strData);
} }
catch(Exception e) catch(Exception e)
{ {
@ -633,7 +648,7 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
protected async Task CheckTimeoutAsync() protected async Task CheckTimeoutAsync()
{ {
_log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {_parameters.Timeout}"); _log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Parameters.Timeout}");
LastActionTime = DateTime.UtcNow; LastActionTime = DateTime.UtcNow;
try try
{ {
@ -642,9 +657,9 @@ namespace CryptoExchange.Net.Sockets
if (_ctsSource.IsCancellationRequested) if (_ctsSource.IsCancellationRequested)
return; return;
if (DateTime.UtcNow - LastActionTime > _parameters.Timeout) if (DateTime.UtcNow - LastActionTime > Parameters.Timeout)
{ {
_log.Write(LogLevel.Warning, $"Socket {Id} No data received for {_parameters.Timeout}, reconnecting socket"); _log.Write(LogLevel.Warning, $"Socket {Id} No data received for {Parameters.Timeout}, reconnecting socket");
_ = CloseAsync().ConfigureAwait(false); _ = CloseAsync().ConfigureAwait(false);
return; return;
} }

View File

@ -184,6 +184,7 @@ namespace CryptoExchange.Net.Sockets
_socket.OnReconnecting += HandleReconnecting; _socket.OnReconnecting += HandleReconnecting;
_socket.OnReconnected += HandleReconnected; _socket.OnReconnected += HandleReconnected;
_socket.OnError += HandleError; _socket.OnError += HandleError;
_socket.GetReconnectionUrl = GetReconnectionUrlAsync;
} }
/// <summary> /// <summary>
@ -223,7 +224,17 @@ namespace CryptoExchange.Net.Sockets
foreach (var sub in subscriptions) foreach (var sub in subscriptions)
sub.Confirmed = false; sub.Confirmed = false;
} }
Task.Run(() => ConnectionLost?.Invoke());
_ = Task.Run(() => ConnectionLost?.Invoke());
}
/// <summary>
/// Get the url to connect to when reconnecting
/// </summary>
/// <returns></returns>
protected virtual async Task<Uri?> GetReconnectionUrlAsync()
{
return await socketClient.GetReconnectUriAsync(ApiClient, this).ConfigureAwait(false);
} }
/// <summary> /// <summary>

View File

@ -2,6 +2,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
{ {