1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Added optional delay after socket connection, added callback when reconnected socket to revitalize original request, fixed proxy setting socket

This commit is contained in:
JKorf 2022-11-13 19:47:33 +01:00
parent 3365837338
commit ad614830d1
5 changed files with 41 additions and 16 deletions

View File

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using CryptoExchange.Net.Authentication; using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
@ -101,16 +102,10 @@ namespace CryptoExchange.Net
/// </summary> /// </summary>
public string GetSubscriptionsState() public string GetSubscriptionsState()
{ {
//var sb = new StringBuilder(); var result = new StringBuilder();
//sb.AppendLine($"{socketConnections.Count} connections, {CurrentSubscriptions} subscriptions, kbps: {IncomingKbps}"); foreach(var client in ApiClients.OfType<SocketApiClient>())
//foreach(var connection in socketConnections) result.AppendLine(client.GetSubscriptionsState());
//{ return result.ToString();
// sb.AppendLine($" Connection {connection.Key}: {connection.Value.SubscriptionCount} subscriptions, status: {connection.Value.Status}, authenticated: {connection.Value.Authenticated}, kbps: {connection.Value.IncomingKbps}");
// foreach (var subscription in connection.Value.Subscriptions)
// sb.AppendLine($" Subscription {subscription.Id}, authenticated: {subscription.Authenticated}, confirmed: {subscription.Confirmed}");
//}
//return sb.ToString();
return "";
} }
} }
} }

View File

@ -378,6 +378,9 @@ namespace CryptoExchange.Net
if (!connectResult) if (!connectResult)
return new CallResult<bool>(connectResult.Error!); return new CallResult<bool>(connectResult.Error!);
if (Options.DelayAfterConnect != TimeSpan.Zero)
await Task.Delay(Options.DelayAfterConnect).ConfigureAwait(false);
if (!authenticated || socket.Authenticated) if (!authenticated || socket.Authenticated)
return new CallResult<bool>(true); return new CallResult<bool>(true);
@ -541,6 +544,16 @@ namespace CryptoExchange.Net
return Task.FromResult<Uri?>(connection.ConnectionUri); return Task.FromResult<Uri?>(connection.ConnectionUri);
} }
/// <summary>
/// Update the original request to send when the connection is restored after disconnecting. Can be used to update an authentication token for example.
/// </summary>
/// <param name="request">The original request</param>
/// <returns></returns>
public virtual Task<CallResult<object>> RevitalizeRequestAsync(object request)
{
return Task.FromResult(new CallResult<object>(request));
}
/// <summary> /// <summary>
/// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one. /// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one.
/// </summary> /// </summary>

View File

@ -268,6 +268,11 @@ namespace CryptoExchange.Net.Objects
/// </summary> /// </summary>
public int? MaxSocketConnections { get; set; } public int? MaxSocketConnections { get; set; }
/// <summary>
/// The time to wait after connecting a socket before sending messages. Can be used for API's which will rate limit if you subscribe directly after connecting.
/// </summary>
public TimeSpan DelayAfterConnect = TimeSpan.Zero;
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>

View File

@ -165,7 +165,7 @@ namespace CryptoExchange.Net.Sockets
socket.Options.KeepAliveInterval = Parameters.KeepAliveInterval ?? TimeSpan.Zero; socket.Options.KeepAliveInterval = Parameters.KeepAliveInterval ?? TimeSpan.Zero;
socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework socket.Options.SetBuffer(65536, 65536); // Setting it to anything bigger than 65536 throws an exception in .net framework
if (Parameters.Proxy != null) if (Parameters.Proxy != null)
SetProxy(Parameters.Proxy); SetProxy(socket, Parameters.Proxy);
} }
catch (PlatformNotSupportedException) catch (PlatformNotSupportedException)
{ {
@ -739,22 +739,23 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Set proxy on socket /// Set proxy on socket
/// </summary> /// </summary>
/// <param name="socket"></param>
/// <param name="proxy"></param> /// <param name="proxy"></param>
/// <exception cref="ArgumentException"></exception> /// <exception cref="ArgumentException"></exception>
protected virtual void SetProxy(ApiProxy proxy) protected virtual void SetProxy(ClientWebSocket socket, ApiProxy proxy)
{ {
if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri)) if (!Uri.TryCreate($"{proxy.Host}:{proxy.Port}", UriKind.Absolute, out var uri))
throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy)); throw new ArgumentException("Proxy settings invalid, {proxy.Host}:{proxy.Port} not a valid URI", nameof(proxy));
_socket.Options.Proxy = uri?.Scheme == null socket.Options.Proxy = uri?.Scheme == null
? _socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port) ? socket.Options.Proxy = new WebProxy(proxy.Host, proxy.Port)
: _socket.Options.Proxy = new WebProxy : socket.Options.Proxy = new WebProxy
{ {
Address = uri Address = uri
}; };
if (proxy.Login != null) if (proxy.Login != null)
_socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password); socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
} }
} }

View File

@ -253,6 +253,7 @@ namespace CryptoExchange.Net.Sockets
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false); var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
if (!reconnectSuccessful) if (!reconnectSuccessful)
{ {
_log.Write(LogLevel.Warning, "Failed reconnect processing, reconnecting again");
await _socket.ReconnectAsync().ConfigureAwait(false); await _socket.ReconnectAsync().ConfigureAwait(false);
} }
else else
@ -637,6 +638,16 @@ namespace CryptoExchange.Net.Sockets
} }
} }
foreach(var subscription in subscriptionList.Where(s => s.Request != null))
{
var result = await ApiClient.RevitalizeRequestAsync(subscription.Request!).ConfigureAwait(false);
if (!result)
{
_log.Write(LogLevel.Warning, "Failed request revitalization: " + result.Error);
return result.As<bool>(false);
}
}
// Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe // Foreach subscription which is subscribed by a subscription request we will need to resend that request to resubscribe
for (var i = 0; i < subscriptionList.Count; i += ApiClient.Options.MaxConcurrentResubscriptionsPerSocket) for (var i = 0; i < subscriptionList.Count; i += ApiClient.Options.MaxConcurrentResubscriptionsPerSocket)
{ {