1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-09 08:56:13 +00:00

Added request timeout, some refactoring

This commit is contained in:
JKorf 2018-11-27 13:32:20 +01:00
parent 009ae3a45d
commit 86674c83f5
10 changed files with 86 additions and 43 deletions

View File

@ -10,7 +10,7 @@ namespace CryptoExchange.Net.Interfaces
Uri Uri { get; } Uri Uri { get; }
WebHeaderCollection Headers { get; set; } WebHeaderCollection Headers { get; set; }
string Method { get; set; } string Method { get; set; }
int Timeout { get; set; }
void SetProxy(string host, int port); void SetProxy(string host, int port);
string ContentType { get; set; } string ContentType { get; set; }

View File

@ -50,6 +50,11 @@ namespace CryptoExchange.Net.Objects
/// What to do when a call would exceed the rate limit /// What to do when a call would exceed the rate limit
/// </summary> /// </summary>
public RateLimitingBehaviour RateLimitingBehaviour { get; set; } = RateLimitingBehaviour.Wait; public RateLimitingBehaviour RateLimitingBehaviour { get; set; } = RateLimitingBehaviour.Wait;
/// <summary>
/// The time the server has to respond to a request before timing out
/// </summary>
public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(30);
} }
public class SocketClientOptions: ExchangeOptions public class SocketClientOptions: ExchangeOptions

View File

@ -44,6 +44,12 @@ namespace CryptoExchange.Net.Requests
set => request.Method = value; set => request.Method = value;
} }
public int Timeout
{
get => request.Timeout;
set => request.Timeout = value;
}
public Uri Uri => request.RequestUri; public Uri Uri => request.RequestUri;
public void SetProxy(string host, int port) public void SetProxy(string host, int port)

View File

@ -27,6 +27,7 @@ namespace CryptoExchange.Net
public IRequestFactory RequestFactory { get; set; } = new RequestFactory(); public IRequestFactory RequestFactory { get; set; } = new RequestFactory();
protected RateLimitingBehaviour rateLimitBehaviour; protected RateLimitingBehaviour rateLimitBehaviour;
protected int requestTimeout;
protected PostParameters postParametersPosition = PostParameters.InBody; protected PostParameters postParametersPosition = PostParameters.InBody;
protected RequestBodyFormat requestBodyFormat = RequestBodyFormat.Json; protected RequestBodyFormat requestBodyFormat = RequestBodyFormat.Json;
@ -44,6 +45,7 @@ namespace CryptoExchange.Net
/// <param name="exchangeOptions">Options</param> /// <param name="exchangeOptions">Options</param>
protected void Configure(ClientOptions exchangeOptions) protected void Configure(ClientOptions exchangeOptions)
{ {
requestTimeout = (int)Math.Round(exchangeOptions.RequestTimeout.TotalMilliseconds, 0);
rateLimitBehaviour = exchangeOptions.RateLimitingBehaviour; rateLimitBehaviour = exchangeOptions.RateLimitingBehaviour;
rateLimiters = new List<IRateLimiter>(); rateLimiters = new List<IRateLimiter>();
foreach (var rateLimiter in exchangeOptions.RateLimiters) foreach (var rateLimiter in exchangeOptions.RateLimiters)
@ -246,6 +248,7 @@ namespace CryptoExchange.Net
var returnedData = ""; var returnedData = "";
try try
{ {
request.Timeout = requestTimeout;
var response = await request.GetResponse().ConfigureAwait(false); var response = await request.GetResponse().ConfigureAwait(false);
using (var reader = new StreamReader(response.GetResponseStream())) using (var reader = new StreamReader(response.GetResponseStream()))
{ {

View File

@ -20,7 +20,7 @@ namespace CryptoExchange.Net
/// <summary> /// <summary>
/// The factory for creating sockets. Used for unit testing /// The factory for creating sockets. Used for unit testing
/// </summary> /// </summary>
public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory(); public virtual IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
private const SslProtocols protocols = SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls; private const SslProtocols protocols = SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls;
@ -206,6 +206,7 @@ namespace CryptoExchange.Net
/// <returns></returns> /// <returns></returns>
public virtual async Task Unsubscribe(UpdateSubscription subscription) public virtual async Task Unsubscribe(UpdateSubscription subscription)
{ {
log.Write(LogVerbosity.Info, $"Closing subscription {subscription.Id}");
await subscription.Close(); await subscription.Close();
} }
@ -215,6 +216,7 @@ namespace CryptoExchange.Net
/// <returns></returns> /// <returns></returns>
public virtual async Task UnsubscribeAll() public virtual async Task UnsubscribeAll()
{ {
log.Write(LogVerbosity.Debug, $"Closing all {sockets.Count} subscriptions");
await Task.Run(() => await Task.Run(() =>
{ {
var tasks = new List<Task>(); var tasks = new List<Task>();
@ -226,10 +228,11 @@ namespace CryptoExchange.Net
public override void Dispose() public override void Dispose()
{ {
lock(sockets) log.Write(LogVerbosity.Debug, "Disposing socket client, closing all subscriptions");
foreach (var socket in sockets) lock (sockets)
socket.Socket.Dispose(); UnsubscribeAll().Wait();
sockets.Clear();
base.Dispose();
} }
} }
} }

View File

@ -28,6 +28,9 @@ namespace CryptoExchange.Net.Sockets
protected readonly List<Action> closeHandlers = new List<Action>(); protected readonly List<Action> closeHandlers = new List<Action>();
protected readonly List<Action<string>> messageHandlers = new List<Action<string>>(); protected readonly List<Action<string>> messageHandlers = new List<Action<string>>();
protected IDictionary<string, string> cookies;
protected IDictionary<string, string> headers;
public int Id { get; } public int Id { get; }
public DateTime? DisconnectTime { get; set; } public DateTime? DisconnectTime { get; set; }
public bool ShouldReconnect { get; set; } public bool ShouldReconnect { get; set; }
@ -67,16 +70,8 @@ namespace CryptoExchange.Net.Sockets
Id = NextStreamId(); Id = NextStreamId();
this.log = log; this.log = log;
Url = url; Url = url;
socket = new WebSocket(url, cookies: cookies.ToList(), customHeaderItems: headers.ToList()) this.cookies = cookies;
{ this.headers = headers;
EnableAutoSendPing = true,
AutoSendPingInterval = 10
};
socket.Opened += (o, s) => Handle(openHandlers);
socket.Closed += (o, s) => Handle(closeHandlers);
socket.Error += (o, s) => Handle(errorHandlers, s.Exception);
socket.MessageReceived += (o, s) => Handle(messageHandlers, s.Message);
socket.DataReceived += (o, s) => HandleByteData(s.Data);
} }
private void HandleByteData(byte[] data) private void HandleByteData(byte[] data)
@ -118,7 +113,7 @@ namespace CryptoExchange.Net.Sockets
handle?.Invoke(data); handle?.Invoke(data);
} }
public async Task Close() public virtual async Task Close()
{ {
await Task.Run(() => await Task.Run(() =>
{ {
@ -126,12 +121,12 @@ namespace CryptoExchange.Net.Sockets
{ {
if (socket == null || IsClosed) if (socket == null || IsClosed)
{ {
log.Write(LogVerbosity.Debug, $"Socket {Id} was already closed/disposed"); log?.Write(LogVerbosity.Debug, $"Socket {Id} was already closed/disposed");
return; return;
} }
var waitLock = new object(); var waitLock = new object();
log.Write(LogVerbosity.Debug, $"Socket {Id} closing"); log?.Write(LogVerbosity.Debug, $"Socket {Id} closing");
ManualResetEvent evnt = new ManualResetEvent(false); ManualResetEvent evnt = new ManualResetEvent(false);
var handler = new EventHandler((o, a) => var handler = new EventHandler((o, a) =>
{ {
@ -147,24 +142,38 @@ namespace CryptoExchange.Net.Sockets
evnt.Dispose(); evnt.Dispose();
evnt = null; evnt = null;
} }
log.Write(LogVerbosity.Debug, $"Socket {Id} closed"); log?.Write(LogVerbosity.Debug, $"Socket {Id} closed");
} }
}).ConfigureAwait(false); }).ConfigureAwait(false);
} }
public void Send(string data) public virtual void Send(string data)
{ {
socket.Send(data); socket.Send(data);
} }
public async Task<bool> Connect() public virtual async Task<bool> Connect()
{ {
if (socket == null)
{
socket = new WebSocket(Url, cookies: cookies.ToList(), customHeaderItems: headers.ToList())
{
EnableAutoSendPing = true,
AutoSendPingInterval = 10
};
socket.Opened += (o, s) => Handle(openHandlers);
socket.Closed += (o, s) => Handle(closeHandlers);
socket.Error += (o, s) => Handle(errorHandlers, s.Exception);
socket.MessageReceived += (o, s) => Handle(messageHandlers, s.Message);
socket.DataReceived += (o, s) => HandleByteData(s.Data);
}
return await Task.Run(() => return await Task.Run(() =>
{ {
bool connected; bool connected;
lock (socketLock) lock (socketLock)
{ {
log.Write(LogVerbosity.Debug, $"Socket {Id} connecting"); log?.Write(LogVerbosity.Debug, $"Socket {Id} connecting");
var waitLock = new object(); var waitLock = new object();
ManualResetEvent evnt = new ManualResetEvent(false); ManualResetEvent evnt = new ManualResetEvent(false);
var handler = new EventHandler((o, a) => var handler = new EventHandler((o, a) =>
@ -192,9 +201,9 @@ namespace CryptoExchange.Net.Sockets
} }
connected = socket.State == WebSocketState.Open; connected = socket.State == WebSocketState.Open;
if (connected) if (connected)
log.Write(LogVerbosity.Debug, $"Socket {Id} connected"); log?.Write(LogVerbosity.Debug, $"Socket {Id} connected");
else else
log.Write(LogVerbosity.Debug, $"Socket {Id} connection failed, state: " + socket.State); log?.Write(LogVerbosity.Debug, $"Socket {Id} connection failed, state: " + socket.State);
} }
if (socket.State == WebSocketState.Connecting) if (socket.State == WebSocketState.Connecting)
@ -204,12 +213,12 @@ namespace CryptoExchange.Net.Sockets
}).ConfigureAwait(false); }).ConfigureAwait(false);
} }
public void SetEnabledSslProtocols(SslProtocols protocols) public virtual void SetEnabledSslProtocols(SslProtocols protocols)
{ {
socket.Security.EnabledSslProtocols = protocols; socket.Security.EnabledSslProtocols = protocols;
} }
public void SetProxy(string host, int port) public virtual void SetProxy(string host, int port)
{ {
IPAddress address; IPAddress address;
socket.Proxy = IPAddress.TryParse(host, out address) socket.Proxy = IPAddress.TryParse(host, out address)
@ -222,7 +231,7 @@ namespace CryptoExchange.Net.Sockets
lock (socketLock) lock (socketLock)
{ {
if (socket != null) if (socket != null)
log.Write(LogVerbosity.Debug, $"Socket {Id} disposing websocket"); log?.Write(LogVerbosity.Debug, $"Socket {Id} disposing websocket");
socket?.Dispose(); socket?.Dispose();
socket = null; socket = null;

View File

@ -6,6 +6,7 @@ namespace CryptoExchange.Net.Sockets
public class SocketEvent public class SocketEvent
{ {
public string Name { get; set; } public string Name { get; set; }
public int WaitingId { get; set; }
private CallResult<bool> result; private CallResult<bool> result;
private ManualResetEvent setEvnt; private ManualResetEvent setEvnt;
@ -21,6 +22,7 @@ namespace CryptoExchange.Net.Sockets
{ {
this.result = new CallResult<bool>(result, error); this.result = new CallResult<bool>(result, error);
setEvnt.Set(); setEvnt.Set();
WaitingId = -1;
} }
public CallResult<bool> Wait(int timeout = 5000) public CallResult<bool> Wait(int timeout = 5000)

View File

@ -0,0 +1,10 @@
using Newtonsoft.Json;
namespace CryptoExchange.Net.Sockets
{
public class SocketRequest
{
[JsonIgnore]
public bool Signed { get; set; }
}
}

View File

@ -17,17 +17,17 @@ namespace CryptoExchange.Net.Sockets
public List<SocketEvent> Events { get; set; } public List<SocketEvent> Events { get; set; }
public IWebsocket Socket { get; set; } public IWebsocket Socket { get; set; }
public object Request { get; set; } public SocketRequest Request { get; set; }
private bool lostTriggered; private bool lostTriggered;
private Dictionary<int, SocketEvent> waitingForIds; private List<SocketEvent> waitingForEvents;
public SocketSubscription(IWebsocket socket) public SocketSubscription(IWebsocket socket)
{ {
Socket = socket; Socket = socket;
Events = new List<SocketEvent>(); Events = new List<SocketEvent>();
waitingForIds = new Dictionary<int, SocketEvent>(); waitingForEvents = new List<SocketEvent>();
DataHandlers = new List<Action<SocketSubscription, JToken>>(); DataHandlers = new List<Action<SocketSubscription, JToken>>();
@ -67,25 +67,27 @@ namespace CryptoExchange.Net.Sockets
public void SetEvent(int id, bool success, Error error) public void SetEvent(int id, bool success, Error error)
{ {
if (waitingForIds.ContainsKey(id)) var waitingEvent = waitingForEvents.SingleOrDefault(e => e.WaitingId == id);
if (waitingEvent != null)
{ {
waitingForIds[id].Set(success, error); waitingEvent.Set(success, error);
waitingForIds.Remove(id); waitingForEvents.Remove(waitingEvent);
} }
} }
public (int, SocketEvent) GetWaitingEvent(string name) public SocketEvent GetWaitingEvent(string name)
{ {
var result = waitingForIds.SingleOrDefault(w => w.Value.Name == name); return waitingForEvents.SingleOrDefault(w => w.Name == name);
if (result.Equals(default(KeyValuePair<int, SocketEvent>)))
return (0, null);
return (result.Key, result.Value);
} }
public CallResult<bool> WaitForEvent(string name, int timeout) public Task<CallResult<bool>> WaitForEvent(string name, int timeout)
{ {
return Events.Single(e => e.Name == name).Wait(timeout); return Task.Run(() =>
{
var evnt = Events.Single(e => e.Name == name);
waitingForEvents.Add(evnt);
return evnt.Wait(timeout);
});
} }
public Task<CallResult<bool>> WaitForEvent(string name, int id, int timeout) public Task<CallResult<bool>> WaitForEvent(string name, int id, int timeout)
@ -93,7 +95,8 @@ namespace CryptoExchange.Net.Sockets
return Task.Run(() => return Task.Run(() =>
{ {
var evnt = Events.Single(e => e.Name == name); var evnt = Events.Single(e => e.Name == name);
waitingForIds.Add(id, evnt); evnt.WaitingId = id;
waitingForEvents.Add(evnt);
return evnt.Wait(timeout); return evnt.Wait(timeout);
}); });
} }

View File

@ -25,6 +25,8 @@ namespace CryptoExchange.Net.Sockets
remove => subscription.ConnectionRestored -= value; remove => subscription.ConnectionRestored -= value;
} }
public int Id => subscription.Socket.Id;
public UpdateSubscription(SocketSubscription sub) public UpdateSubscription(SocketSubscription sub)
{ {
subscription = sub; subscription = sub;