diff --git a/CryptoExchange.Net/Interfaces/IRequest.cs b/CryptoExchange.Net/Interfaces/IRequest.cs index ec5219b..33154e9 100644 --- a/CryptoExchange.Net/Interfaces/IRequest.cs +++ b/CryptoExchange.Net/Interfaces/IRequest.cs @@ -10,7 +10,7 @@ namespace CryptoExchange.Net.Interfaces Uri Uri { get; } WebHeaderCollection Headers { get; set; } string Method { get; set; } - + int Timeout { get; set; } void SetProxy(string host, int port); string ContentType { get; set; } diff --git a/CryptoExchange.Net/Objects/ExchangeOptions.cs b/CryptoExchange.Net/Objects/ExchangeOptions.cs index 44564fe..a5450e1 100644 --- a/CryptoExchange.Net/Objects/ExchangeOptions.cs +++ b/CryptoExchange.Net/Objects/ExchangeOptions.cs @@ -50,6 +50,11 @@ namespace CryptoExchange.Net.Objects /// What to do when a call would exceed the rate limit /// public RateLimitingBehaviour RateLimitingBehaviour { get; set; } = RateLimitingBehaviour.Wait; + + /// + /// The time the server has to respond to a request before timing out + /// + public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(30); } public class SocketClientOptions: ExchangeOptions diff --git a/CryptoExchange.Net/Requests/Request.cs b/CryptoExchange.Net/Requests/Request.cs index e2471a3..564fc77 100644 --- a/CryptoExchange.Net/Requests/Request.cs +++ b/CryptoExchange.Net/Requests/Request.cs @@ -44,6 +44,12 @@ namespace CryptoExchange.Net.Requests set => request.Method = value; } + public int Timeout + { + get => request.Timeout; + set => request.Timeout = value; + } + public Uri Uri => request.RequestUri; public void SetProxy(string host, int port) diff --git a/CryptoExchange.Net/RestClient.cs b/CryptoExchange.Net/RestClient.cs index 3a8a46d..f8733c5 100644 --- a/CryptoExchange.Net/RestClient.cs +++ b/CryptoExchange.Net/RestClient.cs @@ -27,6 +27,7 @@ namespace CryptoExchange.Net public IRequestFactory RequestFactory { get; set; } = new RequestFactory(); protected RateLimitingBehaviour rateLimitBehaviour; + protected int requestTimeout; protected PostParameters postParametersPosition = PostParameters.InBody; protected RequestBodyFormat requestBodyFormat = RequestBodyFormat.Json; @@ -44,6 +45,7 @@ namespace CryptoExchange.Net /// Options protected void Configure(ClientOptions exchangeOptions) { + requestTimeout = (int)Math.Round(exchangeOptions.RequestTimeout.TotalMilliseconds, 0); rateLimitBehaviour = exchangeOptions.RateLimitingBehaviour; rateLimiters = new List(); foreach (var rateLimiter in exchangeOptions.RateLimiters) @@ -246,6 +248,7 @@ namespace CryptoExchange.Net var returnedData = ""; try { + request.Timeout = requestTimeout; var response = await request.GetResponse().ConfigureAwait(false); using (var reader = new StreamReader(response.GetResponseStream())) { diff --git a/CryptoExchange.Net/SocketClient.cs b/CryptoExchange.Net/SocketClient.cs index fe9ffb2..0184bb6 100644 --- a/CryptoExchange.Net/SocketClient.cs +++ b/CryptoExchange.Net/SocketClient.cs @@ -20,7 +20,7 @@ namespace CryptoExchange.Net /// /// The factory for creating sockets. Used for unit testing /// - public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory(); + public virtual IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory(); private const SslProtocols protocols = SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls; @@ -206,6 +206,7 @@ namespace CryptoExchange.Net /// public virtual async Task Unsubscribe(UpdateSubscription subscription) { + log.Write(LogVerbosity.Info, $"Closing subscription {subscription.Id}"); await subscription.Close(); } @@ -215,6 +216,7 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAll() { + log.Write(LogVerbosity.Debug, $"Closing all {sockets.Count} subscriptions"); await Task.Run(() => { var tasks = new List(); @@ -226,10 +228,11 @@ namespace CryptoExchange.Net public override void Dispose() { - lock(sockets) - foreach (var socket in sockets) - socket.Socket.Dispose(); - sockets.Clear(); + log.Write(LogVerbosity.Debug, "Disposing socket client, closing all subscriptions"); + lock (sockets) + UnsubscribeAll().Wait(); + + base.Dispose(); } } } diff --git a/CryptoExchange.Net/Sockets/BaseSocket.cs b/CryptoExchange.Net/Sockets/BaseSocket.cs index 6bbd4ea..79084aa 100644 --- a/CryptoExchange.Net/Sockets/BaseSocket.cs +++ b/CryptoExchange.Net/Sockets/BaseSocket.cs @@ -28,6 +28,9 @@ namespace CryptoExchange.Net.Sockets protected readonly List closeHandlers = new List(); protected readonly List> messageHandlers = new List>(); + protected IDictionary cookies; + protected IDictionary headers; + public int Id { get; } public DateTime? DisconnectTime { get; set; } public bool ShouldReconnect { get; set; } @@ -67,16 +70,8 @@ namespace CryptoExchange.Net.Sockets Id = NextStreamId(); this.log = log; Url = url; - socket = new WebSocket(url, cookies: cookies.ToList(), customHeaderItems: headers.ToList()) - { - EnableAutoSendPing = true, - AutoSendPingInterval = 10 - }; - socket.Opened += (o, s) => Handle(openHandlers); - socket.Closed += (o, s) => Handle(closeHandlers); - socket.Error += (o, s) => Handle(errorHandlers, s.Exception); - socket.MessageReceived += (o, s) => Handle(messageHandlers, s.Message); - socket.DataReceived += (o, s) => HandleByteData(s.Data); + this.cookies = cookies; + this.headers = headers; } private void HandleByteData(byte[] data) @@ -118,7 +113,7 @@ namespace CryptoExchange.Net.Sockets handle?.Invoke(data); } - public async Task Close() + public virtual async Task Close() { await Task.Run(() => { @@ -126,12 +121,12 @@ namespace CryptoExchange.Net.Sockets { if (socket == null || IsClosed) { - log.Write(LogVerbosity.Debug, $"Socket {Id} was already closed/disposed"); + log?.Write(LogVerbosity.Debug, $"Socket {Id} was already closed/disposed"); return; } var waitLock = new object(); - log.Write(LogVerbosity.Debug, $"Socket {Id} closing"); + log?.Write(LogVerbosity.Debug, $"Socket {Id} closing"); ManualResetEvent evnt = new ManualResetEvent(false); var handler = new EventHandler((o, a) => { @@ -147,24 +142,38 @@ namespace CryptoExchange.Net.Sockets evnt.Dispose(); evnt = null; } - log.Write(LogVerbosity.Debug, $"Socket {Id} closed"); + log?.Write(LogVerbosity.Debug, $"Socket {Id} closed"); } }).ConfigureAwait(false); } - public void Send(string data) + public virtual void Send(string data) { socket.Send(data); } - public async Task Connect() + public virtual async Task Connect() { + if (socket == null) + { + socket = new WebSocket(Url, cookies: cookies.ToList(), customHeaderItems: headers.ToList()) + { + EnableAutoSendPing = true, + AutoSendPingInterval = 10 + }; + socket.Opened += (o, s) => Handle(openHandlers); + socket.Closed += (o, s) => Handle(closeHandlers); + socket.Error += (o, s) => Handle(errorHandlers, s.Exception); + socket.MessageReceived += (o, s) => Handle(messageHandlers, s.Message); + socket.DataReceived += (o, s) => HandleByteData(s.Data); + } + return await Task.Run(() => { bool connected; lock (socketLock) { - log.Write(LogVerbosity.Debug, $"Socket {Id} connecting"); + log?.Write(LogVerbosity.Debug, $"Socket {Id} connecting"); var waitLock = new object(); ManualResetEvent evnt = new ManualResetEvent(false); var handler = new EventHandler((o, a) => @@ -192,9 +201,9 @@ namespace CryptoExchange.Net.Sockets } connected = socket.State == WebSocketState.Open; if (connected) - log.Write(LogVerbosity.Debug, $"Socket {Id} connected"); + log?.Write(LogVerbosity.Debug, $"Socket {Id} connected"); else - log.Write(LogVerbosity.Debug, $"Socket {Id} connection failed, state: " + socket.State); + log?.Write(LogVerbosity.Debug, $"Socket {Id} connection failed, state: " + socket.State); } if (socket.State == WebSocketState.Connecting) @@ -204,12 +213,12 @@ namespace CryptoExchange.Net.Sockets }).ConfigureAwait(false); } - public void SetEnabledSslProtocols(SslProtocols protocols) + public virtual void SetEnabledSslProtocols(SslProtocols protocols) { socket.Security.EnabledSslProtocols = protocols; } - public void SetProxy(string host, int port) + public virtual void SetProxy(string host, int port) { IPAddress address; socket.Proxy = IPAddress.TryParse(host, out address) @@ -222,7 +231,7 @@ namespace CryptoExchange.Net.Sockets lock (socketLock) { if (socket != null) - log.Write(LogVerbosity.Debug, $"Socket {Id} disposing websocket"); + log?.Write(LogVerbosity.Debug, $"Socket {Id} disposing websocket"); socket?.Dispose(); socket = null; diff --git a/CryptoExchange.Net/Sockets/SocketEvent.cs b/CryptoExchange.Net/Sockets/SocketEvent.cs index eb8a650..79a4788 100644 --- a/CryptoExchange.Net/Sockets/SocketEvent.cs +++ b/CryptoExchange.Net/Sockets/SocketEvent.cs @@ -6,6 +6,7 @@ namespace CryptoExchange.Net.Sockets public class SocketEvent { public string Name { get; set; } + public int WaitingId { get; set; } private CallResult result; private ManualResetEvent setEvnt; @@ -21,6 +22,7 @@ namespace CryptoExchange.Net.Sockets { this.result = new CallResult(result, error); setEvnt.Set(); + WaitingId = -1; } public CallResult Wait(int timeout = 5000) diff --git a/CryptoExchange.Net/Sockets/SocketRequest.cs b/CryptoExchange.Net/Sockets/SocketRequest.cs new file mode 100644 index 0000000..2a823a6 --- /dev/null +++ b/CryptoExchange.Net/Sockets/SocketRequest.cs @@ -0,0 +1,10 @@ +using Newtonsoft.Json; + +namespace CryptoExchange.Net.Sockets +{ + public class SocketRequest + { + [JsonIgnore] + public bool Signed { get; set; } + } +} diff --git a/CryptoExchange.Net/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Sockets/SocketSubscription.cs index 747f42c..490db1b 100644 --- a/CryptoExchange.Net/Sockets/SocketSubscription.cs +++ b/CryptoExchange.Net/Sockets/SocketSubscription.cs @@ -17,17 +17,17 @@ namespace CryptoExchange.Net.Sockets public List Events { get; set; } public IWebsocket Socket { get; set; } - public object Request { get; set; } + public SocketRequest Request { get; set; } private bool lostTriggered; - private Dictionary waitingForIds; + private List waitingForEvents; public SocketSubscription(IWebsocket socket) { Socket = socket; Events = new List(); - waitingForIds = new Dictionary(); + waitingForEvents = new List(); DataHandlers = new List>(); @@ -67,25 +67,27 @@ namespace CryptoExchange.Net.Sockets public void SetEvent(int id, bool success, Error error) { - if (waitingForIds.ContainsKey(id)) + var waitingEvent = waitingForEvents.SingleOrDefault(e => e.WaitingId == id); + if (waitingEvent != null) { - waitingForIds[id].Set(success, error); - waitingForIds.Remove(id); + waitingEvent.Set(success, error); + waitingForEvents.Remove(waitingEvent); } } - public (int, SocketEvent) GetWaitingEvent(string name) + public SocketEvent GetWaitingEvent(string name) { - var result = waitingForIds.SingleOrDefault(w => w.Value.Name == name); - if (result.Equals(default(KeyValuePair))) - return (0, null); - - return (result.Key, result.Value); + return waitingForEvents.SingleOrDefault(w => w.Name == name); } - public CallResult WaitForEvent(string name, int timeout) + public Task> WaitForEvent(string name, int timeout) { - return Events.Single(e => e.Name == name).Wait(timeout); + return Task.Run(() => + { + var evnt = Events.Single(e => e.Name == name); + waitingForEvents.Add(evnt); + return evnt.Wait(timeout); + }); } public Task> WaitForEvent(string name, int id, int timeout) @@ -93,7 +95,8 @@ namespace CryptoExchange.Net.Sockets return Task.Run(() => { var evnt = Events.Single(e => e.Name == name); - waitingForIds.Add(id, evnt); + evnt.WaitingId = id; + waitingForEvents.Add(evnt); return evnt.Wait(timeout); }); } diff --git a/CryptoExchange.Net/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Sockets/UpdateSubscription.cs index e858d34..8c7ce19 100644 --- a/CryptoExchange.Net/Sockets/UpdateSubscription.cs +++ b/CryptoExchange.Net/Sockets/UpdateSubscription.cs @@ -25,6 +25,8 @@ namespace CryptoExchange.Net.Sockets remove => subscription.ConnectionRestored -= value; } + public int Id => subscription.Socket.Id; + public UpdateSubscription(SocketSubscription sub) { subscription = sub;