mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-06-08 16:36:15 +00:00
Updated socketclient
This commit is contained in:
parent
2c58ef7feb
commit
c489b4e9aa
@ -41,223 +41,223 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
socket.CanConnect = canConnect;
|
socket.CanConnect = canConnect;
|
||||||
|
|
||||||
// act
|
// act
|
||||||
var connectResult = client.ConnectSocketSub(new SocketSubscription(socket));
|
var connectResult = client.ConnectSocketSub(new SocketConnection(client, new Log(), socket));
|
||||||
|
|
||||||
// assert
|
// assert
|
||||||
Assert.IsTrue(connectResult.Success == canConnect);
|
Assert.IsTrue(connectResult.Success == canConnect);
|
||||||
}
|
}
|
||||||
|
|
||||||
[TestCase]
|
//[TestCase]
|
||||||
public void SocketMessages_Should_BeProcessedInDataHandlers()
|
//public void SocketMessages_Should_BeProcessedInDataHandlers()
|
||||||
{
|
//{
|
||||||
// arrange
|
// // arrange
|
||||||
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
var socket = client.CreateSocket();
|
// var socket = client.CreateSocket();
|
||||||
socket.ShouldReconnect = true;
|
// socket.ShouldReconnect = true;
|
||||||
socket.CanConnect = true;
|
// socket.CanConnect = true;
|
||||||
socket.DisconnectTime = DateTime.UtcNow;
|
// socket.DisconnectTime = DateTime.UtcNow;
|
||||||
var sub = new SocketSubscription(socket);
|
// var sub = new SocketConnection(socket);
|
||||||
var rstEvent = new ManualResetEvent(false);
|
// var rstEvent = new ManualResetEvent(false);
|
||||||
JToken result = null;
|
// JToken result = null;
|
||||||
sub.MessageHandlers.Add("TestHandler", (subs, data) =>
|
// sub.MessageHandlers.Add("TestHandler", (subs, data) =>
|
||||||
{
|
// {
|
||||||
result = data;
|
// result = data;
|
||||||
rstEvent.Set();
|
// rstEvent.Set();
|
||||||
return true;
|
// return true;
|
||||||
|
|
||||||
});
|
// });
|
||||||
client.ConnectSocketSub(sub);
|
// client.ConnectSocketSub(sub);
|
||||||
|
|
||||||
// act
|
// // act
|
||||||
socket.InvokeMessage("{\"property\": 123}");
|
// socket.InvokeMessage("{\"property\": 123}");
|
||||||
rstEvent.WaitOne(1000);
|
// rstEvent.WaitOne(1000);
|
||||||
|
|
||||||
// assert
|
// // assert
|
||||||
Assert.IsTrue((int)result["property"] == 123);
|
// Assert.IsTrue((int)result["property"] == 123);
|
||||||
}
|
//}
|
||||||
|
|
||||||
[TestCase]
|
//[TestCase]
|
||||||
public void SocketMessages_Should_NotBeProcessedInSubsequentHandlersIfHandlerReturnsTrue()
|
//public void SocketMessages_Should_NotBeProcessedInSubsequentHandlersIfHandlerReturnsTrue()
|
||||||
{
|
//{
|
||||||
// arrange
|
// // arrange
|
||||||
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
var socket = client.CreateSocket();
|
// var socket = client.CreateSocket();
|
||||||
socket.ShouldReconnect = true;
|
// socket.ShouldReconnect = true;
|
||||||
socket.CanConnect = true;
|
// socket.CanConnect = true;
|
||||||
socket.DisconnectTime = DateTime.UtcNow;
|
// socket.DisconnectTime = DateTime.UtcNow;
|
||||||
var sub = new SocketSubscription(socket);
|
// var sub = new SocketConnection(socket);
|
||||||
var rstEvent1 = new ManualResetEvent(false);
|
// var rstEvent1 = new ManualResetEvent(false);
|
||||||
var rstEvent2 = new ManualResetEvent(false);
|
// var rstEvent2 = new ManualResetEvent(false);
|
||||||
JToken result1 = null;
|
// JToken result1 = null;
|
||||||
JToken result2 = null;
|
// JToken result2 = null;
|
||||||
sub.MessageHandlers.Add("TestHandler", (subs, data) =>
|
// sub.MessageHandlers.Add("TestHandler", (subs, data) =>
|
||||||
{
|
// {
|
||||||
result1 = data;
|
// result1 = data;
|
||||||
rstEvent1.Set();
|
// rstEvent1.Set();
|
||||||
return true;
|
// return true;
|
||||||
});
|
// });
|
||||||
sub.MessageHandlers.Add("TestHandlerNotHit", (subs, data) =>
|
// sub.MessageHandlers.Add("TestHandlerNotHit", (subs, data) =>
|
||||||
{
|
// {
|
||||||
result2 = data;
|
// result2 = data;
|
||||||
rstEvent2.Set();
|
// rstEvent2.Set();
|
||||||
return true;
|
// return true;
|
||||||
});
|
// });
|
||||||
client.ConnectSocketSub(sub);
|
// client.ConnectSocketSub(sub);
|
||||||
|
|
||||||
// act
|
// // act
|
||||||
socket.InvokeMessage("{\"property\": 123}");
|
// socket.InvokeMessage("{\"property\": 123}");
|
||||||
rstEvent1.WaitOne(100);
|
// rstEvent1.WaitOne(100);
|
||||||
rstEvent2.WaitOne(100);
|
// rstEvent2.WaitOne(100);
|
||||||
|
|
||||||
// assert
|
// // assert
|
||||||
Assert.IsTrue((int)result1["property"] == 123);
|
// Assert.IsTrue((int)result1["property"] == 123);
|
||||||
Assert.IsTrue(result2 == null);
|
// Assert.IsTrue(result2 == null);
|
||||||
}
|
//}
|
||||||
|
|
||||||
[TestCase]
|
//[TestCase]
|
||||||
public void SocketMessages_Should_BeProcessedInSubsequentHandlersIfHandlerReturnsFalse()
|
//public void SocketMessages_Should_BeProcessedInSubsequentHandlersIfHandlerReturnsFalse()
|
||||||
{
|
//{
|
||||||
// arrange
|
// // arrange
|
||||||
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
var socket = client.CreateSocket();
|
// var socket = client.CreateSocket();
|
||||||
socket.ShouldReconnect = true;
|
// socket.ShouldReconnect = true;
|
||||||
socket.CanConnect = true;
|
// socket.CanConnect = true;
|
||||||
socket.DisconnectTime = DateTime.UtcNow;
|
// socket.DisconnectTime = DateTime.UtcNow;
|
||||||
var sub = new SocketSubscription(socket);
|
// var sub = new SocketConnection(socket);
|
||||||
var rstEvent = new ManualResetEvent(false);
|
// var rstEvent = new ManualResetEvent(false);
|
||||||
JToken result = null;
|
// JToken result = null;
|
||||||
sub.MessageHandlers.Add("TestHandlerNotProcessing", (subs, data) =>
|
// sub.MessageHandlers.Add("TestHandlerNotProcessing", (subs, data) =>
|
||||||
{
|
// {
|
||||||
return false;
|
// return false;
|
||||||
});
|
// });
|
||||||
sub.MessageHandlers.Add("TestHandler", (subs, data) =>
|
// sub.MessageHandlers.Add("TestHandler", (subs, data) =>
|
||||||
{
|
// {
|
||||||
result = data;
|
// result = data;
|
||||||
rstEvent.Set();
|
// rstEvent.Set();
|
||||||
return true;
|
// return true;
|
||||||
});
|
// });
|
||||||
client.ConnectSocketSub(sub);
|
// client.ConnectSocketSub(sub);
|
||||||
|
|
||||||
// act
|
// // act
|
||||||
socket.InvokeMessage("{\"property\": 123}");
|
// socket.InvokeMessage("{\"property\": 123}");
|
||||||
rstEvent.WaitOne(100);
|
// rstEvent.WaitOne(100);
|
||||||
|
|
||||||
// assert
|
// // assert
|
||||||
Assert.IsTrue((int)result["property"] == 123);
|
// Assert.IsTrue((int)result["property"] == 123);
|
||||||
}
|
//}
|
||||||
|
|
||||||
|
|
||||||
[TestCase]
|
//[TestCase]
|
||||||
public void DisconnectedSocket_Should_Reconnect()
|
//public void DisconnectedSocket_Should_Reconnect()
|
||||||
{
|
//{
|
||||||
// arrange
|
// // arrange
|
||||||
bool reconnected = false;
|
// bool reconnected = false;
|
||||||
var client = new TestSocketClient(new SocketClientOptions(){ReconnectInterval = TimeSpan.Zero ,LogVerbosity = LogVerbosity.Debug});
|
// var client = new TestSocketClient(new SocketClientOptions(){ReconnectInterval = TimeSpan.Zero ,LogVerbosity = LogVerbosity.Debug});
|
||||||
var socket = client.CreateSocket();
|
// var socket = client.CreateSocket();
|
||||||
socket.ShouldReconnect = true;
|
// socket.ShouldReconnect = true;
|
||||||
socket.CanConnect = true;
|
// socket.CanConnect = true;
|
||||||
socket.DisconnectTime = DateTime.UtcNow;
|
// socket.DisconnectTime = DateTime.UtcNow;
|
||||||
var sub = new SocketSubscription(socket);
|
// var sub = new SocketConnection(socket);
|
||||||
client.ConnectSocketSub(sub);
|
// client.ConnectSocketSub(sub);
|
||||||
var rstEvent = new ManualResetEvent(false);
|
// var rstEvent = new ManualResetEvent(false);
|
||||||
client.OnReconnect += () =>
|
// client.OnReconnect += () =>
|
||||||
{
|
// {
|
||||||
reconnected = true;
|
// reconnected = true;
|
||||||
rstEvent.Set();
|
// rstEvent.Set();
|
||||||
return true;
|
// return true;
|
||||||
};
|
// };
|
||||||
|
|
||||||
// act
|
// // act
|
||||||
socket.InvokeClose();
|
// socket.InvokeClose();
|
||||||
rstEvent.WaitOne(1000);
|
// rstEvent.WaitOne(1000);
|
||||||
|
|
||||||
// assert
|
// // assert
|
||||||
Assert.IsTrue(reconnected);
|
// Assert.IsTrue(reconnected);
|
||||||
}
|
//}
|
||||||
|
|
||||||
[TestCase()]
|
//[TestCase()]
|
||||||
public void UnsubscribingStream_Should_CloseTheSocket()
|
//public void UnsubscribingStream_Should_CloseTheSocket()
|
||||||
{
|
//{
|
||||||
// arrange
|
// // arrange
|
||||||
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
var socket = client.CreateSocket();
|
// var socket = client.CreateSocket();
|
||||||
socket.CanConnect = true;
|
// socket.CanConnect = true;
|
||||||
var sub = new SocketSubscription(socket);
|
// var sub = new SocketConnection(socket);
|
||||||
client.ConnectSocketSub(sub);
|
// client.ConnectSocketSub(sub);
|
||||||
var ups = new UpdateSubscription(sub);
|
// var ups = new UpdateSubscription(sub);
|
||||||
|
|
||||||
// act
|
// // act
|
||||||
client.Unsubscribe(ups).Wait();
|
// client.Unsubscribe(ups).Wait();
|
||||||
|
|
||||||
// assert
|
// // assert
|
||||||
Assert.IsTrue(socket.Connected == false);
|
// Assert.IsTrue(socket.Connected == false);
|
||||||
}
|
//}
|
||||||
|
|
||||||
[TestCase()]
|
//[TestCase()]
|
||||||
public void UnsubscribingAll_Should_CloseAllSockets()
|
//public void UnsubscribingAll_Should_CloseAllSockets()
|
||||||
{
|
//{
|
||||||
// arrange
|
// // arrange
|
||||||
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
var socket1 = client.CreateSocket();
|
// var socket1 = client.CreateSocket();
|
||||||
var socket2 = client.CreateSocket();
|
// var socket2 = client.CreateSocket();
|
||||||
socket1.CanConnect = true;
|
// socket1.CanConnect = true;
|
||||||
socket2.CanConnect = true;
|
// socket2.CanConnect = true;
|
||||||
var sub1 = new SocketSubscription(socket1);
|
// var sub1 = new SocketConnection(socket1);
|
||||||
var sub2 = new SocketSubscription(socket2);
|
// var sub2 = new SocketConnection(socket2);
|
||||||
client.ConnectSocketSub(sub1);
|
// client.ConnectSocketSub(sub1);
|
||||||
client.ConnectSocketSub(sub2);
|
// client.ConnectSocketSub(sub2);
|
||||||
|
|
||||||
// act
|
// // act
|
||||||
client.UnsubscribeAll().Wait();
|
// client.UnsubscribeAll().Wait();
|
||||||
|
|
||||||
// assert
|
// // assert
|
||||||
Assert.IsTrue(socket1.Connected == false);
|
// Assert.IsTrue(socket1.Connected == false);
|
||||||
Assert.IsTrue(socket2.Connected == false);
|
// Assert.IsTrue(socket2.Connected == false);
|
||||||
}
|
//}
|
||||||
|
|
||||||
[TestCase()]
|
//[TestCase()]
|
||||||
public void FailingToConnectSocket_Should_ReturnError()
|
//public void FailingToConnectSocket_Should_ReturnError()
|
||||||
{
|
//{
|
||||||
// arrange
|
// // arrange
|
||||||
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
var socket = client.CreateSocket();
|
// var socket = client.CreateSocket();
|
||||||
socket.CanConnect = false;
|
// socket.CanConnect = false;
|
||||||
var sub = new SocketSubscription(socket);
|
// var sub = new SocketConnection(socket);
|
||||||
|
|
||||||
// act
|
// // act
|
||||||
var connectResult = client.ConnectSocketSub(sub);
|
// var connectResult = client.ConnectSocketSub(sub);
|
||||||
|
|
||||||
// assert
|
// // assert
|
||||||
Assert.IsFalse(connectResult.Success);
|
// Assert.IsFalse(connectResult.Success);
|
||||||
}
|
//}
|
||||||
|
|
||||||
[Test]
|
//[Test]
|
||||||
public void WhenResubscribeFails_Socket_ShouldReconnect()
|
//public void WhenResubscribeFails_Socket_ShouldReconnect()
|
||||||
{
|
//{
|
||||||
// arrange
|
// // arrange
|
||||||
int reconnected = 0;
|
// int reconnected = 0;
|
||||||
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.FromMilliseconds(1), LogVerbosity = LogVerbosity.Debug });
|
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.FromMilliseconds(1), LogVerbosity = LogVerbosity.Debug });
|
||||||
var socket = client.CreateSocket();
|
// var socket = client.CreateSocket();
|
||||||
socket.ShouldReconnect = true;
|
// socket.ShouldReconnect = true;
|
||||||
socket.CanConnect = true;
|
// socket.CanConnect = true;
|
||||||
socket.DisconnectTime = DateTime.UtcNow;
|
// socket.DisconnectTime = DateTime.UtcNow;
|
||||||
var sub = new SocketSubscription(socket);
|
// var sub = new SocketConnection(socket);
|
||||||
client.ConnectSocketSub(sub);
|
// client.ConnectSocketSub(sub);
|
||||||
var rstEvent = new ManualResetEvent(false);
|
// var rstEvent = new ManualResetEvent(false);
|
||||||
client.OnReconnect += () =>
|
// client.OnReconnect += () =>
|
||||||
{
|
// {
|
||||||
reconnected++;
|
// reconnected++;
|
||||||
rstEvent.Set();
|
// rstEvent.Set();
|
||||||
return reconnected == 2;
|
// return reconnected == 2;
|
||||||
};
|
// };
|
||||||
|
|
||||||
// act
|
// // act
|
||||||
socket.InvokeClose();
|
// socket.InvokeClose();
|
||||||
rstEvent.WaitOne(1000);
|
// rstEvent.WaitOne(1000);
|
||||||
Thread.Sleep(100);
|
// Thread.Sleep(100);
|
||||||
|
|
||||||
// assert
|
// // assert
|
||||||
Assert.IsTrue(reconnected == 2);
|
// Assert.IsTrue(reconnected == 2);
|
||||||
}
|
//}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,8 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
|||||||
public int Id { get; }
|
public int Id { get; }
|
||||||
public bool ShouldReconnect { get; set; }
|
public bool ShouldReconnect { get; set; }
|
||||||
public TimeSpan Timeout { get; set; }
|
public TimeSpan Timeout { get; set; }
|
||||||
public Func<byte[], string> DataInterpreter { get; set; }
|
public Func<string, string> DataInterpreterString { get; set; }
|
||||||
|
public Func<byte[], string> DataInterpreterBytes { get; set; }
|
||||||
public DateTime? DisconnectTime { get; set; }
|
public DateTime? DisconnectTime { get; set; }
|
||||||
public string Url { get; }
|
public string Url { get; }
|
||||||
public WebSocketState SocketState { get; }
|
public WebSocketState SocketState { get; }
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
using CryptoExchange.Net.Interfaces;
|
using CryptoExchange.Net.Interfaces;
|
||||||
using CryptoExchange.Net.Logging;
|
using CryptoExchange.Net.Logging;
|
||||||
using CryptoExchange.Net.Objects;
|
using CryptoExchange.Net.Objects;
|
||||||
using CryptoExchange.Net.Sockets;
|
using CryptoExchange.Net.Sockets;
|
||||||
using Moq;
|
using Moq;
|
||||||
|
using Newtonsoft.Json.Linq;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.UnitTests.TestImplementations
|
namespace CryptoExchange.Net.UnitTests.TestImplementations
|
||||||
{
|
{
|
||||||
@ -27,14 +29,39 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
|||||||
return (TestSocket)CreateSocket(BaseAddress);
|
return (TestSocket)CreateSocket(BaseAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
public CallResult<bool> ConnectSocketSub(SocketSubscription sub)
|
public CallResult<bool> ConnectSocketSub(SocketConnection sub)
|
||||||
{
|
{
|
||||||
return ConnectSocket(sub).Result;
|
return ConnectSocket(sub).Result;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override bool SocketReconnect(SocketSubscription subscription, TimeSpan disconnectedTime)
|
protected override bool HandleQueryResponse<T>(SocketConnection s, object request, JToken data, out CallResult<T> callResult)
|
||||||
{
|
{
|
||||||
return OnReconnect.Invoke();
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override bool HandleSubscriptionResponse(SocketConnection s, SocketSubscription subscription, object request, JToken message, out CallResult<object> callResult)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override bool MessageMatchesHandler(JToken message, object request)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override bool MessageMatchesHandler(JToken message, string identifier)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override Task<CallResult<bool>> AuthenticateSocket(SocketConnection s)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override Task<bool> Unsubscribe(SocketConnection connection, SocketSubscription s)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,9 @@ namespace CryptoExchange.Net.Converters
|
|||||||
|
|
||||||
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
|
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
|
||||||
{
|
{
|
||||||
|
if (objectType == typeof(JToken))
|
||||||
|
return JToken.Load(reader);
|
||||||
|
|
||||||
var result = Activator.CreateInstance(objectType);
|
var result = Activator.CreateInstance(objectType);
|
||||||
var arr = JArray.Load(reader);
|
var arr = JArray.Load(reader);
|
||||||
return ParseObject(arr, result, objectType);
|
return ParseObject(arr, result, objectType);
|
||||||
|
@ -14,10 +14,9 @@ namespace CryptoExchange.Net.Interfaces
|
|||||||
|
|
||||||
int Id { get; }
|
int Id { get; }
|
||||||
string Origin { get; set; }
|
string Origin { get; set; }
|
||||||
bool ShouldReconnect { get; set; }
|
|
||||||
bool Reconnecting { get; set; }
|
bool Reconnecting { get; set; }
|
||||||
Func<byte[], string> DataInterpreter { get; set; }
|
Func<byte[], string> DataInterpreterBytes { get; set; }
|
||||||
DateTime? DisconnectTime { get; set; }
|
Func<string, string> DataInterpreterString { get; set; }
|
||||||
string Url { get; }
|
string Url { get; }
|
||||||
WebSocketState SocketState { get; }
|
WebSocketState SocketState { get; }
|
||||||
bool IsClosed { get; }
|
bool IsClosed { get; }
|
||||||
|
@ -88,6 +88,15 @@ namespace CryptoExchange.Net.Objects
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
|
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The time to wait for a socket response
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan SocketResponseTimeout { get; set; } = TimeSpan.FromSeconds(10);
|
||||||
|
/// <summary>
|
||||||
|
/// The time after which the connection is assumed to be dropped
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan SocketNoDataTimeout { get; set; }
|
||||||
|
|
||||||
public T Copy<T>() where T : SocketClientOptions, new()
|
public T Copy<T>() where T : SocketClientOptions, new()
|
||||||
{
|
{
|
||||||
var copy = new T
|
var copy = new T
|
||||||
@ -97,7 +106,8 @@ namespace CryptoExchange.Net.Objects
|
|||||||
Proxy = Proxy,
|
Proxy = Proxy,
|
||||||
LogWriters = LogWriters,
|
LogWriters = LogWriters,
|
||||||
AutoReconnect = AutoReconnect,
|
AutoReconnect = AutoReconnect,
|
||||||
ReconnectInterval = ReconnectInterval
|
ReconnectInterval = ReconnectInterval,
|
||||||
|
SocketResponseTimeout = SocketResponseTimeout
|
||||||
};
|
};
|
||||||
|
|
||||||
if (ApiCredentials != null)
|
if (ApiCredentials != null)
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Diagnostics;
|
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
@ -9,7 +8,6 @@ using CryptoExchange.Net.Interfaces;
|
|||||||
using CryptoExchange.Net.Logging;
|
using CryptoExchange.Net.Logging;
|
||||||
using CryptoExchange.Net.Objects;
|
using CryptoExchange.Net.Objects;
|
||||||
using CryptoExchange.Net.Sockets;
|
using CryptoExchange.Net.Sockets;
|
||||||
using Newtonsoft.Json;
|
|
||||||
using Newtonsoft.Json.Linq;
|
using Newtonsoft.Json.Linq;
|
||||||
|
|
||||||
namespace CryptoExchange.Net
|
namespace CryptoExchange.Net
|
||||||
@ -22,20 +20,22 @@ namespace CryptoExchange.Net
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
|
public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
|
||||||
|
|
||||||
protected List<SocketSubscription> sockets = new List<SocketSubscription>();
|
protected internal List<SocketConnection> sockets = new List<SocketConnection>();
|
||||||
|
protected internal readonly object socketLock = new object();
|
||||||
|
|
||||||
public TimeSpan ReconnectInterval { get; private set; }
|
public TimeSpan ReconnectInterval { get; private set; }
|
||||||
public bool AutoReconnect { get; private set; }
|
public bool AutoReconnect { get; private set; }
|
||||||
protected Func<byte[], string> dataInterpreter;
|
public TimeSpan ResponseTimeout { get; private set; }
|
||||||
|
public TimeSpan SocketTimeout { get; private set; }
|
||||||
|
public int MaxSocketConnections { get; protected set; } = 999;
|
||||||
|
public int SocketCombineTarget { get; protected set; } = 1;
|
||||||
|
|
||||||
protected const string DataHandlerName = "DataHandler";
|
protected Func<byte[], string> dataInterpreterBytes;
|
||||||
protected const string AuthenticationHandlerName = "AuthenticationHandler";
|
protected Func<string, string> dataInterpreterString;
|
||||||
protected const string SubscriptionHandlerName = "SubscriptionHandler";
|
protected Dictionary<string, Action<SocketConnection, JToken>> genericHandlers = new Dictionary<string, Action<SocketConnection, JToken>>();
|
||||||
protected const string PingHandlerName = "PingHandler";
|
protected Task periodicTask;
|
||||||
|
protected AutoResetEvent periodicEvent;
|
||||||
protected const string DataEvent = "Data";
|
protected bool disposing;
|
||||||
protected const string SubscriptionEvent = "Subscription";
|
|
||||||
protected const string AuthenticationEvent = "Authentication";
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
protected SocketClient(SocketClientOptions exchangeOptions, AuthenticationProvider authenticationProvider): base(exchangeOptions, authenticationProvider)
|
protected SocketClient(SocketClientOptions exchangeOptions, AuthenticationProvider authenticationProvider): base(exchangeOptions, authenticationProvider)
|
||||||
@ -51,17 +51,230 @@ namespace CryptoExchange.Net
|
|||||||
{
|
{
|
||||||
AutoReconnect = exchangeOptions.AutoReconnect;
|
AutoReconnect = exchangeOptions.AutoReconnect;
|
||||||
ReconnectInterval = exchangeOptions.ReconnectInterval;
|
ReconnectInterval = exchangeOptions.ReconnectInterval;
|
||||||
|
ResponseTimeout = exchangeOptions.SocketResponseTimeout;
|
||||||
|
SocketTimeout = exchangeOptions.SocketNoDataTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Set a function to interpret the data, used when the data is received as bytes instead of a string
|
/// Set a function to interpret the data, used when the data is received as bytes instead of a string
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="handler"></param>
|
/// <param name="handler"></param>
|
||||||
protected void SetDataInterpreter(Func<byte[], string> handler)
|
protected void SetDataInterpreter(Func<byte[], string> byteHandler, Func<string, string> stringHandler)
|
||||||
{
|
{
|
||||||
dataInterpreter = handler;
|
dataInterpreterBytes = byteHandler;
|
||||||
|
dataInterpreterString = stringHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected virtual async Task<CallResult<UpdateSubscription>> Subscribe<T>(object request, string identifier, bool authenticated, Action<T> dataHandler)
|
||||||
|
{
|
||||||
|
return await Subscribe(BaseAddress, request, identifier, authenticated, dataHandler).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected virtual async Task<CallResult<UpdateSubscription>> Subscribe<T>(string url, object request, string identifier, bool authenticated, Action<T> dataHandler)
|
||||||
|
{
|
||||||
|
SocketConnection socket;
|
||||||
|
SocketSubscription handler;
|
||||||
|
if (SocketCombineTarget == 1)
|
||||||
|
{;
|
||||||
|
lock (socketLock)
|
||||||
|
{
|
||||||
|
socket = GetWebsocket(url, authenticated);
|
||||||
|
handler = AddHandler(request, identifier, true, socket, dataHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
var connectResult = ConnectIfNeeded(socket, authenticated).GetAwaiter().GetResult();
|
||||||
|
if (!connectResult.Success)
|
||||||
|
return new CallResult<UpdateSubscription>(null, connectResult.Error);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
lock (socketLock)
|
||||||
|
{
|
||||||
|
socket = GetWebsocket(url, authenticated);
|
||||||
|
handler = AddHandler(request, identifier, true, socket, dataHandler);
|
||||||
|
|
||||||
|
var connectResult = ConnectIfNeeded(socket, authenticated).GetAwaiter().GetResult();
|
||||||
|
if (!connectResult.Success)
|
||||||
|
return new CallResult<UpdateSubscription>(null, connectResult.Error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (request != null)
|
||||||
|
{
|
||||||
|
var subResult = await SubscribeAndWait(socket, request, handler).ConfigureAwait(false);
|
||||||
|
if (!subResult.Success)
|
||||||
|
{
|
||||||
|
await socket.Close(handler).ConfigureAwait(false);
|
||||||
|
return new CallResult<UpdateSubscription>(null, subResult.Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
else
|
||||||
|
handler.Confirmed = true;
|
||||||
|
|
||||||
|
socket.ShouldReconnect = true;
|
||||||
|
return new CallResult<UpdateSubscription>(new UpdateSubscription(socket, handler), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected internal virtual async Task<CallResult<bool>> SubscribeAndWait(SocketConnection socket, object request, SocketSubscription subscription)
|
||||||
|
{
|
||||||
|
CallResult<object> callResult = null;
|
||||||
|
await socket.SendAndWait(request, ResponseTimeout, (data) =>
|
||||||
|
{
|
||||||
|
if (!HandleSubscriptionResponse(socket, subscription, request, data, out callResult))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}).ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (callResult?.Success == true)
|
||||||
|
subscription.Confirmed = true;
|
||||||
|
|
||||||
|
return new CallResult<bool>(callResult?.Success ?? false, callResult == null ? new ServerError("No response on subscription request received"): callResult.Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected virtual async Task<CallResult<T>> Query<T>(object request, bool authenticated)
|
||||||
|
{
|
||||||
|
var socket = GetWebsocket(BaseAddress, authenticated);
|
||||||
|
var connectResult = await ConnectIfNeeded(socket, authenticated).ConfigureAwait(false);
|
||||||
|
if (!connectResult.Success)
|
||||||
|
return new CallResult<T>(default(T), connectResult.Error);
|
||||||
|
|
||||||
|
if (socket.PausedActivity)
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Info, "Socket has been paused, can't send query at this moment");
|
||||||
|
return new CallResult<T>(default(T), new ServerError("Socket is paused"));
|
||||||
|
}
|
||||||
|
|
||||||
|
return await QueryAndWait<T>(socket, request).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected virtual async Task<CallResult<T>> QueryAndWait<T>(SocketConnection socket, object request)
|
||||||
|
{
|
||||||
|
CallResult<T> dataResult = new CallResult<T>(default(T), new ServerError("No response on query received"));
|
||||||
|
await socket.SendAndWait(request, ResponseTimeout, (data) =>
|
||||||
|
{
|
||||||
|
if (!HandleQueryResponse<T>(socket, request, data, out var callResult))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
dataResult = callResult;
|
||||||
|
return true;
|
||||||
|
}).ConfigureAwait(false);
|
||||||
|
|
||||||
|
return dataResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected virtual async Task<CallResult<bool>> ConnectIfNeeded(SocketConnection socket, bool authenticated)
|
||||||
|
{
|
||||||
|
if (!socket.Connected)
|
||||||
|
{
|
||||||
|
var connectResult = await ConnectSocket(socket).ConfigureAwait(false);
|
||||||
|
if (!connectResult.Success)
|
||||||
|
{
|
||||||
|
return new CallResult<bool>(false, new CantConnectError());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (authenticated && !socket.Authenticated)
|
||||||
|
{
|
||||||
|
var result = await AuthenticateSocket(socket).ConfigureAwait(false);
|
||||||
|
if (!result.Success)
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Warning, "Socket authentication failed");
|
||||||
|
result.Error.Message = "Authentication failed: " + result.Error.Message;
|
||||||
|
return new CallResult<bool>(false, result.Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.Authenticated = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new CallResult<bool>(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected virtual void AddGenericHandler(string identifier, Action<SocketConnection, JToken> action)
|
||||||
|
{
|
||||||
|
genericHandlers.Add(identifier, action);
|
||||||
|
List<SocketConnection> socketList;
|
||||||
|
lock (socketLock)
|
||||||
|
socketList = sockets.ToList();
|
||||||
|
foreach (var wrapper in socketList)
|
||||||
|
wrapper.AddHandler(identifier, false, action);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected internal abstract bool HandleQueryResponse<T>(SocketConnection s, object request, JToken data, out CallResult<T> callResult);
|
||||||
|
protected internal abstract bool HandleSubscriptionResponse(SocketConnection s, SocketSubscription subscription, object request, JToken message, out CallResult<object> callResult);
|
||||||
|
protected internal abstract bool MessageMatchesHandler(JToken message, object request);
|
||||||
|
protected internal abstract bool MessageMatchesHandler(JToken message, string identifier);
|
||||||
|
protected internal abstract Task<CallResult<bool>> AuthenticateSocket(SocketConnection s);
|
||||||
|
protected internal abstract Task<bool> Unsubscribe(SocketConnection connection, SocketSubscription s);
|
||||||
|
protected internal virtual JToken ProcessTokenData(JToken message)
|
||||||
|
{
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected virtual SocketSubscription AddHandler<T>(object request, string identifier, bool userSubscription, SocketConnection connection, Action<T> dataHandler)
|
||||||
|
{
|
||||||
|
Action<SocketConnection, JToken> internalHandler = (socketWrapper, data) =>
|
||||||
|
{
|
||||||
|
if (typeof(T) == typeof(string))
|
||||||
|
{
|
||||||
|
dataHandler((T)Convert.ChangeType(data.ToString(), typeof(T)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var desResult = Deserialize<T>(data, false);
|
||||||
|
if (!desResult.Success)
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Warning, $"Failed to deserialize data into type {typeof(T)}: {desResult.Error}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
dataHandler(desResult.Data);
|
||||||
|
};
|
||||||
|
|
||||||
|
if (request != null)
|
||||||
|
return connection.AddHandler(request, userSubscription, internalHandler);
|
||||||
|
return connection.AddHandler(identifier, userSubscription, internalHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected virtual SocketConnection GetWebsocket(string address, bool authenticated)
|
||||||
|
{
|
||||||
|
SocketConnection result = sockets.Where(s => s.Socket.Url == address && (s.Authenticated == authenticated || !authenticated) && s.Connected).OrderBy(s => s.HandlerCount).FirstOrDefault();
|
||||||
|
if (result != null)
|
||||||
|
{
|
||||||
|
if (result.HandlerCount < SocketCombineTarget || (sockets.Count >= MaxSocketConnections && sockets.All(s => s.HandlerCount >= SocketCombineTarget)))
|
||||||
|
{
|
||||||
|
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create new socket
|
||||||
|
var socket = CreateSocket(address);
|
||||||
|
var socketWrapper = new SocketConnection(this, log, socket);
|
||||||
|
foreach (var kvp in genericHandlers)
|
||||||
|
socketWrapper.AddHandler(kvp.Key, false, kvp.Value);
|
||||||
|
return socketWrapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Connect a socket
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="socketConnection">The subscription to connect</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
protected virtual async Task<CallResult<bool>> ConnectSocket(SocketConnection socketConnection)
|
||||||
|
{
|
||||||
|
if (await socketConnection.Socket.Connect().ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
sockets.Add(socketConnection);
|
||||||
|
return new CallResult<bool>(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
socketConnection.Socket.Dispose();
|
||||||
|
return new CallResult<bool>(false, new CantConnectError());
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Create a socket for an address
|
/// Create a socket for an address
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -75,187 +288,57 @@ namespace CryptoExchange.Net
|
|||||||
if (apiProxy != null)
|
if (apiProxy != null)
|
||||||
socket.SetProxy(apiProxy.Host, apiProxy.Port);
|
socket.SetProxy(apiProxy.Host, apiProxy.Port);
|
||||||
|
|
||||||
socket.DataInterpreter = dataInterpreter;
|
socket.Timeout = SocketTimeout;
|
||||||
socket.OnClose += () =>
|
socket.DataInterpreterBytes = dataInterpreterBytes;
|
||||||
{
|
socket.DataInterpreterString = dataInterpreterString;
|
||||||
lock (sockets)
|
|
||||||
{
|
|
||||||
foreach (var sub in sockets)
|
|
||||||
sub.ResetEvents();
|
|
||||||
}
|
|
||||||
|
|
||||||
SocketOnClose(socket);
|
|
||||||
};
|
|
||||||
socket.OnError += e =>
|
socket.OnError += e =>
|
||||||
{
|
{
|
||||||
log.Write(LogVerbosity.Info, $"Socket {socket.Id} error: " + e.ToString());
|
log.Write(LogVerbosity.Info, $"Socket {socket.Id} error: " + e.ToString());
|
||||||
SocketError(socket, e);
|
|
||||||
};
|
};
|
||||||
socket.OnOpen += () => SocketOpened(socket);
|
|
||||||
socket.OnClose += () => SocketClosed(socket);
|
|
||||||
return socket;
|
return socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected virtual SocketSubscription GetBackgroundSocket(bool authenticated = false)
|
public virtual void SendPeriodic(TimeSpan interval, Func<SocketConnection, object> objGetter)
|
||||||
{
|
{
|
||||||
lock (sockets)
|
periodicEvent = new AutoResetEvent(false);
|
||||||
return sockets.SingleOrDefault(s => s.Type == (authenticated ? SocketType.BackgroundAuthenticated : SocketType.Background));
|
periodicTask = Task.Run(() =>
|
||||||
}
|
|
||||||
|
|
||||||
protected virtual void SocketOpened(IWebsocket socket) { }
|
|
||||||
protected virtual void SocketClosed(IWebsocket socket) { }
|
|
||||||
protected virtual void SocketError(IWebsocket socket, Exception ex) { }
|
|
||||||
/// <summary>
|
|
||||||
/// Handler for when a socket reconnects. Should return true if reconnection handling was successful or false if not ( will try to reconnect again ). The handler should
|
|
||||||
/// handle functionality like resubscribing and re-authenticating the socket.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="subscription">The socket subscription that was reconnected</param>
|
|
||||||
/// <param name="disconnectedTime">The time the socket was disconnected</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
protected abstract bool SocketReconnect(SocketSubscription subscription, TimeSpan disconnectedTime);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Connect a socket
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="socketSubscription">The subscription to connect</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
protected virtual async Task<CallResult<bool>> ConnectSocket(SocketSubscription socketSubscription)
|
|
||||||
{
|
|
||||||
socketSubscription.Socket.OnMessage += data => ProcessMessage(socketSubscription, data);
|
|
||||||
|
|
||||||
if (await socketSubscription.Socket.Connect().ConfigureAwait(false))
|
|
||||||
{
|
{
|
||||||
lock (sockets)
|
while (!disposing)
|
||||||
sockets.Add(socketSubscription);
|
|
||||||
return new CallResult<bool>(true, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
socketSubscription.Socket.Dispose();
|
|
||||||
return new CallResult<bool>(false, new CantConnectError());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// The message handler. Normally distributes the received data to all data handlers
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="subscription">The subscription that received the data</param>
|
|
||||||
/// <param name="data">The data received</param>
|
|
||||||
protected virtual void ProcessMessage(SocketSubscription subscription, string data)
|
|
||||||
{
|
|
||||||
log.Write(LogVerbosity.Debug, $"Socket {subscription.Socket.Id} received data: " + data);
|
|
||||||
string currentHandlerName = null;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var sw = Stopwatch.StartNew();
|
|
||||||
foreach (var handler in subscription.MessageHandlers)
|
|
||||||
{
|
{
|
||||||
currentHandlerName = handler.Key;
|
periodicEvent.WaitOne(interval);
|
||||||
if (handler.Value(subscription, JToken.Parse(data)))
|
if (disposing)
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
sw.Stop();
|
|
||||||
if (sw.ElapsedMilliseconds > 500)
|
|
||||||
log.Write(LogVerbosity.Warning, $"Socket {subscription.Socket.Id} message processing slow ({sw.ElapsedMilliseconds}ms), consider offloading data handling to another thread. " +
|
|
||||||
"Data from this socket may arrive late or not at all if message processing is continuously slow.");
|
|
||||||
}
|
|
||||||
catch(Exception ex)
|
|
||||||
{
|
|
||||||
log.Write(LogVerbosity.Error, $"Socket {subscription.Socket.Id} Exception during message processing\r\nProcessor: {currentHandlerName}\r\nException: {ex}\r\nData: {data}");
|
|
||||||
subscription.InvokeExceptionHandler(ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
List<SocketConnection> socketList;
|
||||||
/// Handler for a socket closing. Reconnects the socket if needed, or removes it from the active socket list if not
|
lock (socketLock)
|
||||||
/// </summary>
|
socketList = sockets.ToList();
|
||||||
/// <param name="socket">The socket that was closed</param>
|
|
||||||
protected virtual void SocketOnClose(IWebsocket socket)
|
|
||||||
{
|
|
||||||
if (AutoReconnect && socket.ShouldReconnect)
|
|
||||||
{
|
|
||||||
if (socket.Reconnecting)
|
|
||||||
return; // Already reconnecting
|
|
||||||
|
|
||||||
socket.Reconnecting = true;
|
if (socketList.Any())
|
||||||
|
log.Write(LogVerbosity.Debug, "Sending periodic");
|
||||||
|
|
||||||
log.Write(LogVerbosity.Info, $"Socket {socket.Id} Connection lost, will try to reconnect after {ReconnectInterval}");
|
foreach (var socket in socketList)
|
||||||
Task.Run(() =>
|
|
||||||
{
|
|
||||||
while (socket.ShouldReconnect)
|
|
||||||
{
|
{
|
||||||
Thread.Sleep(ReconnectInterval);
|
if (disposing)
|
||||||
if (!socket.ShouldReconnect)
|
|
||||||
{
|
|
||||||
// Should reconnect changed to false while waiting to reconnect
|
|
||||||
socket.Reconnecting = false;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
socket.Reset();
|
|
||||||
if (!socket.Connect().Result)
|
|
||||||
{
|
|
||||||
log.Write(LogVerbosity.Debug, $"Socket {socket.Id} failed to reconnect");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
var time = socket.DisconnectTime;
|
|
||||||
socket.DisconnectTime = null;
|
|
||||||
|
|
||||||
log.Write(LogVerbosity.Info, $"Socket {socket.Id} reconnected after {DateTime.UtcNow - time}");
|
|
||||||
|
|
||||||
SocketSubscription subscription;
|
|
||||||
lock (sockets)
|
|
||||||
subscription = sockets.Single(s => s.Socket == socket);
|
|
||||||
|
|
||||||
if (!SocketReconnect(subscription, DateTime.UtcNow - time.Value))
|
|
||||||
{
|
|
||||||
log.Write(LogVerbosity.Info, $"Socket {socket.Id} failed to resubscribe");
|
|
||||||
socket.Close().Wait();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
log.Write(LogVerbosity.Info, $"Socket {socket.Id} successfully resubscribed");
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
var obj = objGetter(socket);
|
||||||
|
if (obj != null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
socket.Send(obj);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Warning, "Periodic send failed: " + ex);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
socket.Reconnecting = false;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
log.Write(LogVerbosity.Info, $"Socket {socket.Id} closed");
|
|
||||||
socket.Dispose();
|
|
||||||
lock (sockets)
|
|
||||||
{
|
|
||||||
var subscription = sockets.SingleOrDefault(s => s.Socket.Id == socket.Id);
|
|
||||||
if(subscription != null)
|
|
||||||
sockets.Remove(subscription);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
});
|
||||||
/// Send data to the websocket
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="T">The type of the object to send</typeparam>
|
|
||||||
/// <param name="socket">The socket to send to</param>
|
|
||||||
/// <param name="obj">The object to send</param>
|
|
||||||
/// <param name="nullValueHandling">How null values should be serialized</param>
|
|
||||||
protected virtual void Send<T>(IWebsocket socket, T obj, NullValueHandling nullValueHandling = NullValueHandling.Ignore)
|
|
||||||
{
|
|
||||||
Send(socket, JsonConvert.SerializeObject(obj, Formatting.None, new JsonSerializerSettings { NullValueHandling = nullValueHandling }));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Send string data to the websocket
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="socket">The socket to send to</param>
|
|
||||||
/// <param name="data">The data to send</param>
|
|
||||||
protected virtual void Send(IWebsocket socket, string data)
|
|
||||||
{
|
|
||||||
log.Write(LogVerbosity.Debug, $"Socket {socket.Id} sending data: {data}");
|
|
||||||
socket.Send(data);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Unsubscribe from a stream
|
/// Unsubscribe from a stream
|
||||||
@ -264,7 +347,10 @@ namespace CryptoExchange.Net
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task Unsubscribe(UpdateSubscription subscription)
|
public virtual async Task Unsubscribe(UpdateSubscription subscription)
|
||||||
{
|
{
|
||||||
log.Write(LogVerbosity.Info, $"Closing subscription {subscription.Id}");
|
if (subscription == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
log.Write(LogVerbosity.Info, "Closing subscription");
|
||||||
await subscription.Close().ConfigureAwait(false);
|
await subscription.Close().ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -274,15 +360,15 @@ namespace CryptoExchange.Net
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task UnsubscribeAll()
|
public virtual async Task UnsubscribeAll()
|
||||||
{
|
{
|
||||||
lock (sockets)
|
lock (socketLock)
|
||||||
log.Write(LogVerbosity.Debug, $"Closing all {sockets.Count} subscriptions");
|
log.Write(LogVerbosity.Debug, $"Closing all {sockets.Count} subscriptions");
|
||||||
|
|
||||||
await Task.Run(() =>
|
await Task.Run(() =>
|
||||||
{
|
{
|
||||||
var tasks = new List<Task>();
|
var tasks = new List<Task>();
|
||||||
lock (sockets)
|
lock (socketLock)
|
||||||
{
|
{
|
||||||
foreach (var sub in new List<SocketSubscription>(sockets))
|
foreach (var sub in new List<SocketConnection>(sockets))
|
||||||
tasks.Add(sub.Close());
|
tasks.Add(sub.Close());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,6 +378,8 @@ namespace CryptoExchange.Net
|
|||||||
|
|
||||||
public override void Dispose()
|
public override void Dispose()
|
||||||
{
|
{
|
||||||
|
disposing = true;
|
||||||
|
periodicEvent?.Set();
|
||||||
log.Write(LogVerbosity.Debug, "Disposing socket client, closing all subscriptions");
|
log.Write(LogVerbosity.Debug, "Disposing socket client, closing all subscriptions");
|
||||||
UnsubscribeAll().Wait();
|
UnsubscribeAll().Wait();
|
||||||
|
|
||||||
|
@ -32,9 +32,6 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
protected HttpConnectProxy proxy;
|
protected HttpConnectProxy proxy;
|
||||||
|
|
||||||
public int Id { get; }
|
public int Id { get; }
|
||||||
public DateTime? DisconnectTime { get; set; }
|
|
||||||
|
|
||||||
public bool ShouldReconnect { get; set; }
|
|
||||||
public bool Reconnecting { get; set; }
|
public bool Reconnecting { get; set; }
|
||||||
public string Origin { get; set; }
|
public string Origin { get; set; }
|
||||||
|
|
||||||
@ -42,7 +39,8 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
public bool IsClosed => socket.State == WebSocketState.Closed;
|
public bool IsClosed => socket.State == WebSocketState.Closed;
|
||||||
public bool IsOpen => socket.State == WebSocketState.Open;
|
public bool IsOpen => socket.State == WebSocketState.Open;
|
||||||
public SslProtocols SSLProtocols { get; set; } = SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls;
|
public SslProtocols SSLProtocols { get; set; } = SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls;
|
||||||
public Func<byte[], string> DataInterpreter { get; set; }
|
public Func<byte[], string> DataInterpreterBytes { get; set; }
|
||||||
|
public Func<string, string> DataInterpreterString { get; set; }
|
||||||
|
|
||||||
public DateTime LastActionTime { get; private set; }
|
public DateTime LastActionTime { get; private set; }
|
||||||
public TimeSpan Timeout { get; set; }
|
public TimeSpan Timeout { get; set; }
|
||||||
@ -77,7 +75,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
private void HandleByteData(byte[] data)
|
private void HandleByteData(byte[] data)
|
||||||
{
|
{
|
||||||
var message = DataInterpreter(data);
|
var message = DataInterpreterBytes(data);
|
||||||
Handle(messageHandlers, message);
|
Handle(messageHandlers, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -203,7 +201,13 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
socket.Opened += (o, s) => Handle(openHandlers);
|
socket.Opened += (o, s) => Handle(openHandlers);
|
||||||
socket.Closed += (o, s) => Handle(closeHandlers);
|
socket.Closed += (o, s) => Handle(closeHandlers);
|
||||||
socket.Error += (o, s) => Handle(errorHandlers, s.Exception);
|
socket.Error += (o, s) => Handle(errorHandlers, s.Exception);
|
||||||
socket.MessageReceived += (o, s) => Handle(messageHandlers, s.Message);
|
socket.MessageReceived += (o, s) =>
|
||||||
|
{
|
||||||
|
string data = s.Message;
|
||||||
|
if (DataInterpreterString != null)
|
||||||
|
data = DataInterpreterString(data);
|
||||||
|
Handle(messageHandlers, data);
|
||||||
|
};
|
||||||
socket.DataReceived += (o, s) => HandleByteData(s.Data);
|
socket.DataReceived += (o, s) => HandleByteData(s.Data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
349
CryptoExchange.Net/Sockets/SocketConnection.cs
Normal file
349
CryptoExchange.Net/Sockets/SocketConnection.cs
Normal file
@ -0,0 +1,349 @@
|
|||||||
|
using CryptoExchange.Net.Interfaces;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Diagnostics;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using CryptoExchange.Net.Logging;
|
||||||
|
using Newtonsoft.Json;
|
||||||
|
using Newtonsoft.Json.Linq;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Sockets
|
||||||
|
{
|
||||||
|
public class SocketConnection
|
||||||
|
{
|
||||||
|
public event Action ConnectionLost;
|
||||||
|
public event Action<TimeSpan> ConnectionRestored;
|
||||||
|
public event Action Closed;
|
||||||
|
|
||||||
|
public int HandlerCount
|
||||||
|
{
|
||||||
|
get { lock (handlersLock)
|
||||||
|
return handlers.Count(h => h.UserSubscription); }
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool Authenticated { get; set; }
|
||||||
|
public bool Connected { get; private set; }
|
||||||
|
|
||||||
|
|
||||||
|
public IWebsocket Socket { get; set; }
|
||||||
|
public bool ShouldReconnect { get; set; }
|
||||||
|
public DateTime? DisconnectTime { get; set; }
|
||||||
|
public bool PausedActivity { get; set; }
|
||||||
|
|
||||||
|
private readonly List<SocketSubscription> handlers;
|
||||||
|
private readonly object handlersLock = new object();
|
||||||
|
|
||||||
|
private bool lostTriggered;
|
||||||
|
private readonly Log log;
|
||||||
|
private readonly SocketClient socketClient;
|
||||||
|
|
||||||
|
private readonly List<PendingRequest> pendingRequests;
|
||||||
|
|
||||||
|
public SocketConnection(SocketClient client, Log log, IWebsocket socket)
|
||||||
|
{
|
||||||
|
this.log = log;
|
||||||
|
socketClient = client;
|
||||||
|
|
||||||
|
pendingRequests = new List<PendingRequest>();
|
||||||
|
|
||||||
|
handlers = new List<SocketSubscription>();
|
||||||
|
Socket = socket;
|
||||||
|
|
||||||
|
Socket.Timeout = client.SocketTimeout;
|
||||||
|
Socket.OnMessage += ProcessMessage;
|
||||||
|
Socket.OnClose += () =>
|
||||||
|
{
|
||||||
|
if (lostTriggered)
|
||||||
|
return;
|
||||||
|
|
||||||
|
DisconnectTime = DateTime.UtcNow;
|
||||||
|
lostTriggered = true;
|
||||||
|
|
||||||
|
if (ShouldReconnect)
|
||||||
|
ConnectionLost?.Invoke();
|
||||||
|
};
|
||||||
|
Socket.OnClose += SocketOnClose;
|
||||||
|
Socket.OnOpen += () =>
|
||||||
|
{
|
||||||
|
PausedActivity = false;
|
||||||
|
Connected = true;
|
||||||
|
if (lostTriggered)
|
||||||
|
{
|
||||||
|
lostTriggered = false;
|
||||||
|
ConnectionRestored?.Invoke(DisconnectTime.HasValue ? DateTime.UtcNow - DisconnectTime.Value: TimeSpan.FromSeconds(0));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public SocketSubscription AddHandler(object request, bool userSubscription, Action<SocketConnection, JToken> dataHandler)
|
||||||
|
{
|
||||||
|
var handler = new SocketSubscription(null, request, userSubscription, dataHandler);
|
||||||
|
lock (handlersLock)
|
||||||
|
handlers.Add(handler);
|
||||||
|
return handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SocketSubscription AddHandler(string identifier, bool userSubscription, Action<SocketConnection, JToken> dataHandler)
|
||||||
|
{
|
||||||
|
var handler = new SocketSubscription(identifier, null, userSubscription, dataHandler);
|
||||||
|
lock (handlersLock)
|
||||||
|
handlers.Add(handler);
|
||||||
|
return handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ProcessMessage(string data)
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Debug, $"Socket {Socket.Id} received data: " + data);
|
||||||
|
|
||||||
|
var tokenData = JToken.Parse(data);
|
||||||
|
|
||||||
|
foreach (var pendingRequest in pendingRequests.ToList())
|
||||||
|
{
|
||||||
|
if (pendingRequest.Check(tokenData))
|
||||||
|
{
|
||||||
|
pendingRequests.Remove(pendingRequest);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!HandleData(tokenData))
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Debug, "Message not handled: " + tokenData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private bool HandleData(JToken tokenData)
|
||||||
|
{
|
||||||
|
SocketSubscription currentSubscription = null;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
bool handled = false;
|
||||||
|
var sw = Stopwatch.StartNew();
|
||||||
|
lock (handlersLock)
|
||||||
|
{
|
||||||
|
foreach (var handler in handlers)
|
||||||
|
{
|
||||||
|
currentSubscription = handler;
|
||||||
|
if (handler.Request == null)
|
||||||
|
{
|
||||||
|
if (socketClient.MessageMatchesHandler(tokenData, handler.Identifier))
|
||||||
|
{
|
||||||
|
handled = true;
|
||||||
|
handler.MessageHandler(this, tokenData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (socketClient.MessageMatchesHandler(tokenData, handler.Request))
|
||||||
|
{
|
||||||
|
handled = true;
|
||||||
|
tokenData = socketClient.ProcessTokenData(tokenData);
|
||||||
|
handler.MessageHandler(this, tokenData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sw.Stop();
|
||||||
|
if (sw.ElapsedMilliseconds > 500)
|
||||||
|
log.Write(LogVerbosity.Warning, $"Socket {Socket.Id} message processing slow ({sw.ElapsedMilliseconds}ms), consider offloading data handling to another thread. " +
|
||||||
|
"Data from this socket may arrive late or not at all if message processing is continuously slow.");
|
||||||
|
return handled;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Error, $"Socket {Socket.Id} Exception during message processing\r\nException: {ex}\r\nData: {tokenData}");
|
||||||
|
currentSubscription?.InvokeExceptionHandler(ex);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public virtual async Task SendAndWait<T>(T obj, TimeSpan timeout, Func<JToken, bool> handler)
|
||||||
|
{
|
||||||
|
var pending = new PendingRequest(handler, timeout);
|
||||||
|
pendingRequests.Add(pending);
|
||||||
|
await Task.Run(() =>
|
||||||
|
{
|
||||||
|
Send(obj);
|
||||||
|
pending.Event.WaitOne(timeout);
|
||||||
|
}).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Send data to the websocket
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">The type of the object to send</typeparam>
|
||||||
|
/// <param name="obj">The object to send</param>
|
||||||
|
/// <param name="nullValueHandling">How null values should be serialized</param>
|
||||||
|
public virtual void Send<T>(T obj, NullValueHandling nullValueHandling = NullValueHandling.Ignore)
|
||||||
|
{
|
||||||
|
Send(JsonConvert.SerializeObject(obj, Formatting.None, new JsonSerializerSettings { NullValueHandling = nullValueHandling }));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Send string data to the websocket
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="data">The data to send</param>
|
||||||
|
public virtual void Send(string data)
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Debug, $"Socket {Socket.Id} sending data: {data}");
|
||||||
|
Socket.Send(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Handler for a socket closing. Reconnects the socket if needed, or removes it from the active socket list if not
|
||||||
|
/// </summary>
|
||||||
|
protected virtual void SocketOnClose()
|
||||||
|
{
|
||||||
|
if (socketClient.AutoReconnect && ShouldReconnect)
|
||||||
|
{
|
||||||
|
if (Socket.Reconnecting)
|
||||||
|
return; // Already reconnecting
|
||||||
|
|
||||||
|
Socket.Reconnecting = true;
|
||||||
|
|
||||||
|
log.Write(LogVerbosity.Info, $"Socket {Socket.Id} Connection lost, will try to reconnect after {socketClient.ReconnectInterval}");
|
||||||
|
Task.Run(async () =>
|
||||||
|
{
|
||||||
|
while (ShouldReconnect)
|
||||||
|
{
|
||||||
|
Thread.Sleep(socketClient.ReconnectInterval);
|
||||||
|
if (!ShouldReconnect)
|
||||||
|
{
|
||||||
|
// Should reconnect changed to false while waiting to reconnect
|
||||||
|
Socket.Reconnecting = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Socket.Reset();
|
||||||
|
if (!await Socket.Connect().ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Debug, $"Socket {Socket.Id} failed to reconnect");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var time = DisconnectTime;
|
||||||
|
DisconnectTime = null;
|
||||||
|
|
||||||
|
log.Write(LogVerbosity.Info, $"Socket {Socket.Id} reconnected after {DateTime.UtcNow - time}");
|
||||||
|
|
||||||
|
var reconnectResult = await ProcessReconnect().ConfigureAwait(false);
|
||||||
|
if (!reconnectResult)
|
||||||
|
await Socket.Close().ConfigureAwait(false);
|
||||||
|
else
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
Socket.Reconnecting = false;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Info, $"Socket {Socket.Id} closed");
|
||||||
|
Socket.Dispose();
|
||||||
|
Closed?.Invoke();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<bool> ProcessReconnect()
|
||||||
|
{
|
||||||
|
if (Authenticated)
|
||||||
|
{
|
||||||
|
var authResult = await socketClient.AuthenticateSocket(this).ConfigureAwait(false);
|
||||||
|
if (!authResult.Success)
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Info, "Authentication failed on reconnected socket. Disconnecting and reconnecting.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Write(LogVerbosity.Debug, "Authentication succeeded on reconnected socket.");
|
||||||
|
}
|
||||||
|
|
||||||
|
List<SocketSubscription> handlerList;
|
||||||
|
lock (handlersLock)
|
||||||
|
handlerList = handlers.Where(h => h.Request != null).ToList();
|
||||||
|
foreach (var handler in handlerList)
|
||||||
|
{
|
||||||
|
var resubResult = await socketClient.SubscribeAndWait(this, handler.Request, handler).ConfigureAwait(false);
|
||||||
|
if (!resubResult.Success)
|
||||||
|
{
|
||||||
|
log.Write(LogVerbosity.Debug, "Resubscribing all subscriptions failed on reconnected socket. Disconnecting and reconnecting.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Write(LogVerbosity.Debug, "All subscription successfully resubscribed on reconnected socket.");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task Close()
|
||||||
|
{
|
||||||
|
Connected = false;
|
||||||
|
ShouldReconnect = false;
|
||||||
|
lock (socketClient.socketLock)
|
||||||
|
{
|
||||||
|
if (socketClient.sockets.Contains(this))
|
||||||
|
socketClient.sockets.Remove(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
await Socket.Close().ConfigureAwait(false);
|
||||||
|
Socket.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task Close(SocketSubscription subscription)
|
||||||
|
{
|
||||||
|
if (subscription.Confirmed)
|
||||||
|
await socketClient.Unsubscribe(this, subscription).ConfigureAwait(false);
|
||||||
|
|
||||||
|
bool shouldCloseWrapper = false;
|
||||||
|
lock (handlersLock)
|
||||||
|
{
|
||||||
|
handlers.Remove(subscription);
|
||||||
|
if (handlers.Count(r => r.UserSubscription) == 0)
|
||||||
|
shouldCloseWrapper = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shouldCloseWrapper)
|
||||||
|
await Close().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class PendingRequest
|
||||||
|
{
|
||||||
|
public Func<JToken, bool> Handler { get; }
|
||||||
|
public JToken Result { get; private set; }
|
||||||
|
public ManualResetEvent Event { get; }
|
||||||
|
public TimeSpan Timeout { get; }
|
||||||
|
|
||||||
|
private readonly DateTime startTime;
|
||||||
|
|
||||||
|
public PendingRequest(Func<JToken, bool> handler, TimeSpan timeout)
|
||||||
|
{
|
||||||
|
Handler = handler;
|
||||||
|
Event = new ManualResetEvent(false);
|
||||||
|
Timeout = timeout;
|
||||||
|
startTime = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool Check(JToken data)
|
||||||
|
{
|
||||||
|
if (Handler(data))
|
||||||
|
{
|
||||||
|
Result = data;
|
||||||
|
Event.Set();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (DateTime.UtcNow - startTime > Timeout)
|
||||||
|
{
|
||||||
|
// Timed out
|
||||||
|
Event.Set();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,40 +0,0 @@
|
|||||||
using CryptoExchange.Net.Objects;
|
|
||||||
using System.Threading;
|
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets
|
|
||||||
{
|
|
||||||
public class SocketEvent
|
|
||||||
{
|
|
||||||
public string Name { get; set; }
|
|
||||||
public string WaitingId { get; set; }
|
|
||||||
|
|
||||||
private CallResult<bool> result;
|
|
||||||
private readonly ManualResetEvent setEvnt;
|
|
||||||
|
|
||||||
public SocketEvent(string name)
|
|
||||||
{
|
|
||||||
Name = name;
|
|
||||||
setEvnt = new ManualResetEvent(false);
|
|
||||||
result = new CallResult<bool>(false, new UnknownError("No response received"));
|
|
||||||
}
|
|
||||||
|
|
||||||
internal void Set(bool result, Error error)
|
|
||||||
{
|
|
||||||
this.result = new CallResult<bool>(result, error);
|
|
||||||
setEvnt.Set();
|
|
||||||
WaitingId = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CallResult<bool> Wait(int timeout = 5000)
|
|
||||||
{
|
|
||||||
setEvnt.WaitOne(timeout);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Reset()
|
|
||||||
{
|
|
||||||
setEvnt.Reset();
|
|
||||||
result = new CallResult<bool>(false, new UnknownError("No response received"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,10 +0,0 @@
|
|||||||
using Newtonsoft.Json;
|
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets
|
|
||||||
{
|
|
||||||
public class SocketRequest
|
|
||||||
{
|
|
||||||
[JsonIgnore]
|
|
||||||
public bool Signed { get; set; }
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,161 +1,35 @@
|
|||||||
using CryptoExchange.Net.Interfaces;
|
using System;
|
||||||
using CryptoExchange.Net.Objects;
|
|
||||||
using Newtonsoft.Json.Linq;
|
using Newtonsoft.Json.Linq;
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets
|
namespace CryptoExchange.Net.Sockets
|
||||||
{
|
{
|
||||||
public class SocketSubscription
|
public class SocketSubscription
|
||||||
{
|
{
|
||||||
public event Action ConnectionLost;
|
|
||||||
public event Action<TimeSpan> ConnectionRestored;
|
|
||||||
public event Action<Exception> Exception;
|
public event Action<Exception> Exception;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Message handlers for this subscription. Should return true if the message is handled and should not be distributed to the other handlers
|
/// Message handlers for this subscription. Should return true if the message is handled and should not be distributed to the other handlers
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public Dictionary<string, Func<SocketSubscription, JToken, bool>> MessageHandlers { get; set; }
|
public Action<SocketConnection, JToken> MessageHandler { get; set; }
|
||||||
public List<SocketEvent> Events { get; set; }
|
|
||||||
|
|
||||||
public IWebsocket Socket { get; set; }
|
public object Request { get; set; }
|
||||||
public SocketRequest Request { get; set; }
|
public string Identifier { get; set; }
|
||||||
|
public bool UserSubscription { get; set; }
|
||||||
public SocketType Type { get; set; }
|
|
||||||
|
|
||||||
private bool lostTriggered;
|
|
||||||
private readonly List<SocketEvent> waitingForEvents;
|
|
||||||
private object eventLock = new object();
|
|
||||||
|
|
||||||
|
|
||||||
public SocketSubscription(IWebsocket socket)
|
|
||||||
{
|
|
||||||
Socket = socket;
|
|
||||||
Events = new List<SocketEvent>();
|
|
||||||
waitingForEvents = new List<SocketEvent>();
|
|
||||||
|
|
||||||
MessageHandlers = new Dictionary<string, Func<SocketSubscription, JToken, bool>>();
|
|
||||||
|
|
||||||
Socket.OnClose += () =>
|
|
||||||
{
|
|
||||||
if (lostTriggered)
|
|
||||||
return;
|
|
||||||
|
|
||||||
Socket.DisconnectTime = DateTime.UtcNow;
|
|
||||||
lostTriggered = true;
|
|
||||||
|
|
||||||
lock (eventLock)
|
|
||||||
{
|
|
||||||
foreach (var events in Events)
|
|
||||||
events.Reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Socket.ShouldReconnect)
|
|
||||||
ConnectionLost?.Invoke();
|
|
||||||
};
|
|
||||||
Socket.OnOpen += () =>
|
|
||||||
{
|
|
||||||
if (lostTriggered)
|
|
||||||
{
|
|
||||||
lostTriggered = false;
|
|
||||||
ConnectionRestored?.Invoke(Socket.DisconnectTime.HasValue ? DateTime.UtcNow - Socket.DisconnectTime.Value: TimeSpan.FromSeconds(0));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
public void AddEvent(string name)
|
|
||||||
{
|
|
||||||
lock (eventLock)
|
|
||||||
Events.Add(new SocketEvent(name));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void SetEventByName(string name, bool success, Error error)
|
|
||||||
{
|
|
||||||
lock (eventLock)
|
|
||||||
{
|
|
||||||
var waitingEvent = waitingForEvents.SingleOrDefault(e => e.Name == name);
|
|
||||||
if (waitingEvent != null)
|
|
||||||
{
|
|
||||||
waitingEvent.Set(success, error);
|
|
||||||
waitingForEvents.Remove(waitingEvent);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void SetEventById(string id, bool success, Error error)
|
|
||||||
{
|
|
||||||
lock (eventLock)
|
|
||||||
{
|
|
||||||
var waitingEvent = waitingForEvents.SingleOrDefault(e => e.WaitingId == id);
|
|
||||||
if (waitingEvent != null)
|
|
||||||
{
|
|
||||||
waitingEvent.Set(success, error);
|
|
||||||
waitingForEvents.Remove(waitingEvent);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public SocketEvent GetWaitingEvent(string name)
|
|
||||||
{
|
|
||||||
lock (eventLock)
|
|
||||||
return waitingForEvents.SingleOrDefault(w => w.Name == name);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task<CallResult<bool>> WaitForEvent(string name, TimeSpan timeout)
|
public bool Confirmed { get; set; }
|
||||||
{
|
|
||||||
lock (eventLock)
|
|
||||||
return WaitForEvent(name, (int)Math.Round(timeout.TotalMilliseconds, 0));
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task<CallResult<bool>> WaitForEvent(string name, int timeout)
|
|
||||||
{
|
|
||||||
lock (eventLock)
|
|
||||||
{
|
|
||||||
var evnt = Events.Single(e => e.Name == name);
|
|
||||||
waitingForEvents.Add(evnt);
|
|
||||||
return Task.Run(() => evnt.Wait(timeout));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task<CallResult<bool>> WaitForEvent(string name, string id, TimeSpan timeout)
|
public SocketSubscription(string identifier, object request, bool userSubscription, Action<SocketConnection, JToken> dataHandler)
|
||||||
{
|
{
|
||||||
lock (eventLock)
|
UserSubscription = userSubscription;
|
||||||
return WaitForEvent(name, id, (int)Math.Round(timeout.TotalMilliseconds, 0));
|
MessageHandler = dataHandler;
|
||||||
|
Identifier = identifier;
|
||||||
|
Request = request;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<CallResult<bool>> WaitForEvent(string name, string id, int timeout)
|
|
||||||
{
|
|
||||||
lock (eventLock)
|
|
||||||
{
|
|
||||||
var evnt = Events.Single(e => e.Name == name);
|
|
||||||
evnt.WaitingId = id;
|
|
||||||
waitingForEvents.Add(evnt);
|
|
||||||
return Task.Run(() => evnt.Wait(timeout));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void ResetEvents()
|
|
||||||
{
|
|
||||||
lock (eventLock)
|
|
||||||
{
|
|
||||||
foreach (var waiting in waitingForEvents)
|
|
||||||
waiting.Set(false, new UnknownError("Connection reset"));
|
|
||||||
waitingForEvents.Clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void InvokeExceptionHandler(Exception e)
|
public void InvokeExceptionHandler(Exception e)
|
||||||
{
|
{
|
||||||
Exception?.Invoke(e);
|
Exception?.Invoke(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Close()
|
|
||||||
{
|
|
||||||
Socket.ShouldReconnect = false;
|
|
||||||
await Socket.Close().ConfigureAwait(false);
|
|
||||||
Socket.Dispose();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
{
|
{
|
||||||
public class UpdateSubscription
|
public class UpdateSubscription
|
||||||
{
|
{
|
||||||
|
private readonly SocketConnection connection;
|
||||||
private readonly SocketSubscription subscription;
|
private readonly SocketSubscription subscription;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -12,8 +13,8 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public event Action ConnectionLost
|
public event Action ConnectionLost
|
||||||
{
|
{
|
||||||
add => subscription.ConnectionLost += value;
|
add => connection.ConnectionLost += value;
|
||||||
remove => subscription.ConnectionLost -= value;
|
remove => connection.ConnectionLost -= value;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -21,8 +22,8 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public event Action<TimeSpan> ConnectionRestored
|
public event Action<TimeSpan> ConnectionRestored
|
||||||
{
|
{
|
||||||
add => subscription.ConnectionRestored += value;
|
add => connection.ConnectionRestored += value;
|
||||||
remove => subscription.ConnectionRestored -= value;
|
remove => connection.ConnectionRestored -= value;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -37,11 +38,12 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// The id of the socket
|
/// The id of the socket
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public int Id => subscription.Socket.Id;
|
public int Id => connection.Socket.Id;
|
||||||
|
|
||||||
public UpdateSubscription(SocketSubscription sub)
|
public UpdateSubscription(SocketConnection connection, SocketSubscription subscription)
|
||||||
{
|
{
|
||||||
subscription = sub;
|
this.connection = connection;
|
||||||
|
this.subscription = subscription;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -50,7 +52,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task Close()
|
public async Task Close()
|
||||||
{
|
{
|
||||||
await subscription.Close().ConfigureAwait(false);
|
await connection.Close(subscription).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user