diff --git a/CryptoExchange.Net/BaseClient.cs b/CryptoExchange.Net/BaseClient.cs index 325bc73..6f4132c 100644 --- a/CryptoExchange.Net/BaseClient.cs +++ b/CryptoExchange.Net/BaseClient.cs @@ -60,13 +60,18 @@ namespace CryptoExchange.Net } protected CallResult Deserialize(string data, bool checkObject = true, JsonSerializer serializer = null) where T : class + { + var obj = JToken.Parse(data); + return Deserialize(obj, checkObject, serializer); + } + + protected CallResult Deserialize(JToken obj, bool checkObject = true, JsonSerializer serializer = null) where T : class { if (serializer == null) serializer = defaultSerializer; try { - var obj = JToken.Parse(data); if (checkObject && log.Level == LogVerbosity.Debug) { try @@ -92,19 +97,19 @@ namespace CryptoExchange.Net } catch (JsonReaderException jre) { - var info = $"Deserialize JsonReaderException: {jre.Message}, Path: {jre.Path}, LineNumber: {jre.LineNumber}, LinePosition: {jre.LinePosition}. Received data: {data}"; + var info = $"Deserialize JsonReaderException: {jre.Message}, Path: {jre.Path}, LineNumber: {jre.LineNumber}, LinePosition: {jre.LinePosition}. Received data: {obj.ToString()}"; log.Write(LogVerbosity.Error, info); return new CallResult(null, new DeserializeError(info)); } catch (JsonSerializationException jse) { - var info = $"Deserialize JsonSerializationException: {jse.Message}. Received data: {data}"; + var info = $"Deserialize JsonSerializationException: {jse.Message}. Received data: {obj.ToString()}"; log.Write(LogVerbosity.Error, info); return new CallResult(null, new DeserializeError(info)); } catch (Exception ex) { - var info = $"Deserialize Unknown Exception: {ex.Message}. Received data: {data}"; + var info = $"Deserialize Unknown Exception: {ex.Message}. Received data: {obj.ToString()}"; log.Write(LogVerbosity.Error, info); return new CallResult(null, new DeserializeError(info)); } @@ -139,7 +144,7 @@ namespace CryptoExchange.Net } foreach (var token in obj) { - var d = properties.SingleOrDefault(p => p == token.Key); + var d = properties.FirstOrDefault(p => p == token.Key); if (d == null) { d = properties.SingleOrDefault(p => p.ToLower() == token.Key.ToLower()); diff --git a/CryptoExchange.Net/RestClient.cs b/CryptoExchange.Net/RestClient.cs index 0136930..55565aa 100644 --- a/CryptoExchange.Net/RestClient.cs +++ b/CryptoExchange.Net/RestClient.cs @@ -103,7 +103,7 @@ namespace CryptoExchange.Net return new CallResult(0, new CantConnectError() { Message = "Ping failed: " + reply.Status }); } - protected virtual async Task> ExecuteRequest(Uri uri, string method = Constants.GetMethod, Dictionary parameters = null, bool signed = false) where T : class + protected virtual async Task> ExecuteRequest(Uri uri, string method = Constants.GetMethod, Dictionary parameters = null, bool signed = false, bool checkResult = true) where T : class { log.Write(LogVerbosity.Debug, $"Creating request for " + uri); if (signed && authProvider == null) @@ -146,7 +146,7 @@ namespace CryptoExchange.Net log.Write(LogVerbosity.Debug, $"Sending {method}{(signed ? " signed" : "")} request to {request.Uri} {(paramString ?? "")}"); var result = await ExecuteRequest(request).ConfigureAwait(false); - return result.Error != null ? new CallResult(null, result.Error) : Deserialize(result.Data); + return result.Error != null ? new CallResult(null, result.Error) : Deserialize(result.Data, checkResult); } protected virtual IRequest ConstructRequest(Uri uri, string method, Dictionary parameters, bool signed) diff --git a/CryptoExchange.Net/SocketClient.cs b/CryptoExchange.Net/SocketClient.cs index 3a78224..c79efc3 100644 --- a/CryptoExchange.Net/SocketClient.cs +++ b/CryptoExchange.Net/SocketClient.cs @@ -63,7 +63,8 @@ namespace CryptoExchange.Net socket.DataInterpreter = dataInterpreter; socket.OnClose += () => { - socket.DisconnectTime = DateTime.UtcNow; + if(socket.DisconnectTime == null) + socket.DisconnectTime = DateTime.UtcNow; SocketOnClose(socket); }; socket.OnError += (e) => @@ -73,7 +74,6 @@ namespace CryptoExchange.Net }; socket.OnOpen += () => { - socket.ShouldReconnect = true; SocketOpened(socket); }; return socket; @@ -84,18 +84,26 @@ namespace CryptoExchange.Net protected virtual void SocketError(IWebsocket socket, Exception ex) { } protected abstract bool SocketReconnect(SocketSubscription socket, TimeSpan disconnectedTime); - protected virtual CallResult ConnectSocket(IWebsocket socket) + protected virtual async Task> ConnectSocket(SocketSubscription socketSubscription) { - if (socket.Connect().Result) + socketSubscription.Socket.OnMessage += data => ProcessMessage(socketSubscription, data); + + if (await socketSubscription.Socket.Connect()) { - var subscription = new SocketSubscription(socket); lock (sockets) - sockets.Add(subscription); - return new CallResult(subscription, null); + sockets.Add(socketSubscription); + return new CallResult(true, null); } - socket.Dispose(); - return new CallResult(null, new CantConnectError()); + socketSubscription.Socket.Dispose(); + return new CallResult(false, new CantConnectError()); + } + + protected virtual void ProcessMessage(SocketSubscription sub, string data) + { + log.Write(LogVerbosity.Debug, $"Socket {sub.Socket.Id} received data: " + data); + foreach (var handler in sub.DataHandlers) + handler(sub, JToken.Parse(data)); } protected virtual void SocketOnClose(IWebsocket socket) @@ -120,6 +128,7 @@ namespace CryptoExchange.Net if (!SocketReconnect(subscription, DateTime.UtcNow - socket.DisconnectTime.Value)) socket.Close().Wait(); // Close so we end up reconnecting again + socket.DisconnectTime = null; return; }); } @@ -146,35 +155,6 @@ namespace CryptoExchange.Net socket.Send(data); } - protected virtual async Task> SendAndWait(IWebsocket socket, T obj, Func waitingFor, int timeout=5000) - { - return await Task.Run(() => - { - var data = JsonConvert.SerializeObject(obj); - ManualResetEvent evnt = new ManualResetEvent(false); - string result = null; - var onMessageAction = new Action((msg) => - { - if (!waitingFor(JToken.Parse(msg))) - return; - - log.Write(LogVerbosity.Debug, "Socket received query response: " + msg); - result = msg; - evnt?.Set(); - }); - - socket.OnMessage += onMessageAction; - Send(socket, data); - evnt.WaitOne(timeout); - socket.OnMessage -= onMessageAction; - evnt.Dispose(); - evnt = null; - if (result == null) - return new CallResult(null, new ServerError("No response from server")); - return new CallResult(result, null); - }).ConfigureAwait(false); - } - public override void Dispose() { lock(sockets) diff --git a/CryptoExchange.Net/Sockets/SocketEvent.cs b/CryptoExchange.Net/Sockets/SocketEvent.cs new file mode 100644 index 0000000..eb8a650 --- /dev/null +++ b/CryptoExchange.Net/Sockets/SocketEvent.cs @@ -0,0 +1,38 @@ +using CryptoExchange.Net.Objects; +using System.Threading; + +namespace CryptoExchange.Net.Sockets +{ + public class SocketEvent + { + public string Name { get; set; } + + private CallResult result; + private ManualResetEvent setEvnt; + + public SocketEvent(string name) + { + Name = name; + setEvnt = new ManualResetEvent(false); + result = new CallResult(false, new UnknownError("No response received")); + } + + public void Set(bool result, Error error) + { + this.result = new CallResult(result, error); + setEvnt.Set(); + } + + public CallResult Wait(int timeout = 5000) + { + setEvnt.WaitOne(timeout); + return result; + } + + public void Reset() + { + setEvnt.Reset(); + result = new CallResult(false, new UnknownError("No response received")); + } + } +} diff --git a/CryptoExchange.Net/Sockets/SocketSubscription.cs b/CryptoExchange.Net/Sockets/SocketSubscription.cs index 16668c4..b6364ac 100644 --- a/CryptoExchange.Net/Sockets/SocketSubscription.cs +++ b/CryptoExchange.Net/Sockets/SocketSubscription.cs @@ -1,14 +1,20 @@ using CryptoExchange.Net.Interfaces; +using CryptoExchange.Net.Objects; +using Newtonsoft.Json.Linq; using System; using System.Collections.Generic; -using System.Text; +using System.Linq; +using System.Threading.Tasks; namespace CryptoExchange.Net.Sockets { public class SocketSubscription { public event Action ConnectionLost; - public event Action ConnectionRestored; + public event Action ConnectionRestored; + + public List> DataHandlers { get; set; } + public List Events { get; set; } public IWebsocket Socket { get; set; } public object Request { get; set; } @@ -18,6 +24,9 @@ namespace CryptoExchange.Net.Sockets public SocketSubscription(IWebsocket socket) { Socket = socket; + Events = new List(); + + DataHandlers = new List>(); Socket.OnClose += () => { @@ -25,6 +34,10 @@ namespace CryptoExchange.Net.Sockets return; lostTriggered = true; + + foreach (var events in Events) + events.Reset(); + if (Socket.ShouldReconnect) ConnectionLost?.Invoke(); }; @@ -32,8 +45,30 @@ namespace CryptoExchange.Net.Sockets { lostTriggered = false; if (Socket.DisconnectTime != null) - ConnectionRestored?.Invoke(); + ConnectionRestored?.Invoke(DateTime.UtcNow - Socket.DisconnectTime.Value); }; } + + public void AddEvent(string name) + { + Events.Add(new SocketEvent(name)); + } + + public void SetEvent(string name, bool success, Error error) + { + Events.SingleOrDefault(e => e.Name == name)?.Set(success, error); + } + + public CallResult WaitForEvent(string name) + { + return Events.Single(e => e.Name == name).Wait(); + } + + public async Task Close() + { + Socket.ShouldReconnect = false; + await Socket.Close(); + Socket.Dispose(); + } } } diff --git a/CryptoExchange.Net/Sockets/UpdateSubscription.cs b/CryptoExchange.Net/Sockets/UpdateSubscription.cs new file mode 100644 index 0000000..e858d34 --- /dev/null +++ b/CryptoExchange.Net/Sockets/UpdateSubscription.cs @@ -0,0 +1,42 @@ +using System; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Sockets +{ + public class UpdateSubscription + { + private SocketSubscription subscription; + + /// + /// Event when the connection is lost + /// + public event Action ConnectionLost + { + add => subscription.ConnectionLost += value; + remove => subscription.ConnectionLost -= value; + } + + /// + /// Event when the connection is restored. Timespan parameter indicates the time the socket has been offline for before reconnecting + /// + public event Action ConnectionRestored + { + add => subscription.ConnectionRestored += value; + remove => subscription.ConnectionRestored -= value; + } + + public UpdateSubscription(SocketSubscription sub) + { + subscription = sub; + } + + /// + /// Close the subscription + /// + /// + public async Task Close() + { + await subscription.Close(); + } + } +}