mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-06-07 16:06:15 +00:00
Socket client update, added headers to rest result
This commit is contained in:
parent
c489b4e9aa
commit
9008c4ed2c
@ -17,8 +17,8 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
[TestCase]
|
[TestCase]
|
||||||
public void SettingOptions_Should_ResultInOptionsSet()
|
public void SettingOptions_Should_ResultInOptionsSet()
|
||||||
{
|
{
|
||||||
// arrange
|
//arrange
|
||||||
// act
|
//act
|
||||||
var client = new TestSocketClient(new SocketClientOptions()
|
var client = new TestSocketClient(new SocketClientOptions()
|
||||||
{
|
{
|
||||||
BaseAddress = "http://test.address.com",
|
BaseAddress = "http://test.address.com",
|
||||||
@ -26,7 +26,7 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
// assert
|
//assert
|
||||||
Assert.IsTrue(client.BaseAddress == "http://test.address.com");
|
Assert.IsTrue(client.BaseAddress == "http://test.address.com");
|
||||||
Assert.IsTrue(client.ReconnectInterval.TotalSeconds == 6);
|
Assert.IsTrue(client.ReconnectInterval.TotalSeconds == 6);
|
||||||
}
|
}
|
||||||
@ -35,229 +35,127 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
[TestCase(false)]
|
[TestCase(false)]
|
||||||
public void ConnectSocket_Should_ReturnConnectionResult(bool canConnect)
|
public void ConnectSocket_Should_ReturnConnectionResult(bool canConnect)
|
||||||
{
|
{
|
||||||
// arrange
|
//arrange
|
||||||
var client = new TestSocketClient();
|
var client = new TestSocketClient();
|
||||||
var socket = client.CreateSocket();
|
var socket = client.CreateSocket();
|
||||||
socket.CanConnect = canConnect;
|
socket.CanConnect = canConnect;
|
||||||
|
|
||||||
// act
|
//act
|
||||||
var connectResult = client.ConnectSocketSub(new SocketConnection(client, new Log(), socket));
|
var connectResult = client.ConnectSocketSub(new SocketConnection(client, socket));
|
||||||
|
|
||||||
// assert
|
//assert
|
||||||
Assert.IsTrue(connectResult.Success == canConnect);
|
Assert.IsTrue(connectResult.Success == canConnect);
|
||||||
}
|
}
|
||||||
|
|
||||||
//[TestCase]
|
[TestCase]
|
||||||
//public void SocketMessages_Should_BeProcessedInDataHandlers()
|
public void SocketMessages_Should_BeProcessedInDataHandlers()
|
||||||
//{
|
{
|
||||||
// // arrange
|
// arrange
|
||||||
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
// var socket = client.CreateSocket();
|
var socket = client.CreateSocket();
|
||||||
// socket.ShouldReconnect = true;
|
socket.ShouldReconnect = true;
|
||||||
// socket.CanConnect = true;
|
socket.CanConnect = true;
|
||||||
// socket.DisconnectTime = DateTime.UtcNow;
|
socket.DisconnectTime = DateTime.UtcNow;
|
||||||
// var sub = new SocketConnection(socket);
|
var sub = new SocketConnection(client, socket);
|
||||||
// var rstEvent = new ManualResetEvent(false);
|
var rstEvent = new ManualResetEvent(false);
|
||||||
// JToken result = null;
|
JToken result = null;
|
||||||
// sub.MessageHandlers.Add("TestHandler", (subs, data) =>
|
sub.AddHandler("TestHandler", true, (connection, data) =>
|
||||||
// {
|
{
|
||||||
// result = data;
|
result = data;
|
||||||
// rstEvent.Set();
|
rstEvent.Set();
|
||||||
// return true;
|
});
|
||||||
|
client.ConnectSocketSub(sub);
|
||||||
|
|
||||||
// });
|
// act
|
||||||
// client.ConnectSocketSub(sub);
|
socket.InvokeMessage("{\"property\": 123}");
|
||||||
|
rstEvent.WaitOne(1000);
|
||||||
|
|
||||||
// // act
|
// assert
|
||||||
// socket.InvokeMessage("{\"property\": 123}");
|
Assert.IsTrue((int)result["property"] == 123);
|
||||||
// rstEvent.WaitOne(1000);
|
}
|
||||||
|
|
||||||
// // assert
|
|
||||||
// Assert.IsTrue((int)result["property"] == 123);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//[TestCase]
|
|
||||||
//public void SocketMessages_Should_NotBeProcessedInSubsequentHandlersIfHandlerReturnsTrue()
|
|
||||||
//{
|
|
||||||
// // arrange
|
|
||||||
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
|
||||||
// var socket = client.CreateSocket();
|
|
||||||
// socket.ShouldReconnect = true;
|
|
||||||
// socket.CanConnect = true;
|
|
||||||
// socket.DisconnectTime = DateTime.UtcNow;
|
|
||||||
// var sub = new SocketConnection(socket);
|
|
||||||
// var rstEvent1 = new ManualResetEvent(false);
|
|
||||||
// var rstEvent2 = new ManualResetEvent(false);
|
|
||||||
// JToken result1 = null;
|
|
||||||
// JToken result2 = null;
|
|
||||||
// sub.MessageHandlers.Add("TestHandler", (subs, data) =>
|
|
||||||
// {
|
|
||||||
// result1 = data;
|
|
||||||
// rstEvent1.Set();
|
|
||||||
// return true;
|
|
||||||
// });
|
|
||||||
// sub.MessageHandlers.Add("TestHandlerNotHit", (subs, data) =>
|
|
||||||
// {
|
|
||||||
// result2 = data;
|
|
||||||
// rstEvent2.Set();
|
|
||||||
// return true;
|
|
||||||
// });
|
|
||||||
// client.ConnectSocketSub(sub);
|
|
||||||
|
|
||||||
// // act
|
|
||||||
// socket.InvokeMessage("{\"property\": 123}");
|
|
||||||
// rstEvent1.WaitOne(100);
|
|
||||||
// rstEvent2.WaitOne(100);
|
|
||||||
|
|
||||||
// // assert
|
|
||||||
// Assert.IsTrue((int)result1["property"] == 123);
|
|
||||||
// Assert.IsTrue(result2 == null);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//[TestCase]
|
|
||||||
//public void SocketMessages_Should_BeProcessedInSubsequentHandlersIfHandlerReturnsFalse()
|
|
||||||
//{
|
|
||||||
// // arrange
|
|
||||||
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
|
||||||
// var socket = client.CreateSocket();
|
|
||||||
// socket.ShouldReconnect = true;
|
|
||||||
// socket.CanConnect = true;
|
|
||||||
// socket.DisconnectTime = DateTime.UtcNow;
|
|
||||||
// var sub = new SocketConnection(socket);
|
|
||||||
// var rstEvent = new ManualResetEvent(false);
|
|
||||||
// JToken result = null;
|
|
||||||
// sub.MessageHandlers.Add("TestHandlerNotProcessing", (subs, data) =>
|
|
||||||
// {
|
|
||||||
// return false;
|
|
||||||
// });
|
|
||||||
// sub.MessageHandlers.Add("TestHandler", (subs, data) =>
|
|
||||||
// {
|
|
||||||
// result = data;
|
|
||||||
// rstEvent.Set();
|
|
||||||
// return true;
|
|
||||||
// });
|
|
||||||
// client.ConnectSocketSub(sub);
|
|
||||||
|
|
||||||
// // act
|
|
||||||
// socket.InvokeMessage("{\"property\": 123}");
|
|
||||||
// rstEvent.WaitOne(100);
|
|
||||||
|
|
||||||
// // assert
|
|
||||||
// Assert.IsTrue((int)result["property"] == 123);
|
|
||||||
//}
|
|
||||||
|
|
||||||
|
|
||||||
//[TestCase]
|
|
||||||
//public void DisconnectedSocket_Should_Reconnect()
|
|
||||||
//{
|
|
||||||
// // arrange
|
|
||||||
// bool reconnected = false;
|
|
||||||
// var client = new TestSocketClient(new SocketClientOptions(){ReconnectInterval = TimeSpan.Zero ,LogVerbosity = LogVerbosity.Debug});
|
|
||||||
// var socket = client.CreateSocket();
|
|
||||||
// socket.ShouldReconnect = true;
|
|
||||||
// socket.CanConnect = true;
|
|
||||||
// socket.DisconnectTime = DateTime.UtcNow;
|
|
||||||
// var sub = new SocketConnection(socket);
|
|
||||||
// client.ConnectSocketSub(sub);
|
|
||||||
// var rstEvent = new ManualResetEvent(false);
|
|
||||||
// client.OnReconnect += () =>
|
|
||||||
// {
|
|
||||||
// reconnected = true;
|
|
||||||
// rstEvent.Set();
|
|
||||||
// return true;
|
|
||||||
// };
|
|
||||||
|
|
||||||
// // act
|
|
||||||
// socket.InvokeClose();
|
|
||||||
// rstEvent.WaitOne(1000);
|
|
||||||
|
|
||||||
// // assert
|
|
||||||
// Assert.IsTrue(reconnected);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//[TestCase()]
|
|
||||||
//public void UnsubscribingStream_Should_CloseTheSocket()
|
|
||||||
//{
|
|
||||||
// // arrange
|
|
||||||
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
|
||||||
// var socket = client.CreateSocket();
|
|
||||||
// socket.CanConnect = true;
|
|
||||||
// var sub = new SocketConnection(socket);
|
|
||||||
// client.ConnectSocketSub(sub);
|
|
||||||
// var ups = new UpdateSubscription(sub);
|
|
||||||
|
|
||||||
// // act
|
|
||||||
// client.Unsubscribe(ups).Wait();
|
|
||||||
|
|
||||||
// // assert
|
|
||||||
// Assert.IsTrue(socket.Connected == false);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//[TestCase()]
|
|
||||||
//public void UnsubscribingAll_Should_CloseAllSockets()
|
|
||||||
//{
|
|
||||||
// // arrange
|
|
||||||
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
|
||||||
// var socket1 = client.CreateSocket();
|
|
||||||
// var socket2 = client.CreateSocket();
|
|
||||||
// socket1.CanConnect = true;
|
|
||||||
// socket2.CanConnect = true;
|
|
||||||
// var sub1 = new SocketConnection(socket1);
|
|
||||||
// var sub2 = new SocketConnection(socket2);
|
|
||||||
// client.ConnectSocketSub(sub1);
|
|
||||||
// client.ConnectSocketSub(sub2);
|
|
||||||
|
|
||||||
// // act
|
|
||||||
// client.UnsubscribeAll().Wait();
|
|
||||||
|
|
||||||
// // assert
|
|
||||||
// Assert.IsTrue(socket1.Connected == false);
|
|
||||||
// Assert.IsTrue(socket2.Connected == false);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//[TestCase()]
|
|
||||||
//public void FailingToConnectSocket_Should_ReturnError()
|
|
||||||
//{
|
|
||||||
// // arrange
|
|
||||||
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
|
||||||
// var socket = client.CreateSocket();
|
|
||||||
// socket.CanConnect = false;
|
|
||||||
// var sub = new SocketConnection(socket);
|
|
||||||
|
|
||||||
// // act
|
|
||||||
// var connectResult = client.ConnectSocketSub(sub);
|
|
||||||
|
|
||||||
// // assert
|
|
||||||
// Assert.IsFalse(connectResult.Success);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//[Test]
|
[TestCase]
|
||||||
//public void WhenResubscribeFails_Socket_ShouldReconnect()
|
public void DisconnectedSocket_Should_Reconnect()
|
||||||
//{
|
{
|
||||||
// // arrange
|
// arrange
|
||||||
// int reconnected = 0;
|
bool reconnected = false;
|
||||||
// var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.FromMilliseconds(1), LogVerbosity = LogVerbosity.Debug });
|
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
// var socket = client.CreateSocket();
|
var socket = client.CreateSocket();
|
||||||
// socket.ShouldReconnect = true;
|
socket.ShouldReconnect = true;
|
||||||
// socket.CanConnect = true;
|
socket.CanConnect = true;
|
||||||
// socket.DisconnectTime = DateTime.UtcNow;
|
socket.DisconnectTime = DateTime.UtcNow;
|
||||||
// var sub = new SocketConnection(socket);
|
var sub = new SocketConnection(client, socket);
|
||||||
// client.ConnectSocketSub(sub);
|
sub.ShouldReconnect = true;
|
||||||
// var rstEvent = new ManualResetEvent(false);
|
client.ConnectSocketSub(sub);
|
||||||
// client.OnReconnect += () =>
|
var rstEvent = new ManualResetEvent(false);
|
||||||
// {
|
sub.ConnectionRestored += (a) =>
|
||||||
// reconnected++;
|
{
|
||||||
// rstEvent.Set();
|
reconnected = true;
|
||||||
// return reconnected == 2;
|
rstEvent.Set();
|
||||||
// };
|
};
|
||||||
|
|
||||||
// // act
|
// act
|
||||||
// socket.InvokeClose();
|
socket.InvokeClose();
|
||||||
// rstEvent.WaitOne(1000);
|
rstEvent.WaitOne(1000);
|
||||||
// Thread.Sleep(100);
|
|
||||||
|
|
||||||
// // assert
|
// assert
|
||||||
// Assert.IsTrue(reconnected == 2);
|
Assert.IsTrue(reconnected);
|
||||||
//}
|
}
|
||||||
|
|
||||||
|
[TestCase()]
|
||||||
|
public void UnsubscribingStream_Should_CloseTheSocket()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
|
var socket = client.CreateSocket();
|
||||||
|
socket.CanConnect = true;
|
||||||
|
var sub = new SocketConnection(client, socket);
|
||||||
|
client.ConnectSocketSub(sub);
|
||||||
|
var ups = new UpdateSubscription(sub, new SocketSubscription("Test", null, true, (d, a) => {}));
|
||||||
|
|
||||||
|
// act
|
||||||
|
client.Unsubscribe(ups).Wait();
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.IsTrue(socket.Connected == false);
|
||||||
|
}
|
||||||
|
|
||||||
|
[TestCase()]
|
||||||
|
public void UnsubscribingAll_Should_CloseAllSockets()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
|
var socket1 = client.CreateSocket();
|
||||||
|
var socket2 = client.CreateSocket();
|
||||||
|
socket1.CanConnect = true;
|
||||||
|
socket2.CanConnect = true;
|
||||||
|
var sub1 = new SocketConnection(client, socket1);
|
||||||
|
var sub2 = new SocketConnection(client, socket2);
|
||||||
|
client.ConnectSocketSub(sub1);
|
||||||
|
client.ConnectSocketSub(sub2);
|
||||||
|
|
||||||
|
// act
|
||||||
|
client.UnsubscribeAll().Wait();
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.IsTrue(socket1.Connected == false);
|
||||||
|
Assert.IsTrue(socket2.Connected == false);
|
||||||
|
}
|
||||||
|
|
||||||
|
[TestCase()]
|
||||||
|
public void FailingToConnectSocket_Should_ReturnError()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug });
|
||||||
|
var socket = client.CreateSocket();
|
||||||
|
socket.CanConnect = false;
|
||||||
|
var sub = new SocketConnection(client, socket);
|
||||||
|
|
||||||
|
// act
|
||||||
|
var connectResult = client.ConnectSocketSub(sub);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.IsFalse(connectResult.Success);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -52,6 +52,8 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
|||||||
{
|
{
|
||||||
Connected = CanConnect;
|
Connected = CanConnect;
|
||||||
ConnectCalls++;
|
ConnectCalls++;
|
||||||
|
if (CanConnect)
|
||||||
|
InvokeOpen();
|
||||||
return Task.FromResult(CanConnect);
|
return Task.FromResult(CanConnect);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,8 +11,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
|||||||
{
|
{
|
||||||
public class TestSocketClient: SocketClient
|
public class TestSocketClient: SocketClient
|
||||||
{
|
{
|
||||||
public Func<bool> OnReconnect { get; set; }
|
|
||||||
|
|
||||||
public TestSocketClient() : this(new SocketClientOptions())
|
public TestSocketClient() : this(new SocketClientOptions())
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -51,7 +49,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
|||||||
|
|
||||||
protected override bool MessageMatchesHandler(JToken message, string identifier)
|
protected override bool MessageMatchesHandler(JToken message, string identifier)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override Task<CallResult<bool>> AuthenticateSocket(SocketConnection s)
|
protected override Task<CallResult<bool>> AuthenticateSocket(SocketConnection s)
|
||||||
|
@ -15,7 +15,7 @@ namespace CryptoExchange.Net
|
|||||||
public abstract class BaseClient: IDisposable
|
public abstract class BaseClient: IDisposable
|
||||||
{
|
{
|
||||||
public string BaseAddress { get; private set; }
|
public string BaseAddress { get; private set; }
|
||||||
protected Log log;
|
protected internal Log log;
|
||||||
protected ApiProxy apiProxy;
|
protected ApiProxy apiProxy;
|
||||||
protected AuthenticationProvider authProvider;
|
protected AuthenticationProvider authProvider;
|
||||||
|
|
||||||
|
@ -4,6 +4,8 @@ using System.Linq;
|
|||||||
using System.Net;
|
using System.Net;
|
||||||
using System.Runtime.InteropServices;
|
using System.Runtime.InteropServices;
|
||||||
using System.Security;
|
using System.Security;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace CryptoExchange.Net
|
namespace CryptoExchange.Net
|
||||||
{
|
{
|
||||||
@ -83,5 +85,47 @@ namespace CryptoExchange.Net
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static IEnumerable<Tuple<string, string>> ToIEnumerable(this WebHeaderCollection headers)
|
||||||
|
{
|
||||||
|
if (headers == null)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
return Enumerable
|
||||||
|
.Range(0, headers.Count)
|
||||||
|
.SelectMany(i => headers.GetValues(i)
|
||||||
|
.Select(v => Tuple.Create(headers.GetKey(i), v))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static async Task<bool> WaitOneAsync(this WaitHandle handle, int millisecondsTimeout, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
RegisteredWaitHandle registeredHandle = null;
|
||||||
|
CancellationTokenRegistration tokenRegistration = default(CancellationTokenRegistration);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var tcs = new TaskCompletionSource<bool>();
|
||||||
|
registeredHandle = ThreadPool.RegisterWaitForSingleObject(
|
||||||
|
handle,
|
||||||
|
(state, timedOut) => ((TaskCompletionSource<bool>)state).TrySetResult(!timedOut),
|
||||||
|
tcs,
|
||||||
|
millisecondsTimeout,
|
||||||
|
true);
|
||||||
|
tokenRegistration = cancellationToken.Register(
|
||||||
|
state => ((TaskCompletionSource<bool>)state).TrySetCanceled(),
|
||||||
|
tcs);
|
||||||
|
return await tcs.Task;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
registeredHandle?.Unregister(null);
|
||||||
|
tokenRegistration.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Task<bool> WaitOneAsync(this WaitHandle handle, TimeSpan timeout)
|
||||||
|
{
|
||||||
|
return handle.WaitOneAsync((int)timeout.TotalMilliseconds, CancellationToken.None);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,6 @@ namespace CryptoExchange.Net.Interfaces
|
|||||||
{
|
{
|
||||||
public interface IRateLimiter
|
public interface IRateLimiter
|
||||||
{
|
{
|
||||||
CallResult<double> LimitRequest(string url, RateLimitingBehaviour limitBehaviour);
|
CallResult<double> LimitRequest(RestClient client, string url, RateLimitingBehaviour limitBehaviour);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,6 @@
|
|||||||
using System.IO;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.IO;
|
||||||
using System.Net;
|
using System.Net;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Interfaces
|
namespace CryptoExchange.Net.Interfaces
|
||||||
@ -7,6 +9,7 @@ namespace CryptoExchange.Net.Interfaces
|
|||||||
{
|
{
|
||||||
HttpStatusCode StatusCode { get; }
|
HttpStatusCode StatusCode { get; }
|
||||||
Stream GetResponseStream();
|
Stream GetResponseStream();
|
||||||
|
IEnumerable<Tuple<string, string>> GetResponseHeaders();
|
||||||
void Close();
|
void Close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,6 @@
|
|||||||
using System.Net;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Net;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Objects
|
namespace CryptoExchange.Net.Objects
|
||||||
{
|
{
|
||||||
@ -31,9 +33,21 @@ namespace CryptoExchange.Net.Objects
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public HttpStatusCode? ResponseStatusCode { get; set; }
|
public HttpStatusCode? ResponseStatusCode { get; set; }
|
||||||
|
|
||||||
public WebCallResult(HttpStatusCode? code, T data, Error error): base(data, error)
|
public IEnumerable<Tuple<string, string>> ResponseHeaders { get; set; }
|
||||||
|
|
||||||
|
public WebCallResult(HttpStatusCode? code, IEnumerable<Tuple<string, string>> responseHeaders, T data, Error error): base(data, error)
|
||||||
{
|
{
|
||||||
|
ResponseHeaders = responseHeaders;
|
||||||
ResponseStatusCode = code;
|
ResponseStatusCode = code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static WebCallResult<T> CreateErrorResult(Error error)
|
||||||
|
{
|
||||||
|
return new WebCallResult<T>(null, null, default(T), error);
|
||||||
|
}
|
||||||
|
public static WebCallResult<T> CreateErrorResult(HttpStatusCode? code, IEnumerable<Tuple<string, string>> responseHeaders, Error error)
|
||||||
|
{
|
||||||
|
return new WebCallResult<T>(code, responseHeaders, default(T), error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -97,6 +97,12 @@ namespace CryptoExchange.Net.Objects
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public TimeSpan SocketNoDataTimeout { get; set; }
|
public TimeSpan SocketNoDataTimeout { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The amount of subscriptions that should be made on a single socket connection. Not all exchanges support multiple subscriptions on a single socket.
|
||||||
|
/// Setting this to a higher number increases subscription speed, but having more subscriptions on a single connection will also increase the amount of traffic on that single connection.
|
||||||
|
/// </summary>
|
||||||
|
public int? SocketSubscriptionsCombineTarget { get; set; }
|
||||||
|
|
||||||
public T Copy<T>() where T : SocketClientOptions, new()
|
public T Copy<T>() where T : SocketClientOptions, new()
|
||||||
{
|
{
|
||||||
var copy = new T
|
var copy = new T
|
||||||
@ -107,7 +113,8 @@ namespace CryptoExchange.Net.Objects
|
|||||||
LogWriters = LogWriters,
|
LogWriters = LogWriters,
|
||||||
AutoReconnect = AutoReconnect,
|
AutoReconnect = AutoReconnect,
|
||||||
ReconnectInterval = ReconnectInterval,
|
ReconnectInterval = ReconnectInterval,
|
||||||
SocketResponseTimeout = SocketResponseTimeout
|
SocketResponseTimeout = SocketResponseTimeout,
|
||||||
|
SocketSubscriptionsCombineTarget = SocketSubscriptionsCombineTarget
|
||||||
};
|
};
|
||||||
|
|
||||||
if (ApiCredentials != null)
|
if (ApiCredentials != null)
|
||||||
|
@ -29,7 +29,7 @@ namespace CryptoExchange.Net.RateLimiter
|
|||||||
this.perTimePeriod = perTimePeriod;
|
this.perTimePeriod = perTimePeriod;
|
||||||
}
|
}
|
||||||
|
|
||||||
public CallResult<double> LimitRequest(string url, RateLimitingBehaviour limitingBehaviour)
|
public CallResult<double> LimitRequest(RestClient client, string url, RateLimitingBehaviour limitingBehaviour)
|
||||||
{
|
{
|
||||||
int waitTime;
|
int waitTime;
|
||||||
RateLimitObject rlo;
|
RateLimitObject rlo;
|
||||||
|
@ -30,7 +30,7 @@ namespace CryptoExchange.Net.RateLimiter
|
|||||||
this.perTimePeriod = perTimePeriod;
|
this.perTimePeriod = perTimePeriod;
|
||||||
}
|
}
|
||||||
|
|
||||||
public CallResult<double> LimitRequest(string url, RateLimitingBehaviour limitBehaviour)
|
public CallResult<double> LimitRequest(RestClient client, string url, RateLimitingBehaviour limitBehaviour)
|
||||||
{
|
{
|
||||||
var sw = Stopwatch.StartNew();
|
var sw = Stopwatch.StartNew();
|
||||||
lock (requestLock)
|
lock (requestLock)
|
||||||
|
@ -1,4 +1,7 @@
|
|||||||
using System.IO;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.IO;
|
||||||
|
using System.Linq;
|
||||||
using System.Net;
|
using System.Net;
|
||||||
using CryptoExchange.Net.Interfaces;
|
using CryptoExchange.Net.Interfaces;
|
||||||
|
|
||||||
@ -20,6 +23,11 @@ namespace CryptoExchange.Net.Requests
|
|||||||
return response.GetResponseStream();
|
return response.GetResponseStream();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public IEnumerable<Tuple<string, string>> GetResponseHeaders()
|
||||||
|
{
|
||||||
|
return response.Headers.ToIEnumerable();
|
||||||
|
}
|
||||||
|
|
||||||
public void Close()
|
public void Close()
|
||||||
{
|
{
|
||||||
response.Close();
|
response.Close();
|
||||||
|
@ -121,7 +121,7 @@ namespace CryptoExchange.Net
|
|||||||
if (signed && authProvider == null)
|
if (signed && authProvider == null)
|
||||||
{
|
{
|
||||||
log.Write(LogVerbosity.Warning, $"Request {uri.AbsolutePath} failed because no ApiCredentials were provided");
|
log.Write(LogVerbosity.Warning, $"Request {uri.AbsolutePath} failed because no ApiCredentials were provided");
|
||||||
return new WebCallResult<T>(null, null, new NoApiCredentialsError());
|
return new WebCallResult<T>(null, null, null, new NoApiCredentialsError());
|
||||||
}
|
}
|
||||||
|
|
||||||
var request = ConstructRequest(uri, method, parameters, signed);
|
var request = ConstructRequest(uri, method, parameters, signed);
|
||||||
@ -134,11 +134,11 @@ namespace CryptoExchange.Net
|
|||||||
|
|
||||||
foreach (var limiter in RateLimiters)
|
foreach (var limiter in RateLimiters)
|
||||||
{
|
{
|
||||||
var limitResult = limiter.LimitRequest(uri.AbsolutePath, RateLimitBehaviour);
|
var limitResult = limiter.LimitRequest(this, uri.AbsolutePath, RateLimitBehaviour);
|
||||||
if (!limitResult.Success)
|
if (!limitResult.Success)
|
||||||
{
|
{
|
||||||
log.Write(LogVerbosity.Debug, $"Request {uri.AbsolutePath} failed because of rate limit");
|
log.Write(LogVerbosity.Debug, $"Request {uri.AbsolutePath} failed because of rate limit");
|
||||||
return new WebCallResult<T>(null, null, limitResult.Error);
|
return new WebCallResult<T>(null, null, null, limitResult.Error);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (limitResult.Data > 0)
|
if (limitResult.Data > 0)
|
||||||
@ -152,20 +152,20 @@ namespace CryptoExchange.Net
|
|||||||
log.Write(LogVerbosity.Debug, $"Sending {method}{(signed ? " signed" : "")} request to {request.Uri} {paramString ?? ""}");
|
log.Write(LogVerbosity.Debug, $"Sending {method}{(signed ? " signed" : "")} request to {request.Uri} {paramString ?? ""}");
|
||||||
var result = await ExecuteRequest(request).ConfigureAwait(false);
|
var result = await ExecuteRequest(request).ConfigureAwait(false);
|
||||||
if(!result.Success)
|
if(!result.Success)
|
||||||
return new WebCallResult<T>(result.ResponseStatusCode, null, result.Error);
|
return new WebCallResult<T>(result.ResponseStatusCode, result.ResponseHeaders, null, result.Error);
|
||||||
|
|
||||||
var jsonResult = ValidateJson(result.Data);
|
var jsonResult = ValidateJson(result.Data);
|
||||||
if(!jsonResult.Success)
|
if(!jsonResult.Success)
|
||||||
return new WebCallResult<T>(result.ResponseStatusCode, null, jsonResult.Error);
|
return new WebCallResult<T>(result.ResponseStatusCode, result.ResponseHeaders, null, jsonResult.Error);
|
||||||
|
|
||||||
if (IsErrorResponse(jsonResult.Data))
|
if (IsErrorResponse(jsonResult.Data))
|
||||||
return new WebCallResult<T>(result.ResponseStatusCode, null, ParseErrorResponse(jsonResult.Data));
|
return new WebCallResult<T>(result.ResponseStatusCode, result.ResponseHeaders, null, ParseErrorResponse(jsonResult.Data));
|
||||||
|
|
||||||
var desResult = Deserialize<T>(jsonResult.Data, checkResult);
|
var desResult = Deserialize<T>(jsonResult.Data, checkResult);
|
||||||
if (!desResult.Success)
|
if (!desResult.Success)
|
||||||
return new WebCallResult<T>(result.ResponseStatusCode, null, desResult.Error);
|
return new WebCallResult<T>(result.ResponseStatusCode, result.ResponseHeaders, null, desResult.Error);
|
||||||
|
|
||||||
return new WebCallResult<T>(result.ResponseStatusCode, desResult.Data, null);
|
return new WebCallResult<T>(result.ResponseStatusCode, result.ResponseHeaders, desResult.Data, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -277,13 +277,15 @@ namespace CryptoExchange.Net
|
|||||||
}
|
}
|
||||||
|
|
||||||
var statusCode = response.StatusCode;
|
var statusCode = response.StatusCode;
|
||||||
|
var returnHeaders = response.GetResponseHeaders();
|
||||||
response.Close();
|
response.Close();
|
||||||
return new WebCallResult<string>(statusCode, returnedData, null);
|
return new WebCallResult<string>(statusCode, returnHeaders, returnedData, null);
|
||||||
}
|
}
|
||||||
catch (WebException we)
|
catch (WebException we)
|
||||||
{
|
{
|
||||||
var response = (HttpWebResponse)we.Response;
|
var response = (HttpWebResponse)we.Response;
|
||||||
var statusCode = response?.StatusCode;
|
var statusCode = response?.StatusCode;
|
||||||
|
var returnHeaders = response?.Headers.ToIEnumerable();
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -296,7 +298,7 @@ namespace CryptoExchange.Net
|
|||||||
response.Close();
|
response.Close();
|
||||||
|
|
||||||
var jsonResult = ValidateJson(returnedData);
|
var jsonResult = ValidateJson(returnedData);
|
||||||
return !jsonResult.Success ? new WebCallResult<string>(statusCode, null, jsonResult.Error) : new WebCallResult<string>(statusCode, null, ParseErrorResponse(jsonResult.Data));
|
return !jsonResult.Success ? new WebCallResult<string>(statusCode, returnHeaders, null, jsonResult.Error) : new WebCallResult<string>(statusCode, returnHeaders, null, ParseErrorResponse(jsonResult.Data));
|
||||||
}
|
}
|
||||||
catch (Exception)
|
catch (Exception)
|
||||||
{
|
{
|
||||||
@ -307,18 +309,18 @@ namespace CryptoExchange.Net
|
|||||||
{
|
{
|
||||||
infoMessage += $" | {we.Status} - {we.Message}";
|
infoMessage += $" | {we.Status} - {we.Message}";
|
||||||
log.Write(LogVerbosity.Warning, infoMessage);
|
log.Write(LogVerbosity.Warning, infoMessage);
|
||||||
return new WebCallResult<string>(0, null, new WebError(infoMessage));
|
return new WebCallResult<string>(0, null, null, new WebError(infoMessage));
|
||||||
}
|
}
|
||||||
|
|
||||||
infoMessage = $"Status: {response.StatusCode}-{response.StatusDescription}, Message: {we.Message}";
|
infoMessage = $"Status: {response.StatusCode}-{response.StatusDescription}, Message: {we.Message}";
|
||||||
log.Write(LogVerbosity.Warning, infoMessage);
|
log.Write(LogVerbosity.Warning, infoMessage);
|
||||||
response.Close();
|
response.Close();
|
||||||
return new WebCallResult<string>(statusCode, null, new ServerError(infoMessage));
|
return new WebCallResult<string>(statusCode, returnHeaders, null, new ServerError(infoMessage));
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
log.Write(LogVerbosity.Error, $"Unknown error occured: {e.GetType()}, {e.Message}, {e.StackTrace}");
|
log.Write(LogVerbosity.Error, $"Unknown error occured: {e.GetType()}, {e.Message}, {e.StackTrace}");
|
||||||
return new WebCallResult<string>(null, null, new UnknownError(e.Message + ", data: " + returnedData));
|
return new WebCallResult<string>(null, null, null, new UnknownError(e.Message + ", data: " + returnedData));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
@ -20,15 +21,26 @@ namespace CryptoExchange.Net
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
|
public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory();
|
||||||
|
|
||||||
protected internal List<SocketConnection> sockets = new List<SocketConnection>();
|
/// <summary>
|
||||||
protected internal readonly object socketLock = new object();
|
/// List of socket connections currently connecting/connected
|
||||||
|
/// </summary>
|
||||||
|
protected internal ConcurrentDictionary<int, SocketConnection> sockets = new ConcurrentDictionary<int, SocketConnection>();
|
||||||
|
protected internal readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
|
||||||
|
|
||||||
|
/// <inheritdoc cref="SocketClientOptions.ReconnectInterval"/>
|
||||||
public TimeSpan ReconnectInterval { get; private set; }
|
public TimeSpan ReconnectInterval { get; private set; }
|
||||||
|
/// <inheritdoc cref="SocketClientOptions.AutoReconnect"/>
|
||||||
public bool AutoReconnect { get; private set; }
|
public bool AutoReconnect { get; private set; }
|
||||||
|
/// <inheritdoc cref="SocketClientOptions.SocketResponseTimeout"/>
|
||||||
public TimeSpan ResponseTimeout { get; private set; }
|
public TimeSpan ResponseTimeout { get; private set; }
|
||||||
public TimeSpan SocketTimeout { get; private set; }
|
/// <inheritdoc cref="SocketClientOptions.SocketNoDataTimeout"/>
|
||||||
public int MaxSocketConnections { get; protected set; } = 999;
|
public TimeSpan SocketNoDataTimeout { get; private set; }
|
||||||
public int SocketCombineTarget { get; protected set; } = 1;
|
/// <summary>
|
||||||
|
/// The max amount of concurrent socket connections
|
||||||
|
/// </summary>
|
||||||
|
public int MaxSocketConnections { get; protected set; } = 9999;
|
||||||
|
/// <inheritdoc cref="SocketClientOptions.SocketSubscriptionsCombineTarget"/>
|
||||||
|
public int SocketCombineTarget { get; protected set; }
|
||||||
|
|
||||||
protected Func<byte[], string> dataInterpreterBytes;
|
protected Func<byte[], string> dataInterpreterBytes;
|
||||||
protected Func<string, string> dataInterpreterString;
|
protected Func<string, string> dataInterpreterString;
|
||||||
@ -52,51 +64,72 @@ namespace CryptoExchange.Net
|
|||||||
AutoReconnect = exchangeOptions.AutoReconnect;
|
AutoReconnect = exchangeOptions.AutoReconnect;
|
||||||
ReconnectInterval = exchangeOptions.ReconnectInterval;
|
ReconnectInterval = exchangeOptions.ReconnectInterval;
|
||||||
ResponseTimeout = exchangeOptions.SocketResponseTimeout;
|
ResponseTimeout = exchangeOptions.SocketResponseTimeout;
|
||||||
SocketTimeout = exchangeOptions.SocketNoDataTimeout;
|
SocketNoDataTimeout = exchangeOptions.SocketNoDataTimeout;
|
||||||
|
SocketCombineTarget = exchangeOptions.SocketSubscriptionsCombineTarget ?? 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Set a function to interpret the data, used when the data is received as bytes instead of a string
|
/// Set a function to interpret the data, used when the data is received as bytes instead of a string
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="handler"></param>
|
/// <param name="byteHandler">Handler for byte data</param>
|
||||||
|
/// <param name="stringHandler">Handler for string data</param>
|
||||||
protected void SetDataInterpreter(Func<byte[], string> byteHandler, Func<string, string> stringHandler)
|
protected void SetDataInterpreter(Func<byte[], string> byteHandler, Func<string, string> stringHandler)
|
||||||
{
|
{
|
||||||
dataInterpreterBytes = byteHandler;
|
dataInterpreterBytes = byteHandler;
|
||||||
dataInterpreterString = stringHandler;
|
dataInterpreterString = stringHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected virtual async Task<CallResult<UpdateSubscription>> Subscribe<T>(object request, string identifier, bool authenticated, Action<T> dataHandler)
|
/// <summary>
|
||||||
|
/// Subscribe
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">The expected return data</typeparam>
|
||||||
|
/// <param name="request">The request to send</param>
|
||||||
|
/// <param name="identifier">The identifier to use</param>
|
||||||
|
/// <param name="authenticated">If the subscription should be authenticated</param>
|
||||||
|
/// <param name="dataHandler">The handler of update data</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
protected virtual Task<CallResult<UpdateSubscription>> Subscribe<T>(object request, string identifier, bool authenticated, Action<T> dataHandler)
|
||||||
{
|
{
|
||||||
return await Subscribe(BaseAddress, request, identifier, authenticated, dataHandler).ConfigureAwait(false);
|
return Subscribe(BaseAddress, request, identifier, authenticated, dataHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Subscribe using a specif URL
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">The type of the expected data</typeparam>
|
||||||
|
/// <param name="url">The URL to connect to</param>
|
||||||
|
/// <param name="request">The request to send</param>
|
||||||
|
/// <param name="identifier">The identifier to use</param>
|
||||||
|
/// <param name="authenticated">If the subscription should be authenticated</param>
|
||||||
|
/// <param name="dataHandler">The handler of update data</param>
|
||||||
|
/// <returns></returns>
|
||||||
protected virtual async Task<CallResult<UpdateSubscription>> Subscribe<T>(string url, object request, string identifier, bool authenticated, Action<T> dataHandler)
|
protected virtual async Task<CallResult<UpdateSubscription>> Subscribe<T>(string url, object request, string identifier, bool authenticated, Action<T> dataHandler)
|
||||||
{
|
{
|
||||||
SocketConnection socket;
|
SocketConnection socket;
|
||||||
SocketSubscription handler;
|
SocketSubscription handler;
|
||||||
if (SocketCombineTarget == 1)
|
bool released = false;
|
||||||
{;
|
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
|
||||||
lock (socketLock)
|
try
|
||||||
|
{
|
||||||
|
socket = GetWebsocket(url, authenticated);
|
||||||
|
handler = AddHandler(request, identifier, true, socket, dataHandler);
|
||||||
|
if (SocketCombineTarget == 1)
|
||||||
{
|
{
|
||||||
socket = GetWebsocket(url, authenticated);
|
// Can release early when only a single sub per connection
|
||||||
handler = AddHandler(request, identifier, true, socket, dataHandler);
|
semaphoreSlim.Release();
|
||||||
|
released = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
var connectResult = ConnectIfNeeded(socket, authenticated).GetAwaiter().GetResult();
|
var connectResult = await ConnectIfNeeded(socket, authenticated).ConfigureAwait(false);
|
||||||
if (!connectResult.Success)
|
if (!connectResult.Success)
|
||||||
return new CallResult<UpdateSubscription>(null, connectResult.Error);
|
return new CallResult<UpdateSubscription>(null, connectResult.Error);
|
||||||
}
|
}
|
||||||
else
|
finally
|
||||||
{
|
{
|
||||||
lock (socketLock)
|
//When the task is ready, release the semaphore. It is vital to ALWAYS release the semaphore when we are ready, or else we will end up with a Semaphore that is forever locked.
|
||||||
{
|
//This is why it is important to do the Release within a try...finally clause; program execution may crash or take a different path, this way you are guaranteed execution
|
||||||
socket = GetWebsocket(url, authenticated);
|
if(!released)
|
||||||
handler = AddHandler(request, identifier, true, socket, dataHandler);
|
semaphoreSlim.Release();
|
||||||
|
|
||||||
var connectResult = ConnectIfNeeded(socket, authenticated).GetAwaiter().GetResult();
|
|
||||||
if (!connectResult.Success)
|
|
||||||
return new CallResult<UpdateSubscription>(null, connectResult.Error);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -117,6 +150,13 @@ namespace CryptoExchange.Net
|
|||||||
return new CallResult<UpdateSubscription>(new UpdateSubscription(socket, handler), null);
|
return new CallResult<UpdateSubscription>(new UpdateSubscription(socket, handler), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sends the subscribe request and waits for a response to that request
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="socket">The connection to send the request on</param>
|
||||||
|
/// <param name="request">The request to send</param>
|
||||||
|
/// <param name="subscription">The subscription the request is for</param>
|
||||||
|
/// <returns></returns>
|
||||||
protected internal virtual async Task<CallResult<bool>> SubscribeAndWait(SocketConnection socket, object request, SocketSubscription subscription)
|
protected internal virtual async Task<CallResult<bool>> SubscribeAndWait(SocketConnection socket, object request, SocketSubscription subscription)
|
||||||
{
|
{
|
||||||
CallResult<object> callResult = null;
|
CallResult<object> callResult = null;
|
||||||
@ -134,12 +174,44 @@ namespace CryptoExchange.Net
|
|||||||
return new CallResult<bool>(callResult?.Success ?? false, callResult == null ? new ServerError("No response on subscription request received"): callResult.Error);
|
return new CallResult<bool>(callResult?.Success ?? false, callResult == null ? new ServerError("No response on subscription request received"): callResult.Error);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected virtual async Task<CallResult<T>> Query<T>(object request, bool authenticated)
|
protected virtual Task<CallResult<T>> Query<T>(object request, bool authenticated)
|
||||||
{
|
{
|
||||||
var socket = GetWebsocket(BaseAddress, authenticated);
|
return Query<T>(BaseAddress, request, authenticated);
|
||||||
var connectResult = await ConnectIfNeeded(socket, authenticated).ConfigureAwait(false);
|
}
|
||||||
if (!connectResult.Success)
|
|
||||||
return new CallResult<T>(default(T), connectResult.Error);
|
/// <summary>
|
||||||
|
/// Query for data
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">The expected result type</typeparam>
|
||||||
|
/// <param name="request">The request to send</param>
|
||||||
|
/// <param name="authenticated">Whether the socket should be authenticated</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
protected virtual async Task<CallResult<T>> Query<T>(string url, object request, bool authenticated)
|
||||||
|
{
|
||||||
|
SocketConnection socket;
|
||||||
|
bool released = false;
|
||||||
|
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
socket = GetWebsocket(url, authenticated);
|
||||||
|
if (SocketCombineTarget == 1)
|
||||||
|
{
|
||||||
|
// Can release early when only a single sub per connection
|
||||||
|
semaphoreSlim.Release();
|
||||||
|
released = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
var connectResult = await ConnectIfNeeded(socket, authenticated).ConfigureAwait(false);
|
||||||
|
if (!connectResult.Success)
|
||||||
|
return new CallResult<T>(default(T), connectResult.Error);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
//When the task is ready, release the semaphore. It is vital to ALWAYS release the semaphore when we are ready, or else we will end up with a Semaphore that is forever locked.
|
||||||
|
//This is why it is important to do the Release within a try...finally clause; program execution may crash or take a different path, this way you are guaranteed execution
|
||||||
|
if (!released)
|
||||||
|
semaphoreSlim.Release();
|
||||||
|
}
|
||||||
|
|
||||||
if (socket.PausedActivity)
|
if (socket.PausedActivity)
|
||||||
{
|
{
|
||||||
@ -150,6 +222,13 @@ namespace CryptoExchange.Net
|
|||||||
return await QueryAndWait<T>(socket, request).ConfigureAwait(false);
|
return await QueryAndWait<T>(socket, request).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sends the query request and waits for the result
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">The expected result type</typeparam>
|
||||||
|
/// <param name="socket">The connection to send and wait on</param>
|
||||||
|
/// <param name="request">The request to send</param>
|
||||||
|
/// <returns></returns>
|
||||||
protected virtual async Task<CallResult<T>> QueryAndWait<T>(SocketConnection socket, object request)
|
protected virtual async Task<CallResult<T>> QueryAndWait<T>(SocketConnection socket, object request)
|
||||||
{
|
{
|
||||||
CallResult<T> dataResult = new CallResult<T>(default(T), new ServerError("No response on query received"));
|
CallResult<T> dataResult = new CallResult<T>(default(T), new ServerError("No response on query received"));
|
||||||
@ -165,6 +244,12 @@ namespace CryptoExchange.Net
|
|||||||
return dataResult;
|
return dataResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Checks if a socket needs to be connected and does so if needed
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="socket">The connection to check</param>
|
||||||
|
/// <param name="authenticated">Whether the socket should authenticated</param>
|
||||||
|
/// <returns></returns>
|
||||||
protected virtual async Task<CallResult<bool>> ConnectIfNeeded(SocketConnection socket, bool authenticated)
|
protected virtual async Task<CallResult<bool>> ConnectIfNeeded(SocketConnection socket, bool authenticated)
|
||||||
{
|
{
|
||||||
if (!socket.Connected)
|
if (!socket.Connected)
|
||||||
@ -191,28 +276,75 @@ namespace CryptoExchange.Net
|
|||||||
|
|
||||||
return new CallResult<bool>(true, null);
|
return new CallResult<bool>(true, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected virtual void AddGenericHandler(string identifier, Action<SocketConnection, JToken> action)
|
/// <summary>
|
||||||
{
|
/// Needs to check if a received message was an answer to a query request (preferable by id) and set the callResult out to whatever the response is
|
||||||
genericHandlers.Add(identifier, action);
|
/// </summary>
|
||||||
List<SocketConnection> socketList;
|
/// <typeparam name="T">The type of response</typeparam>
|
||||||
lock (socketLock)
|
/// <param name="s">The socket connection</param>
|
||||||
socketList = sockets.ToList();
|
/// <param name="request">The request that a response is awaited for</param>
|
||||||
foreach (var wrapper in socketList)
|
/// <param name="data">The message</param>
|
||||||
wrapper.AddHandler(identifier, false, action);
|
/// <param name="callResult">The interpretation (null if message wasn't a response to the request)</param>
|
||||||
}
|
/// <returns>True if the message was a response to the query</returns>
|
||||||
|
|
||||||
protected internal abstract bool HandleQueryResponse<T>(SocketConnection s, object request, JToken data, out CallResult<T> callResult);
|
protected internal abstract bool HandleQueryResponse<T>(SocketConnection s, object request, JToken data, out CallResult<T> callResult);
|
||||||
|
/// <summary>
|
||||||
|
/// Needs to check if a received message was an answer to a subscription request (preferable by id) and set the callResult out to whatever the response is
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="s">The socket connection</param>
|
||||||
|
/// <param name="subscription"></param>
|
||||||
|
/// <param name="request">The request that a response is awaited for</param>
|
||||||
|
/// <param name="data">The message</param>
|
||||||
|
/// <param name="callResult">The interpretation (null if message wasn't a response to the request)</param>
|
||||||
|
/// <returns>True if the message was a response to the subscription request</returns>
|
||||||
protected internal abstract bool HandleSubscriptionResponse(SocketConnection s, SocketSubscription subscription, object request, JToken message, out CallResult<object> callResult);
|
protected internal abstract bool HandleSubscriptionResponse(SocketConnection s, SocketSubscription subscription, object request, JToken message, out CallResult<object> callResult);
|
||||||
|
/// <summary>
|
||||||
|
/// Needs to check if a received message matches a handler. Typically if an update message matches the request
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message">The received data</param>
|
||||||
|
/// <param name="request">The subscription request</param>
|
||||||
|
/// <returns></returns>
|
||||||
protected internal abstract bool MessageMatchesHandler(JToken message, object request);
|
protected internal abstract bool MessageMatchesHandler(JToken message, object request);
|
||||||
|
/// <summary>
|
||||||
|
/// Needs to check if a received message matches a handler. Typically if an received message matches a ping request or a other information pushed from the the server
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message">The received data</param>
|
||||||
|
/// <param name="identifier">The string identifier of the handler</param>
|
||||||
|
/// <returns></returns>
|
||||||
protected internal abstract bool MessageMatchesHandler(JToken message, string identifier);
|
protected internal abstract bool MessageMatchesHandler(JToken message, string identifier);
|
||||||
|
/// <summary>
|
||||||
|
/// Needs to authenticate the socket so authenticated queries/subscriptions can be made on this socket connection
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="s"></param>
|
||||||
|
/// <returns></returns>
|
||||||
protected internal abstract Task<CallResult<bool>> AuthenticateSocket(SocketConnection s);
|
protected internal abstract Task<CallResult<bool>> AuthenticateSocket(SocketConnection s);
|
||||||
|
/// <summary>
|
||||||
|
/// Needs to unsubscribe a subscription, typically by sending an unsubscribe request. If multiple subscriptions per socket is not allowed this can just return since the socket will be closed anyway
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="connection">The connection on which to unsubscribe</param>
|
||||||
|
/// <param name="s">The subscription to unsubscribe</param>
|
||||||
|
/// <returns></returns>
|
||||||
protected internal abstract Task<bool> Unsubscribe(SocketConnection connection, SocketSubscription s);
|
protected internal abstract Task<bool> Unsubscribe(SocketConnection connection, SocketSubscription s);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Optional handler to interpolate data before sending it to the handlers
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message"></param>
|
||||||
|
/// <returns></returns>
|
||||||
protected internal virtual JToken ProcessTokenData(JToken message)
|
protected internal virtual JToken ProcessTokenData(JToken message)
|
||||||
{
|
{
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Add a handler for a subscription
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">The type of data the subscription expects</typeparam>
|
||||||
|
/// <param name="request">The request of the subscription</param>
|
||||||
|
/// <param name="identifier">The identifier of the subscription (can be null if request param is used)</param>
|
||||||
|
/// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param>
|
||||||
|
/// <param name="connection">The socket connection the handler is on</param>
|
||||||
|
/// <param name="dataHandler">The handler of the data received</param>
|
||||||
|
/// <returns></returns>
|
||||||
protected virtual SocketSubscription AddHandler<T>(object request, string identifier, bool userSubscription, SocketConnection connection, Action<T> dataHandler)
|
protected virtual SocketSubscription AddHandler<T>(object request, string identifier, bool userSubscription, SocketConnection connection, Action<T> dataHandler)
|
||||||
{
|
{
|
||||||
Action<SocketConnection, JToken> internalHandler = (socketWrapper, data) =>
|
Action<SocketConnection, JToken> internalHandler = (socketWrapper, data) =>
|
||||||
@ -238,12 +370,31 @@ namespace CryptoExchange.Net
|
|||||||
return connection.AddHandler(identifier, userSubscription, internalHandler);
|
return connection.AddHandler(identifier, userSubscription, internalHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Adds a generic message handler. Used for example to reply to ping requests
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="identifier">The name of the request handler. Needs to be unique</param>
|
||||||
|
/// <param name="action">The action to execute when receiving a message for this handler (checked by <see cref="MessageMatchesHandler(Newtonsoft.Json.Linq.JToken,string)"/>)</param>
|
||||||
|
protected virtual void AddGenericHandler(string identifier, Action<SocketConnection, JToken> action)
|
||||||
|
{
|
||||||
|
genericHandlers.Add(identifier, action);
|
||||||
|
foreach (var connection in sockets.Values)
|
||||||
|
connection.AddHandler(identifier, false, action);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="address">The address the socket is for</param>
|
||||||
|
/// <param name="authenticated">Whether the socket should be authenticated</param>
|
||||||
|
/// <returns></returns>
|
||||||
protected virtual SocketConnection GetWebsocket(string address, bool authenticated)
|
protected virtual SocketConnection GetWebsocket(string address, bool authenticated)
|
||||||
{
|
{
|
||||||
SocketConnection result = sockets.Where(s => s.Socket.Url == address && (s.Authenticated == authenticated || !authenticated) && s.Connected).OrderBy(s => s.HandlerCount).FirstOrDefault();
|
var socketResult = sockets.Where(s => s.Value.Socket.Url == address && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.HandlerCount).FirstOrDefault();
|
||||||
|
var result = socketResult.Equals(default(KeyValuePair<int, SocketConnection>)) ? null : socketResult.Value;
|
||||||
if (result != null)
|
if (result != null)
|
||||||
{
|
{
|
||||||
if (result.HandlerCount < SocketCombineTarget || (sockets.Count >= MaxSocketConnections && sockets.All(s => s.HandlerCount >= SocketCombineTarget)))
|
if (result.HandlerCount < SocketCombineTarget || (sockets.Count >= MaxSocketConnections && sockets.All(s => s.Value.HandlerCount >= SocketCombineTarget)))
|
||||||
{
|
{
|
||||||
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
|
// Use existing socket if it has less than target connections OR it has the least connections and we can't make new
|
||||||
return result;
|
return result;
|
||||||
@ -252,7 +403,7 @@ namespace CryptoExchange.Net
|
|||||||
|
|
||||||
// Create new socket
|
// Create new socket
|
||||||
var socket = CreateSocket(address);
|
var socket = CreateSocket(address);
|
||||||
var socketWrapper = new SocketConnection(this, log, socket);
|
var socketWrapper = new SocketConnection(this, socket);
|
||||||
foreach (var kvp in genericHandlers)
|
foreach (var kvp in genericHandlers)
|
||||||
socketWrapper.AddHandler(kvp.Key, false, kvp.Value);
|
socketWrapper.AddHandler(kvp.Key, false, kvp.Value);
|
||||||
return socketWrapper;
|
return socketWrapper;
|
||||||
@ -261,13 +412,13 @@ namespace CryptoExchange.Net
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Connect a socket
|
/// Connect a socket
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="socketConnection">The subscription to connect</param>
|
/// <param name="socketConnection">The socket to connect</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected virtual async Task<CallResult<bool>> ConnectSocket(SocketConnection socketConnection)
|
protected virtual async Task<CallResult<bool>> ConnectSocket(SocketConnection socketConnection)
|
||||||
{
|
{
|
||||||
if (await socketConnection.Socket.Connect().ConfigureAwait(false))
|
if (await socketConnection.Socket.Connect().ConfigureAwait(false))
|
||||||
{
|
{
|
||||||
sockets.Add(socketConnection);
|
sockets.TryAdd(socketConnection.Socket.Id, socketConnection);
|
||||||
return new CallResult<bool>(true, null);
|
return new CallResult<bool>(true, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -288,7 +439,7 @@ namespace CryptoExchange.Net
|
|||||||
if (apiProxy != null)
|
if (apiProxy != null)
|
||||||
socket.SetProxy(apiProxy.Host, apiProxy.Port);
|
socket.SetProxy(apiProxy.Host, apiProxy.Port);
|
||||||
|
|
||||||
socket.Timeout = SocketTimeout;
|
socket.Timeout = SocketNoDataTimeout;
|
||||||
socket.DataInterpreterBytes = dataInterpreterBytes;
|
socket.DataInterpreterBytes = dataInterpreterBytes;
|
||||||
socket.DataInterpreterString = dataInterpreterString;
|
socket.DataInterpreterString = dataInterpreterString;
|
||||||
socket.OnError += e =>
|
socket.OnError += e =>
|
||||||
@ -298,25 +449,26 @@ namespace CryptoExchange.Net
|
|||||||
return socket;
|
return socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Periodically sends an object to a socket
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="interval">How often</param>
|
||||||
|
/// <param name="objGetter">Method returning the object to send</param>
|
||||||
public virtual void SendPeriodic(TimeSpan interval, Func<SocketConnection, object> objGetter)
|
public virtual void SendPeriodic(TimeSpan interval, Func<SocketConnection, object> objGetter)
|
||||||
{
|
{
|
||||||
periodicEvent = new AutoResetEvent(false);
|
periodicEvent = new AutoResetEvent(false);
|
||||||
periodicTask = Task.Run(() =>
|
periodicTask = Task.Run(async () =>
|
||||||
{
|
{
|
||||||
while (!disposing)
|
while (!disposing)
|
||||||
{
|
{
|
||||||
periodicEvent.WaitOne(interval);
|
await periodicEvent.WaitOneAsync(interval).ConfigureAwait(false);
|
||||||
if (disposing)
|
if (disposing)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
List<SocketConnection> socketList;
|
if (sockets.Any())
|
||||||
lock (socketLock)
|
|
||||||
socketList = sockets.ToList();
|
|
||||||
|
|
||||||
if (socketList.Any())
|
|
||||||
log.Write(LogVerbosity.Debug, "Sending periodic");
|
log.Write(LogVerbosity.Debug, "Sending periodic");
|
||||||
|
|
||||||
foreach (var socket in socketList)
|
foreach (var socket in sockets.Values)
|
||||||
{
|
{
|
||||||
if (disposing)
|
if (disposing)
|
||||||
break;
|
break;
|
||||||
@ -360,15 +512,14 @@ namespace CryptoExchange.Net
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task UnsubscribeAll()
|
public virtual async Task UnsubscribeAll()
|
||||||
{
|
{
|
||||||
lock (socketLock)
|
log.Write(LogVerbosity.Debug, $"Closing all {sockets.Count} subscriptions");
|
||||||
log.Write(LogVerbosity.Debug, $"Closing all {sockets.Count} subscriptions");
|
|
||||||
|
|
||||||
await Task.Run(() =>
|
await Task.Run(() =>
|
||||||
{
|
{
|
||||||
var tasks = new List<Task>();
|
var tasks = new List<Task>();
|
||||||
lock (socketLock)
|
|
||||||
{
|
{
|
||||||
foreach (var sub in new List<SocketConnection>(sockets))
|
var socketList = sockets.Values;
|
||||||
|
foreach (var sub in socketList)
|
||||||
tasks.Add(sub.Close());
|
tasks.Add(sub.Close());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -376,6 +527,9 @@ namespace CryptoExchange.Net
|
|||||||
}).ConfigureAwait(false);
|
}).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Dispose the client
|
||||||
|
/// </summary>
|
||||||
public override void Dispose()
|
public override void Dispose()
|
||||||
{
|
{
|
||||||
disposing = true;
|
disposing = true;
|
||||||
|
@ -114,7 +114,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
handle?.Invoke(data);
|
handle?.Invoke(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void CheckTimeout()
|
protected async Task CheckTimeout()
|
||||||
{
|
{
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
@ -131,7 +131,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.Sleep(500);
|
await Task.Delay(500).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -184,7 +184,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
socket.Send(data);
|
socket.Send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
public virtual async Task<bool> Connect()
|
public virtual Task<bool> Connect()
|
||||||
{
|
{
|
||||||
if (socket == null)
|
if (socket == null)
|
||||||
{
|
{
|
||||||
@ -211,7 +211,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
socket.DataReceived += (o, s) => HandleByteData(s.Data);
|
socket.DataReceived += (o, s) => HandleByteData(s.Data);
|
||||||
}
|
}
|
||||||
|
|
||||||
return await Task.Run(() =>
|
return Task.Run(() =>
|
||||||
{
|
{
|
||||||
bool connected;
|
bool connected;
|
||||||
lock (socketLock)
|
lock (socketLock)
|
||||||
@ -247,7 +247,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
{
|
{
|
||||||
log?.Write(LogVerbosity.Debug, $"Socket {Id} connected");
|
log?.Write(LogVerbosity.Debug, $"Socket {Id} connected");
|
||||||
if ((timeoutTask == null || timeoutTask.IsCompleted) && Timeout != default(TimeSpan))
|
if ((timeoutTask == null || timeoutTask.IsCompleted) && Timeout != default(TimeSpan))
|
||||||
timeoutTask = Task.Run(() => CheckTimeout());
|
timeoutTask = Task.Run(CheckTimeout);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
log?.Write(LogVerbosity.Debug, $"Socket {Id} connection failed, state: " + socket.State);
|
log?.Write(LogVerbosity.Debug, $"Socket {Id} connection failed, state: " + socket.State);
|
||||||
@ -257,7 +257,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
socket.Close();
|
socket.Close();
|
||||||
|
|
||||||
return connected;
|
return connected;
|
||||||
}).ConfigureAwait(false);
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public virtual void SetProxy(string host, int port)
|
public virtual void SetProxy(string host, int port)
|
||||||
|
@ -32,7 +32,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
public DateTime? DisconnectTime { get; set; }
|
public DateTime? DisconnectTime { get; set; }
|
||||||
public bool PausedActivity { get; set; }
|
public bool PausedActivity { get; set; }
|
||||||
|
|
||||||
private readonly List<SocketSubscription> handlers;
|
internal readonly List<SocketSubscription> handlers;
|
||||||
private readonly object handlersLock = new object();
|
private readonly object handlersLock = new object();
|
||||||
|
|
||||||
private bool lostTriggered;
|
private bool lostTriggered;
|
||||||
@ -41,9 +41,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
private readonly List<PendingRequest> pendingRequests;
|
private readonly List<PendingRequest> pendingRequests;
|
||||||
|
|
||||||
public SocketConnection(SocketClient client, Log log, IWebsocket socket)
|
public SocketConnection(SocketClient client, IWebsocket socket)
|
||||||
{
|
{
|
||||||
this.log = log;
|
log = client.log;
|
||||||
socketClient = client;
|
socketClient = client;
|
||||||
|
|
||||||
pendingRequests = new List<PendingRequest>();
|
pendingRequests = new List<PendingRequest>();
|
||||||
@ -51,7 +51,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
handlers = new List<SocketSubscription>();
|
handlers = new List<SocketSubscription>();
|
||||||
Socket = socket;
|
Socket = socket;
|
||||||
|
|
||||||
Socket.Timeout = client.SocketTimeout;
|
Socket.Timeout = client.SocketNoDataTimeout;
|
||||||
Socket.OnMessage += ProcessMessage;
|
Socket.OnMessage += ProcessMessage;
|
||||||
Socket.OnClose += () =>
|
Socket.OnClose += () =>
|
||||||
{
|
{
|
||||||
@ -160,15 +160,12 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public virtual async Task SendAndWait<T>(T obj, TimeSpan timeout, Func<JToken, bool> handler)
|
public virtual Task SendAndWait<T>(T obj, TimeSpan timeout, Func<JToken, bool> handler)
|
||||||
{
|
{
|
||||||
var pending = new PendingRequest(handler, timeout);
|
var pending = new PendingRequest(handler, timeout);
|
||||||
pendingRequests.Add(pending);
|
pendingRequests.Add(pending);
|
||||||
await Task.Run(() =>
|
Send(obj);
|
||||||
{
|
return pending.Event.WaitOneAsync(timeout);
|
||||||
Send(obj);
|
|
||||||
pending.Event.WaitOne(timeout);
|
|
||||||
}).ConfigureAwait(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -209,7 +206,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
{
|
{
|
||||||
while (ShouldReconnect)
|
while (ShouldReconnect)
|
||||||
{
|
{
|
||||||
Thread.Sleep(socketClient.ReconnectInterval);
|
await Task.Delay(socketClient.ReconnectInterval).ConfigureAwait(false);
|
||||||
if (!ShouldReconnect)
|
if (!ShouldReconnect)
|
||||||
{
|
{
|
||||||
// Should reconnect changed to false while waiting to reconnect
|
// Should reconnect changed to false while waiting to reconnect
|
||||||
@ -282,11 +279,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
{
|
{
|
||||||
Connected = false;
|
Connected = false;
|
||||||
ShouldReconnect = false;
|
ShouldReconnect = false;
|
||||||
lock (socketClient.socketLock)
|
if (socketClient.sockets.ContainsKey(Socket.Id))
|
||||||
{
|
socketClient.sockets.TryRemove(Socket.Id, out _);
|
||||||
if (socketClient.sockets.Contains(this))
|
|
||||||
socketClient.sockets.Remove(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
await Socket.Close().ConfigureAwait(false);
|
await Socket.Close().ConfigureAwait(false);
|
||||||
Socket.Dispose();
|
Socket.Dispose();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user