1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 00:16:27 +00:00

Various socket client changes

This commit is contained in:
JKorf 2018-11-23 15:50:43 +01:00
parent 8b1f9af458
commit b1085bc764
6 changed files with 148 additions and 48 deletions

View File

@ -60,13 +60,18 @@ namespace CryptoExchange.Net
} }
protected CallResult<T> Deserialize<T>(string data, bool checkObject = true, JsonSerializer serializer = null) where T : class protected CallResult<T> Deserialize<T>(string data, bool checkObject = true, JsonSerializer serializer = null) where T : class
{
var obj = JToken.Parse(data);
return Deserialize<T>(obj, checkObject, serializer);
}
protected CallResult<T> Deserialize<T>(JToken obj, bool checkObject = true, JsonSerializer serializer = null) where T : class
{ {
if (serializer == null) if (serializer == null)
serializer = defaultSerializer; serializer = defaultSerializer;
try try
{ {
var obj = JToken.Parse(data);
if (checkObject && log.Level == LogVerbosity.Debug) if (checkObject && log.Level == LogVerbosity.Debug)
{ {
try try
@ -92,19 +97,19 @@ namespace CryptoExchange.Net
} }
catch (JsonReaderException jre) catch (JsonReaderException jre)
{ {
var info = $"Deserialize JsonReaderException: {jre.Message}, Path: {jre.Path}, LineNumber: {jre.LineNumber}, LinePosition: {jre.LinePosition}. Received data: {data}"; var info = $"Deserialize JsonReaderException: {jre.Message}, Path: {jre.Path}, LineNumber: {jre.LineNumber}, LinePosition: {jre.LinePosition}. Received data: {obj.ToString()}";
log.Write(LogVerbosity.Error, info); log.Write(LogVerbosity.Error, info);
return new CallResult<T>(null, new DeserializeError(info)); return new CallResult<T>(null, new DeserializeError(info));
} }
catch (JsonSerializationException jse) catch (JsonSerializationException jse)
{ {
var info = $"Deserialize JsonSerializationException: {jse.Message}. Received data: {data}"; var info = $"Deserialize JsonSerializationException: {jse.Message}. Received data: {obj.ToString()}";
log.Write(LogVerbosity.Error, info); log.Write(LogVerbosity.Error, info);
return new CallResult<T>(null, new DeserializeError(info)); return new CallResult<T>(null, new DeserializeError(info));
} }
catch (Exception ex) catch (Exception ex)
{ {
var info = $"Deserialize Unknown Exception: {ex.Message}. Received data: {data}"; var info = $"Deserialize Unknown Exception: {ex.Message}. Received data: {obj.ToString()}";
log.Write(LogVerbosity.Error, info); log.Write(LogVerbosity.Error, info);
return new CallResult<T>(null, new DeserializeError(info)); return new CallResult<T>(null, new DeserializeError(info));
} }
@ -139,7 +144,7 @@ namespace CryptoExchange.Net
} }
foreach (var token in obj) foreach (var token in obj)
{ {
var d = properties.SingleOrDefault(p => p == token.Key); var d = properties.FirstOrDefault(p => p == token.Key);
if (d == null) if (d == null)
{ {
d = properties.SingleOrDefault(p => p.ToLower() == token.Key.ToLower()); d = properties.SingleOrDefault(p => p.ToLower() == token.Key.ToLower());

View File

@ -103,7 +103,7 @@ namespace CryptoExchange.Net
return new CallResult<long>(0, new CantConnectError() { Message = "Ping failed: " + reply.Status }); return new CallResult<long>(0, new CantConnectError() { Message = "Ping failed: " + reply.Status });
} }
protected virtual async Task<CallResult<T>> ExecuteRequest<T>(Uri uri, string method = Constants.GetMethod, Dictionary<string, object> parameters = null, bool signed = false) where T : class protected virtual async Task<CallResult<T>> ExecuteRequest<T>(Uri uri, string method = Constants.GetMethod, Dictionary<string, object> parameters = null, bool signed = false, bool checkResult = true) where T : class
{ {
log.Write(LogVerbosity.Debug, $"Creating request for " + uri); log.Write(LogVerbosity.Debug, $"Creating request for " + uri);
if (signed && authProvider == null) if (signed && authProvider == null)
@ -146,7 +146,7 @@ namespace CryptoExchange.Net
log.Write(LogVerbosity.Debug, $"Sending {method}{(signed ? " signed" : "")} request to {request.Uri} {(paramString ?? "")}"); log.Write(LogVerbosity.Debug, $"Sending {method}{(signed ? " signed" : "")} request to {request.Uri} {(paramString ?? "")}");
var result = await ExecuteRequest(request).ConfigureAwait(false); var result = await ExecuteRequest(request).ConfigureAwait(false);
return result.Error != null ? new CallResult<T>(null, result.Error) : Deserialize<T>(result.Data); return result.Error != null ? new CallResult<T>(null, result.Error) : Deserialize<T>(result.Data, checkResult);
} }
protected virtual IRequest ConstructRequest(Uri uri, string method, Dictionary<string, object> parameters, bool signed) protected virtual IRequest ConstructRequest(Uri uri, string method, Dictionary<string, object> parameters, bool signed)

View File

@ -63,6 +63,7 @@ namespace CryptoExchange.Net
socket.DataInterpreter = dataInterpreter; socket.DataInterpreter = dataInterpreter;
socket.OnClose += () => socket.OnClose += () =>
{ {
if(socket.DisconnectTime == null)
socket.DisconnectTime = DateTime.UtcNow; socket.DisconnectTime = DateTime.UtcNow;
SocketOnClose(socket); SocketOnClose(socket);
}; };
@ -73,7 +74,6 @@ namespace CryptoExchange.Net
}; };
socket.OnOpen += () => socket.OnOpen += () =>
{ {
socket.ShouldReconnect = true;
SocketOpened(socket); SocketOpened(socket);
}; };
return socket; return socket;
@ -84,18 +84,26 @@ namespace CryptoExchange.Net
protected virtual void SocketError(IWebsocket socket, Exception ex) { } protected virtual void SocketError(IWebsocket socket, Exception ex) { }
protected abstract bool SocketReconnect(SocketSubscription socket, TimeSpan disconnectedTime); protected abstract bool SocketReconnect(SocketSubscription socket, TimeSpan disconnectedTime);
protected virtual CallResult<SocketSubscription> ConnectSocket(IWebsocket socket) protected virtual async Task<CallResult<bool>> ConnectSocket(SocketSubscription socketSubscription)
{ {
if (socket.Connect().Result) socketSubscription.Socket.OnMessage += data => ProcessMessage(socketSubscription, data);
if (await socketSubscription.Socket.Connect())
{ {
var subscription = new SocketSubscription(socket);
lock (sockets) lock (sockets)
sockets.Add(subscription); sockets.Add(socketSubscription);
return new CallResult<SocketSubscription>(subscription, null); return new CallResult<bool>(true, null);
} }
socket.Dispose(); socketSubscription.Socket.Dispose();
return new CallResult<SocketSubscription>(null, new CantConnectError()); return new CallResult<bool>(false, new CantConnectError());
}
protected virtual void ProcessMessage(SocketSubscription sub, string data)
{
log.Write(LogVerbosity.Debug, $"Socket {sub.Socket.Id} received data: " + data);
foreach (var handler in sub.DataHandlers)
handler(sub, JToken.Parse(data));
} }
protected virtual void SocketOnClose(IWebsocket socket) protected virtual void SocketOnClose(IWebsocket socket)
@ -120,6 +128,7 @@ namespace CryptoExchange.Net
if (!SocketReconnect(subscription, DateTime.UtcNow - socket.DisconnectTime.Value)) if (!SocketReconnect(subscription, DateTime.UtcNow - socket.DisconnectTime.Value))
socket.Close().Wait(); // Close so we end up reconnecting again socket.Close().Wait(); // Close so we end up reconnecting again
socket.DisconnectTime = null;
return; return;
}); });
} }
@ -146,35 +155,6 @@ namespace CryptoExchange.Net
socket.Send(data); socket.Send(data);
} }
protected virtual async Task<CallResult<string>> SendAndWait<T>(IWebsocket socket, T obj, Func<JToken, bool> waitingFor, int timeout=5000)
{
return await Task.Run(() =>
{
var data = JsonConvert.SerializeObject(obj);
ManualResetEvent evnt = new ManualResetEvent(false);
string result = null;
var onMessageAction = new Action<string>((msg) =>
{
if (!waitingFor(JToken.Parse(msg)))
return;
log.Write(LogVerbosity.Debug, "Socket received query response: " + msg);
result = msg;
evnt?.Set();
});
socket.OnMessage += onMessageAction;
Send(socket, data);
evnt.WaitOne(timeout);
socket.OnMessage -= onMessageAction;
evnt.Dispose();
evnt = null;
if (result == null)
return new CallResult<string>(null, new ServerError("No response from server"));
return new CallResult<string>(result, null);
}).ConfigureAwait(false);
}
public override void Dispose() public override void Dispose()
{ {
lock(sockets) lock(sockets)

View File

@ -0,0 +1,38 @@
using CryptoExchange.Net.Objects;
using System.Threading;
namespace CryptoExchange.Net.Sockets
{
public class SocketEvent
{
public string Name { get; set; }
private CallResult<bool> result;
private ManualResetEvent setEvnt;
public SocketEvent(string name)
{
Name = name;
setEvnt = new ManualResetEvent(false);
result = new CallResult<bool>(false, new UnknownError("No response received"));
}
public void Set(bool result, Error error)
{
this.result = new CallResult<bool>(result, error);
setEvnt.Set();
}
public CallResult<bool> Wait(int timeout = 5000)
{
setEvnt.WaitOne(timeout);
return result;
}
public void Reset()
{
setEvnt.Reset();
result = new CallResult<bool>(false, new UnknownError("No response received"));
}
}
}

View File

@ -1,14 +1,20 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using Newtonsoft.Json.Linq;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Linq;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
{ {
public class SocketSubscription public class SocketSubscription
{ {
public event Action ConnectionLost; public event Action ConnectionLost;
public event Action ConnectionRestored; public event Action<TimeSpan> ConnectionRestored;
public List<Action<SocketSubscription, JToken>> DataHandlers { get; set; }
public List<SocketEvent> Events { get; set; }
public IWebsocket Socket { get; set; } public IWebsocket Socket { get; set; }
public object Request { get; set; } public object Request { get; set; }
@ -18,6 +24,9 @@ namespace CryptoExchange.Net.Sockets
public SocketSubscription(IWebsocket socket) public SocketSubscription(IWebsocket socket)
{ {
Socket = socket; Socket = socket;
Events = new List<SocketEvent>();
DataHandlers = new List<Action<SocketSubscription, JToken>>();
Socket.OnClose += () => Socket.OnClose += () =>
{ {
@ -25,6 +34,10 @@ namespace CryptoExchange.Net.Sockets
return; return;
lostTriggered = true; lostTriggered = true;
foreach (var events in Events)
events.Reset();
if (Socket.ShouldReconnect) if (Socket.ShouldReconnect)
ConnectionLost?.Invoke(); ConnectionLost?.Invoke();
}; };
@ -32,8 +45,30 @@ namespace CryptoExchange.Net.Sockets
{ {
lostTriggered = false; lostTriggered = false;
if (Socket.DisconnectTime != null) if (Socket.DisconnectTime != null)
ConnectionRestored?.Invoke(); ConnectionRestored?.Invoke(DateTime.UtcNow - Socket.DisconnectTime.Value);
}; };
} }
public void AddEvent(string name)
{
Events.Add(new SocketEvent(name));
}
public void SetEvent(string name, bool success, Error error)
{
Events.SingleOrDefault(e => e.Name == name)?.Set(success, error);
}
public CallResult<bool> WaitForEvent(string name)
{
return Events.Single(e => e.Name == name).Wait();
}
public async Task Close()
{
Socket.ShouldReconnect = false;
await Socket.Close();
Socket.Dispose();
}
} }
} }

View File

@ -0,0 +1,42 @@
using System;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Sockets
{
public class UpdateSubscription
{
private SocketSubscription subscription;
/// <summary>
/// Event when the connection is lost
/// </summary>
public event Action ConnectionLost
{
add => subscription.ConnectionLost += value;
remove => subscription.ConnectionLost -= value;
}
/// <summary>
/// Event when the connection is restored. Timespan parameter indicates the time the socket has been offline for before reconnecting
/// </summary>
public event Action<TimeSpan> ConnectionRestored
{
add => subscription.ConnectionRestored += value;
remove => subscription.ConnectionRestored -= value;
}
public UpdateSubscription(SocketSubscription sub)
{
subscription = sub;
}
/// <summary>
/// Close the subscription
/// </summary>
/// <returns></returns>
public async Task Close()
{
await subscription.Close();
}
}
}