diff --git a/CryptoExchange.Net.UnitTests/SocketClientTests.cs b/CryptoExchange.Net.UnitTests/SocketClientTests.cs index 1f032d6..42c7ae1 100644 --- a/CryptoExchange.Net.UnitTests/SocketClientTests.cs +++ b/CryptoExchange.Net.UnitTests/SocketClientTests.cs @@ -17,8 +17,8 @@ namespace CryptoExchange.Net.UnitTests [TestCase] public void SettingOptions_Should_ResultInOptionsSet() { - // arrange - // act + //arrange + //act var client = new TestSocketClient(new SocketClientOptions() { BaseAddress = "http://test.address.com", @@ -26,7 +26,7 @@ namespace CryptoExchange.Net.UnitTests }); - // assert + //assert Assert.IsTrue(client.BaseAddress == "http://test.address.com"); Assert.IsTrue(client.ReconnectInterval.TotalSeconds == 6); } @@ -35,229 +35,127 @@ namespace CryptoExchange.Net.UnitTests [TestCase(false)] public void ConnectSocket_Should_ReturnConnectionResult(bool canConnect) { - // arrange + //arrange var client = new TestSocketClient(); var socket = client.CreateSocket(); socket.CanConnect = canConnect; - // act - var connectResult = client.ConnectSocketSub(new SocketConnection(client, new Log(), socket)); + //act + var connectResult = client.ConnectSocketSub(new SocketConnection(client, socket)); - // assert + //assert Assert.IsTrue(connectResult.Success == canConnect); } - //[TestCase] - //public void SocketMessages_Should_BeProcessedInDataHandlers() - //{ - // // arrange - // var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); - // var socket = client.CreateSocket(); - // socket.ShouldReconnect = true; - // socket.CanConnect = true; - // socket.DisconnectTime = DateTime.UtcNow; - // var sub = new SocketConnection(socket); - // var rstEvent = new ManualResetEvent(false); - // JToken result = null; - // sub.MessageHandlers.Add("TestHandler", (subs, data) => - // { - // result = data; - // rstEvent.Set(); - // return true; + [TestCase] + public void SocketMessages_Should_BeProcessedInDataHandlers() + { + // arrange + var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); + var socket = client.CreateSocket(); + socket.ShouldReconnect = true; + socket.CanConnect = true; + socket.DisconnectTime = DateTime.UtcNow; + var sub = new SocketConnection(client, socket); + var rstEvent = new ManualResetEvent(false); + JToken result = null; + sub.AddHandler("TestHandler", true, (connection, data) => + { + result = data; + rstEvent.Set(); + }); + client.ConnectSocketSub(sub); - // }); - // client.ConnectSocketSub(sub); + // act + socket.InvokeMessage("{\"property\": 123}"); + rstEvent.WaitOne(1000); - // // act - // socket.InvokeMessage("{\"property\": 123}"); - // rstEvent.WaitOne(1000); - - // // assert - // Assert.IsTrue((int)result["property"] == 123); - //} - - //[TestCase] - //public void SocketMessages_Should_NotBeProcessedInSubsequentHandlersIfHandlerReturnsTrue() - //{ - // // arrange - // var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); - // var socket = client.CreateSocket(); - // socket.ShouldReconnect = true; - // socket.CanConnect = true; - // socket.DisconnectTime = DateTime.UtcNow; - // var sub = new SocketConnection(socket); - // var rstEvent1 = new ManualResetEvent(false); - // var rstEvent2 = new ManualResetEvent(false); - // JToken result1 = null; - // JToken result2 = null; - // sub.MessageHandlers.Add("TestHandler", (subs, data) => - // { - // result1 = data; - // rstEvent1.Set(); - // return true; - // }); - // sub.MessageHandlers.Add("TestHandlerNotHit", (subs, data) => - // { - // result2 = data; - // rstEvent2.Set(); - // return true; - // }); - // client.ConnectSocketSub(sub); - - // // act - // socket.InvokeMessage("{\"property\": 123}"); - // rstEvent1.WaitOne(100); - // rstEvent2.WaitOne(100); - - // // assert - // Assert.IsTrue((int)result1["property"] == 123); - // Assert.IsTrue(result2 == null); - //} - - //[TestCase] - //public void SocketMessages_Should_BeProcessedInSubsequentHandlersIfHandlerReturnsFalse() - //{ - // // arrange - // var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); - // var socket = client.CreateSocket(); - // socket.ShouldReconnect = true; - // socket.CanConnect = true; - // socket.DisconnectTime = DateTime.UtcNow; - // var sub = new SocketConnection(socket); - // var rstEvent = new ManualResetEvent(false); - // JToken result = null; - // sub.MessageHandlers.Add("TestHandlerNotProcessing", (subs, data) => - // { - // return false; - // }); - // sub.MessageHandlers.Add("TestHandler", (subs, data) => - // { - // result = data; - // rstEvent.Set(); - // return true; - // }); - // client.ConnectSocketSub(sub); - - // // act - // socket.InvokeMessage("{\"property\": 123}"); - // rstEvent.WaitOne(100); - - // // assert - // Assert.IsTrue((int)result["property"] == 123); - //} - - - //[TestCase] - //public void DisconnectedSocket_Should_Reconnect() - //{ - // // arrange - // bool reconnected = false; - // var client = new TestSocketClient(new SocketClientOptions(){ReconnectInterval = TimeSpan.Zero ,LogVerbosity = LogVerbosity.Debug}); - // var socket = client.CreateSocket(); - // socket.ShouldReconnect = true; - // socket.CanConnect = true; - // socket.DisconnectTime = DateTime.UtcNow; - // var sub = new SocketConnection(socket); - // client.ConnectSocketSub(sub); - // var rstEvent = new ManualResetEvent(false); - // client.OnReconnect += () => - // { - // reconnected = true; - // rstEvent.Set(); - // return true; - // }; - - // // act - // socket.InvokeClose(); - // rstEvent.WaitOne(1000); - - // // assert - // Assert.IsTrue(reconnected); - //} - - //[TestCase()] - //public void UnsubscribingStream_Should_CloseTheSocket() - //{ - // // arrange - // var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); - // var socket = client.CreateSocket(); - // socket.CanConnect = true; - // var sub = new SocketConnection(socket); - // client.ConnectSocketSub(sub); - // var ups = new UpdateSubscription(sub); - - // // act - // client.Unsubscribe(ups).Wait(); - - // // assert - // Assert.IsTrue(socket.Connected == false); - //} - - //[TestCase()] - //public void UnsubscribingAll_Should_CloseAllSockets() - //{ - // // arrange - // var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); - // var socket1 = client.CreateSocket(); - // var socket2 = client.CreateSocket(); - // socket1.CanConnect = true; - // socket2.CanConnect = true; - // var sub1 = new SocketConnection(socket1); - // var sub2 = new SocketConnection(socket2); - // client.ConnectSocketSub(sub1); - // client.ConnectSocketSub(sub2); - - // // act - // client.UnsubscribeAll().Wait(); - - // // assert - // Assert.IsTrue(socket1.Connected == false); - // Assert.IsTrue(socket2.Connected == false); - //} - - //[TestCase()] - //public void FailingToConnectSocket_Should_ReturnError() - //{ - // // arrange - // var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); - // var socket = client.CreateSocket(); - // socket.CanConnect = false; - // var sub = new SocketConnection(socket); - - // // act - // var connectResult = client.ConnectSocketSub(sub); - - // // assert - // Assert.IsFalse(connectResult.Success); - //} + // assert + Assert.IsTrue((int)result["property"] == 123); + } - //[Test] - //public void WhenResubscribeFails_Socket_ShouldReconnect() - //{ - // // arrange - // int reconnected = 0; - // var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.FromMilliseconds(1), LogVerbosity = LogVerbosity.Debug }); - // var socket = client.CreateSocket(); - // socket.ShouldReconnect = true; - // socket.CanConnect = true; - // socket.DisconnectTime = DateTime.UtcNow; - // var sub = new SocketConnection(socket); - // client.ConnectSocketSub(sub); - // var rstEvent = new ManualResetEvent(false); - // client.OnReconnect += () => - // { - // reconnected++; - // rstEvent.Set(); - // return reconnected == 2; - // }; + [TestCase] + public void DisconnectedSocket_Should_Reconnect() + { + // arrange + bool reconnected = false; + var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); + var socket = client.CreateSocket(); + socket.ShouldReconnect = true; + socket.CanConnect = true; + socket.DisconnectTime = DateTime.UtcNow; + var sub = new SocketConnection(client, socket); + sub.ShouldReconnect = true; + client.ConnectSocketSub(sub); + var rstEvent = new ManualResetEvent(false); + sub.ConnectionRestored += (a) => + { + reconnected = true; + rstEvent.Set(); + }; - // // act - // socket.InvokeClose(); - // rstEvent.WaitOne(1000); - // Thread.Sleep(100); + // act + socket.InvokeClose(); + rstEvent.WaitOne(1000); - // // assert - // Assert.IsTrue(reconnected == 2); - //} + // assert + Assert.IsTrue(reconnected); + } + + [TestCase()] + public void UnsubscribingStream_Should_CloseTheSocket() + { + // arrange + var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); + var socket = client.CreateSocket(); + socket.CanConnect = true; + var sub = new SocketConnection(client, socket); + client.ConnectSocketSub(sub); + var ups = new UpdateSubscription(sub, new SocketSubscription("Test", null, true, (d, a) => {})); + + // act + client.Unsubscribe(ups).Wait(); + + // assert + Assert.IsTrue(socket.Connected == false); + } + + [TestCase()] + public void UnsubscribingAll_Should_CloseAllSockets() + { + // arrange + var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); + var socket1 = client.CreateSocket(); + var socket2 = client.CreateSocket(); + socket1.CanConnect = true; + socket2.CanConnect = true; + var sub1 = new SocketConnection(client, socket1); + var sub2 = new SocketConnection(client, socket2); + client.ConnectSocketSub(sub1); + client.ConnectSocketSub(sub2); + + // act + client.UnsubscribeAll().Wait(); + + // assert + Assert.IsTrue(socket1.Connected == false); + Assert.IsTrue(socket2.Connected == false); + } + + [TestCase()] + public void FailingToConnectSocket_Should_ReturnError() + { + // arrange + var client = new TestSocketClient(new SocketClientOptions() { ReconnectInterval = TimeSpan.Zero, LogVerbosity = LogVerbosity.Debug }); + var socket = client.CreateSocket(); + socket.CanConnect = false; + var sub = new SocketConnection(client, socket); + + // act + var connectResult = client.ConnectSocketSub(sub); + + // assert + Assert.IsFalse(connectResult.Success); + } } } diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs index 4d40400..5430a71 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs @@ -52,6 +52,8 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations { Connected = CanConnect; ConnectCalls++; + if (CanConnect) + InvokeOpen(); return Task.FromResult(CanConnect); } diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs index a42f965..d01dc94 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs @@ -11,8 +11,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations { public class TestSocketClient: SocketClient { - public Func OnReconnect { get; set; } - public TestSocketClient() : this(new SocketClientOptions()) { } @@ -51,7 +49,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations protected override bool MessageMatchesHandler(JToken message, string identifier) { - throw new NotImplementedException(); + return true; } protected override Task> AuthenticateSocket(SocketConnection s) diff --git a/CryptoExchange.Net/BaseClient.cs b/CryptoExchange.Net/BaseClient.cs index d2c03f2..3bbd751 100644 --- a/CryptoExchange.Net/BaseClient.cs +++ b/CryptoExchange.Net/BaseClient.cs @@ -15,7 +15,7 @@ namespace CryptoExchange.Net public abstract class BaseClient: IDisposable { public string BaseAddress { get; private set; } - protected Log log; + protected internal Log log; protected ApiProxy apiProxy; protected AuthenticationProvider authProvider; diff --git a/CryptoExchange.Net/ExtensionMethods.cs b/CryptoExchange.Net/ExtensionMethods.cs index e27bf35..f1ae8a8 100644 --- a/CryptoExchange.Net/ExtensionMethods.cs +++ b/CryptoExchange.Net/ExtensionMethods.cs @@ -4,6 +4,8 @@ using System.Linq; using System.Net; using System.Runtime.InteropServices; using System.Security; +using System.Threading; +using System.Threading.Tasks; namespace CryptoExchange.Net { @@ -83,5 +85,47 @@ namespace CryptoExchange.Net return result; } } + + public static IEnumerable> ToIEnumerable(this WebHeaderCollection headers) + { + if (headers == null) + return null; + + return Enumerable + .Range(0, headers.Count) + .SelectMany(i => headers.GetValues(i) + .Select(v => Tuple.Create(headers.GetKey(i), v)) + ); + } + + public static async Task WaitOneAsync(this WaitHandle handle, int millisecondsTimeout, CancellationToken cancellationToken) + { + RegisteredWaitHandle registeredHandle = null; + CancellationTokenRegistration tokenRegistration = default(CancellationTokenRegistration); + try + { + var tcs = new TaskCompletionSource(); + registeredHandle = ThreadPool.RegisterWaitForSingleObject( + handle, + (state, timedOut) => ((TaskCompletionSource)state).TrySetResult(!timedOut), + tcs, + millisecondsTimeout, + true); + tokenRegistration = cancellationToken.Register( + state => ((TaskCompletionSource)state).TrySetCanceled(), + tcs); + return await tcs.Task; + } + finally + { + registeredHandle?.Unregister(null); + tokenRegistration.Dispose(); + } + } + + public static Task WaitOneAsync(this WaitHandle handle, TimeSpan timeout) + { + return handle.WaitOneAsync((int)timeout.TotalMilliseconds, CancellationToken.None); + } } } diff --git a/CryptoExchange.Net/Interfaces/IRateLimiter.cs b/CryptoExchange.Net/Interfaces/IRateLimiter.cs index f5ba2dd..5220f46 100644 --- a/CryptoExchange.Net/Interfaces/IRateLimiter.cs +++ b/CryptoExchange.Net/Interfaces/IRateLimiter.cs @@ -4,6 +4,6 @@ namespace CryptoExchange.Net.Interfaces { public interface IRateLimiter { - CallResult LimitRequest(string url, RateLimitingBehaviour limitBehaviour); + CallResult LimitRequest(RestClient client, string url, RateLimitingBehaviour limitBehaviour); } } diff --git a/CryptoExchange.Net/Interfaces/IResponse.cs b/CryptoExchange.Net/Interfaces/IResponse.cs index 1b5d9f1..58430ed 100644 --- a/CryptoExchange.Net/Interfaces/IResponse.cs +++ b/CryptoExchange.Net/Interfaces/IResponse.cs @@ -1,4 +1,6 @@ -using System.IO; +using System; +using System.Collections.Generic; +using System.IO; using System.Net; namespace CryptoExchange.Net.Interfaces @@ -7,6 +9,7 @@ namespace CryptoExchange.Net.Interfaces { HttpStatusCode StatusCode { get; } Stream GetResponseStream(); + IEnumerable> GetResponseHeaders(); void Close(); } } diff --git a/CryptoExchange.Net/Objects/CallResult.cs b/CryptoExchange.Net/Objects/CallResult.cs index 78eee85..fbfb5cb 100644 --- a/CryptoExchange.Net/Objects/CallResult.cs +++ b/CryptoExchange.Net/Objects/CallResult.cs @@ -1,4 +1,6 @@ -using System.Net; +using System; +using System.Collections.Generic; +using System.Net; namespace CryptoExchange.Net.Objects { @@ -31,9 +33,21 @@ namespace CryptoExchange.Net.Objects /// public HttpStatusCode? ResponseStatusCode { get; set; } - public WebCallResult(HttpStatusCode? code, T data, Error error): base(data, error) + public IEnumerable> ResponseHeaders { get; set; } + + public WebCallResult(HttpStatusCode? code, IEnumerable> responseHeaders, T data, Error error): base(data, error) { + ResponseHeaders = responseHeaders; ResponseStatusCode = code; } + + public static WebCallResult CreateErrorResult(Error error) + { + return new WebCallResult(null, null, default(T), error); + } + public static WebCallResult CreateErrorResult(HttpStatusCode? code, IEnumerable> responseHeaders, Error error) + { + return new WebCallResult(code, responseHeaders, default(T), error); + } } } diff --git a/CryptoExchange.Net/Objects/ExchangeOptions.cs b/CryptoExchange.Net/Objects/ExchangeOptions.cs index 2e04091..01fc26f 100644 --- a/CryptoExchange.Net/Objects/ExchangeOptions.cs +++ b/CryptoExchange.Net/Objects/ExchangeOptions.cs @@ -97,6 +97,12 @@ namespace CryptoExchange.Net.Objects /// public TimeSpan SocketNoDataTimeout { get; set; } + /// + /// The amount of subscriptions that should be made on a single socket connection. Not all exchanges support multiple subscriptions on a single socket. + /// Setting this to a higher number increases subscription speed, but having more subscriptions on a single connection will also increase the amount of traffic on that single connection. + /// + public int? SocketSubscriptionsCombineTarget { get; set; } + public T Copy() where T : SocketClientOptions, new() { var copy = new T @@ -107,7 +113,8 @@ namespace CryptoExchange.Net.Objects LogWriters = LogWriters, AutoReconnect = AutoReconnect, ReconnectInterval = ReconnectInterval, - SocketResponseTimeout = SocketResponseTimeout + SocketResponseTimeout = SocketResponseTimeout, + SocketSubscriptionsCombineTarget = SocketSubscriptionsCombineTarget }; if (ApiCredentials != null) diff --git a/CryptoExchange.Net/RateLimiter/RateLimiterPerEndpoint.cs b/CryptoExchange.Net/RateLimiter/RateLimiterPerEndpoint.cs index 0a16ce8..e21ea69 100644 --- a/CryptoExchange.Net/RateLimiter/RateLimiterPerEndpoint.cs +++ b/CryptoExchange.Net/RateLimiter/RateLimiterPerEndpoint.cs @@ -29,7 +29,7 @@ namespace CryptoExchange.Net.RateLimiter this.perTimePeriod = perTimePeriod; } - public CallResult LimitRequest(string url, RateLimitingBehaviour limitingBehaviour) + public CallResult LimitRequest(RestClient client, string url, RateLimitingBehaviour limitingBehaviour) { int waitTime; RateLimitObject rlo; diff --git a/CryptoExchange.Net/RateLimiter/RateLimiterTotal.cs b/CryptoExchange.Net/RateLimiter/RateLimiterTotal.cs index f2bda63..f28beb8 100644 --- a/CryptoExchange.Net/RateLimiter/RateLimiterTotal.cs +++ b/CryptoExchange.Net/RateLimiter/RateLimiterTotal.cs @@ -30,7 +30,7 @@ namespace CryptoExchange.Net.RateLimiter this.perTimePeriod = perTimePeriod; } - public CallResult LimitRequest(string url, RateLimitingBehaviour limitBehaviour) + public CallResult LimitRequest(RestClient client, string url, RateLimitingBehaviour limitBehaviour) { var sw = Stopwatch.StartNew(); lock (requestLock) diff --git a/CryptoExchange.Net/Requests/Response.cs b/CryptoExchange.Net/Requests/Response.cs index f8bc037..515414f 100644 --- a/CryptoExchange.Net/Requests/Response.cs +++ b/CryptoExchange.Net/Requests/Response.cs @@ -1,4 +1,7 @@ -using System.IO; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; using System.Net; using CryptoExchange.Net.Interfaces; @@ -20,6 +23,11 @@ namespace CryptoExchange.Net.Requests return response.GetResponseStream(); } + public IEnumerable> GetResponseHeaders() + { + return response.Headers.ToIEnumerable(); + } + public void Close() { response.Close(); diff --git a/CryptoExchange.Net/RestClient.cs b/CryptoExchange.Net/RestClient.cs index 47c6f6f..d21d105 100644 --- a/CryptoExchange.Net/RestClient.cs +++ b/CryptoExchange.Net/RestClient.cs @@ -121,7 +121,7 @@ namespace CryptoExchange.Net if (signed && authProvider == null) { log.Write(LogVerbosity.Warning, $"Request {uri.AbsolutePath} failed because no ApiCredentials were provided"); - return new WebCallResult(null, null, new NoApiCredentialsError()); + return new WebCallResult(null, null, null, new NoApiCredentialsError()); } var request = ConstructRequest(uri, method, parameters, signed); @@ -134,11 +134,11 @@ namespace CryptoExchange.Net foreach (var limiter in RateLimiters) { - var limitResult = limiter.LimitRequest(uri.AbsolutePath, RateLimitBehaviour); + var limitResult = limiter.LimitRequest(this, uri.AbsolutePath, RateLimitBehaviour); if (!limitResult.Success) { log.Write(LogVerbosity.Debug, $"Request {uri.AbsolutePath} failed because of rate limit"); - return new WebCallResult(null, null, limitResult.Error); + return new WebCallResult(null, null, null, limitResult.Error); } if (limitResult.Data > 0) @@ -152,20 +152,20 @@ namespace CryptoExchange.Net log.Write(LogVerbosity.Debug, $"Sending {method}{(signed ? " signed" : "")} request to {request.Uri} {paramString ?? ""}"); var result = await ExecuteRequest(request).ConfigureAwait(false); if(!result.Success) - return new WebCallResult(result.ResponseStatusCode, null, result.Error); + return new WebCallResult(result.ResponseStatusCode, result.ResponseHeaders, null, result.Error); var jsonResult = ValidateJson(result.Data); if(!jsonResult.Success) - return new WebCallResult(result.ResponseStatusCode, null, jsonResult.Error); + return new WebCallResult(result.ResponseStatusCode, result.ResponseHeaders, null, jsonResult.Error); if (IsErrorResponse(jsonResult.Data)) - return new WebCallResult(result.ResponseStatusCode, null, ParseErrorResponse(jsonResult.Data)); + return new WebCallResult(result.ResponseStatusCode, result.ResponseHeaders, null, ParseErrorResponse(jsonResult.Data)); var desResult = Deserialize(jsonResult.Data, checkResult); if (!desResult.Success) - return new WebCallResult(result.ResponseStatusCode, null, desResult.Error); + return new WebCallResult(result.ResponseStatusCode, result.ResponseHeaders, null, desResult.Error); - return new WebCallResult(result.ResponseStatusCode, desResult.Data, null); + return new WebCallResult(result.ResponseStatusCode, result.ResponseHeaders, desResult.Data, null); } /// @@ -277,13 +277,15 @@ namespace CryptoExchange.Net } var statusCode = response.StatusCode; + var returnHeaders = response.GetResponseHeaders(); response.Close(); - return new WebCallResult(statusCode, returnedData, null); + return new WebCallResult(statusCode, returnHeaders, returnedData, null); } catch (WebException we) { var response = (HttpWebResponse)we.Response; var statusCode = response?.StatusCode; + var returnHeaders = response?.Headers.ToIEnumerable(); try { @@ -296,7 +298,7 @@ namespace CryptoExchange.Net response.Close(); var jsonResult = ValidateJson(returnedData); - return !jsonResult.Success ? new WebCallResult(statusCode, null, jsonResult.Error) : new WebCallResult(statusCode, null, ParseErrorResponse(jsonResult.Data)); + return !jsonResult.Success ? new WebCallResult(statusCode, returnHeaders, null, jsonResult.Error) : new WebCallResult(statusCode, returnHeaders, null, ParseErrorResponse(jsonResult.Data)); } catch (Exception) { @@ -307,18 +309,18 @@ namespace CryptoExchange.Net { infoMessage += $" | {we.Status} - {we.Message}"; log.Write(LogVerbosity.Warning, infoMessage); - return new WebCallResult(0, null, new WebError(infoMessage)); + return new WebCallResult(0, null, null, new WebError(infoMessage)); } infoMessage = $"Status: {response.StatusCode}-{response.StatusDescription}, Message: {we.Message}"; log.Write(LogVerbosity.Warning, infoMessage); response.Close(); - return new WebCallResult(statusCode, null, new ServerError(infoMessage)); + return new WebCallResult(statusCode, returnHeaders, null, new ServerError(infoMessage)); } catch (Exception e) { log.Write(LogVerbosity.Error, $"Unknown error occured: {e.GetType()}, {e.Message}, {e.StackTrace}"); - return new WebCallResult(null, null, new UnknownError(e.Message + ", data: " + returnedData)); + return new WebCallResult(null, null, null, new UnknownError(e.Message + ", data: " + returnedData)); } } diff --git a/CryptoExchange.Net/SocketClient.cs b/CryptoExchange.Net/SocketClient.cs index 960d805..c06ffd9 100644 --- a/CryptoExchange.Net/SocketClient.cs +++ b/CryptoExchange.Net/SocketClient.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -20,15 +21,26 @@ namespace CryptoExchange.Net /// public IWebsocketFactory SocketFactory { get; set; } = new WebsocketFactory(); - protected internal List sockets = new List(); - protected internal readonly object socketLock = new object(); + /// + /// List of socket connections currently connecting/connected + /// + protected internal ConcurrentDictionary sockets = new ConcurrentDictionary(); + protected internal readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); + /// public TimeSpan ReconnectInterval { get; private set; } + /// public bool AutoReconnect { get; private set; } + /// public TimeSpan ResponseTimeout { get; private set; } - public TimeSpan SocketTimeout { get; private set; } - public int MaxSocketConnections { get; protected set; } = 999; - public int SocketCombineTarget { get; protected set; } = 1; + /// + public TimeSpan SocketNoDataTimeout { get; private set; } + /// + /// The max amount of concurrent socket connections + /// + public int MaxSocketConnections { get; protected set; } = 9999; + /// + public int SocketCombineTarget { get; protected set; } protected Func dataInterpreterBytes; protected Func dataInterpreterString; @@ -52,51 +64,72 @@ namespace CryptoExchange.Net AutoReconnect = exchangeOptions.AutoReconnect; ReconnectInterval = exchangeOptions.ReconnectInterval; ResponseTimeout = exchangeOptions.SocketResponseTimeout; - SocketTimeout = exchangeOptions.SocketNoDataTimeout; + SocketNoDataTimeout = exchangeOptions.SocketNoDataTimeout; + SocketCombineTarget = exchangeOptions.SocketSubscriptionsCombineTarget ?? 1; } /// /// Set a function to interpret the data, used when the data is received as bytes instead of a string /// - /// + /// Handler for byte data + /// Handler for string data protected void SetDataInterpreter(Func byteHandler, Func stringHandler) { dataInterpreterBytes = byteHandler; dataInterpreterString = stringHandler; } - protected virtual async Task> Subscribe(object request, string identifier, bool authenticated, Action dataHandler) + /// + /// Subscribe + /// + /// The expected return data + /// The request to send + /// The identifier to use + /// If the subscription should be authenticated + /// The handler of update data + /// + protected virtual Task> Subscribe(object request, string identifier, bool authenticated, Action dataHandler) { - return await Subscribe(BaseAddress, request, identifier, authenticated, dataHandler).ConfigureAwait(false); + return Subscribe(BaseAddress, request, identifier, authenticated, dataHandler); } + /// + /// Subscribe using a specif URL + /// + /// The type of the expected data + /// The URL to connect to + /// The request to send + /// The identifier to use + /// If the subscription should be authenticated + /// The handler of update data + /// protected virtual async Task> Subscribe(string url, object request, string identifier, bool authenticated, Action dataHandler) { SocketConnection socket; SocketSubscription handler; - if (SocketCombineTarget == 1) - {; - lock (socketLock) + bool released = false; + await semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + socket = GetWebsocket(url, authenticated); + handler = AddHandler(request, identifier, true, socket, dataHandler); + if (SocketCombineTarget == 1) { - socket = GetWebsocket(url, authenticated); - handler = AddHandler(request, identifier, true, socket, dataHandler); + // Can release early when only a single sub per connection + semaphoreSlim.Release(); + released = true; } - var connectResult = ConnectIfNeeded(socket, authenticated).GetAwaiter().GetResult(); + var connectResult = await ConnectIfNeeded(socket, authenticated).ConfigureAwait(false); if (!connectResult.Success) return new CallResult(null, connectResult.Error); } - else + finally { - lock (socketLock) - { - socket = GetWebsocket(url, authenticated); - handler = AddHandler(request, identifier, true, socket, dataHandler); - - var connectResult = ConnectIfNeeded(socket, authenticated).GetAwaiter().GetResult(); - if (!connectResult.Success) - return new CallResult(null, connectResult.Error); - } + //When the task is ready, release the semaphore. It is vital to ALWAYS release the semaphore when we are ready, or else we will end up with a Semaphore that is forever locked. + //This is why it is important to do the Release within a try...finally clause; program execution may crash or take a different path, this way you are guaranteed execution + if(!released) + semaphoreSlim.Release(); } @@ -117,6 +150,13 @@ namespace CryptoExchange.Net return new CallResult(new UpdateSubscription(socket, handler), null); } + /// + /// Sends the subscribe request and waits for a response to that request + /// + /// The connection to send the request on + /// The request to send + /// The subscription the request is for + /// protected internal virtual async Task> SubscribeAndWait(SocketConnection socket, object request, SocketSubscription subscription) { CallResult callResult = null; @@ -134,12 +174,44 @@ namespace CryptoExchange.Net return new CallResult(callResult?.Success ?? false, callResult == null ? new ServerError("No response on subscription request received"): callResult.Error); } - protected virtual async Task> Query(object request, bool authenticated) + protected virtual Task> Query(object request, bool authenticated) { - var socket = GetWebsocket(BaseAddress, authenticated); - var connectResult = await ConnectIfNeeded(socket, authenticated).ConfigureAwait(false); - if (!connectResult.Success) - return new CallResult(default(T), connectResult.Error); + return Query(BaseAddress, request, authenticated); + } + + /// + /// Query for data + /// + /// The expected result type + /// The request to send + /// Whether the socket should be authenticated + /// + protected virtual async Task> Query(string url, object request, bool authenticated) + { + SocketConnection socket; + bool released = false; + await semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + socket = GetWebsocket(url, authenticated); + if (SocketCombineTarget == 1) + { + // Can release early when only a single sub per connection + semaphoreSlim.Release(); + released = true; + } + + var connectResult = await ConnectIfNeeded(socket, authenticated).ConfigureAwait(false); + if (!connectResult.Success) + return new CallResult(default(T), connectResult.Error); + } + finally + { + //When the task is ready, release the semaphore. It is vital to ALWAYS release the semaphore when we are ready, or else we will end up with a Semaphore that is forever locked. + //This is why it is important to do the Release within a try...finally clause; program execution may crash or take a different path, this way you are guaranteed execution + if (!released) + semaphoreSlim.Release(); + } if (socket.PausedActivity) { @@ -150,6 +222,13 @@ namespace CryptoExchange.Net return await QueryAndWait(socket, request).ConfigureAwait(false); } + /// + /// Sends the query request and waits for the result + /// + /// The expected result type + /// The connection to send and wait on + /// The request to send + /// protected virtual async Task> QueryAndWait(SocketConnection socket, object request) { CallResult dataResult = new CallResult(default(T), new ServerError("No response on query received")); @@ -165,6 +244,12 @@ namespace CryptoExchange.Net return dataResult; } + /// + /// Checks if a socket needs to be connected and does so if needed + /// + /// The connection to check + /// Whether the socket should authenticated + /// protected virtual async Task> ConnectIfNeeded(SocketConnection socket, bool authenticated) { if (!socket.Connected) @@ -191,28 +276,75 @@ namespace CryptoExchange.Net return new CallResult(true, null); } - - protected virtual void AddGenericHandler(string identifier, Action action) - { - genericHandlers.Add(identifier, action); - List socketList; - lock (socketLock) - socketList = sockets.ToList(); - foreach (var wrapper in socketList) - wrapper.AddHandler(identifier, false, action); - } - + + /// + /// Needs to check if a received message was an answer to a query request (preferable by id) and set the callResult out to whatever the response is + /// + /// The type of response + /// The socket connection + /// The request that a response is awaited for + /// The message + /// The interpretation (null if message wasn't a response to the request) + /// True if the message was a response to the query protected internal abstract bool HandleQueryResponse(SocketConnection s, object request, JToken data, out CallResult callResult); + /// + /// Needs to check if a received message was an answer to a subscription request (preferable by id) and set the callResult out to whatever the response is + /// + /// The socket connection + /// + /// The request that a response is awaited for + /// The message + /// The interpretation (null if message wasn't a response to the request) + /// True if the message was a response to the subscription request protected internal abstract bool HandleSubscriptionResponse(SocketConnection s, SocketSubscription subscription, object request, JToken message, out CallResult callResult); + /// + /// Needs to check if a received message matches a handler. Typically if an update message matches the request + /// + /// The received data + /// The subscription request + /// protected internal abstract bool MessageMatchesHandler(JToken message, object request); + /// + /// Needs to check if a received message matches a handler. Typically if an received message matches a ping request or a other information pushed from the the server + /// + /// The received data + /// The string identifier of the handler + /// protected internal abstract bool MessageMatchesHandler(JToken message, string identifier); + /// + /// Needs to authenticate the socket so authenticated queries/subscriptions can be made on this socket connection + /// + /// + /// protected internal abstract Task> AuthenticateSocket(SocketConnection s); + /// + /// Needs to unsubscribe a subscription, typically by sending an unsubscribe request. If multiple subscriptions per socket is not allowed this can just return since the socket will be closed anyway + /// + /// The connection on which to unsubscribe + /// The subscription to unsubscribe + /// protected internal abstract Task Unsubscribe(SocketConnection connection, SocketSubscription s); + + /// + /// Optional handler to interpolate data before sending it to the handlers + /// + /// + /// protected internal virtual JToken ProcessTokenData(JToken message) { return message; } + /// + /// Add a handler for a subscription + /// + /// The type of data the subscription expects + /// The request of the subscription + /// The identifier of the subscription (can be null if request param is used) + /// Whether or not this is a user subscription (counts towards the max amount of handlers on a socket) + /// The socket connection the handler is on + /// The handler of the data received + /// protected virtual SocketSubscription AddHandler(object request, string identifier, bool userSubscription, SocketConnection connection, Action dataHandler) { Action internalHandler = (socketWrapper, data) => @@ -238,12 +370,31 @@ namespace CryptoExchange.Net return connection.AddHandler(identifier, userSubscription, internalHandler); } + /// + /// Adds a generic message handler. Used for example to reply to ping requests + /// + /// The name of the request handler. Needs to be unique + /// The action to execute when receiving a message for this handler (checked by ) + protected virtual void AddGenericHandler(string identifier, Action action) + { + genericHandlers.Add(identifier, action); + foreach (var connection in sockets.Values) + connection.AddHandler(identifier, false, action); + } + + /// + /// Gets a connection for a new subscription or query. Can be an existing if there are open position or a new one. + /// + /// The address the socket is for + /// Whether the socket should be authenticated + /// protected virtual SocketConnection GetWebsocket(string address, bool authenticated) { - SocketConnection result = sockets.Where(s => s.Socket.Url == address && (s.Authenticated == authenticated || !authenticated) && s.Connected).OrderBy(s => s.HandlerCount).FirstOrDefault(); + var socketResult = sockets.Where(s => s.Value.Socket.Url == address && (s.Value.Authenticated == authenticated || !authenticated) && s.Value.Connected).OrderBy(s => s.Value.HandlerCount).FirstOrDefault(); + var result = socketResult.Equals(default(KeyValuePair)) ? null : socketResult.Value; if (result != null) { - if (result.HandlerCount < SocketCombineTarget || (sockets.Count >= MaxSocketConnections && sockets.All(s => s.HandlerCount >= SocketCombineTarget))) + if (result.HandlerCount < SocketCombineTarget || (sockets.Count >= MaxSocketConnections && sockets.All(s => s.Value.HandlerCount >= SocketCombineTarget))) { // Use existing socket if it has less than target connections OR it has the least connections and we can't make new return result; @@ -252,7 +403,7 @@ namespace CryptoExchange.Net // Create new socket var socket = CreateSocket(address); - var socketWrapper = new SocketConnection(this, log, socket); + var socketWrapper = new SocketConnection(this, socket); foreach (var kvp in genericHandlers) socketWrapper.AddHandler(kvp.Key, false, kvp.Value); return socketWrapper; @@ -261,13 +412,13 @@ namespace CryptoExchange.Net /// /// Connect a socket /// - /// The subscription to connect + /// The socket to connect /// protected virtual async Task> ConnectSocket(SocketConnection socketConnection) { if (await socketConnection.Socket.Connect().ConfigureAwait(false)) { - sockets.Add(socketConnection); + sockets.TryAdd(socketConnection.Socket.Id, socketConnection); return new CallResult(true, null); } @@ -288,7 +439,7 @@ namespace CryptoExchange.Net if (apiProxy != null) socket.SetProxy(apiProxy.Host, apiProxy.Port); - socket.Timeout = SocketTimeout; + socket.Timeout = SocketNoDataTimeout; socket.DataInterpreterBytes = dataInterpreterBytes; socket.DataInterpreterString = dataInterpreterString; socket.OnError += e => @@ -298,25 +449,26 @@ namespace CryptoExchange.Net return socket; } + /// + /// Periodically sends an object to a socket + /// + /// How often + /// Method returning the object to send public virtual void SendPeriodic(TimeSpan interval, Func objGetter) { periodicEvent = new AutoResetEvent(false); - periodicTask = Task.Run(() => + periodicTask = Task.Run(async () => { while (!disposing) { - periodicEvent.WaitOne(interval); + await periodicEvent.WaitOneAsync(interval).ConfigureAwait(false); if (disposing) break; - - List socketList; - lock (socketLock) - socketList = sockets.ToList(); - - if (socketList.Any()) + + if (sockets.Any()) log.Write(LogVerbosity.Debug, "Sending periodic"); - foreach (var socket in socketList) + foreach (var socket in sockets.Values) { if (disposing) break; @@ -360,15 +512,14 @@ namespace CryptoExchange.Net /// public virtual async Task UnsubscribeAll() { - lock (socketLock) - log.Write(LogVerbosity.Debug, $"Closing all {sockets.Count} subscriptions"); + log.Write(LogVerbosity.Debug, $"Closing all {sockets.Count} subscriptions"); await Task.Run(() => { var tasks = new List(); - lock (socketLock) { - foreach (var sub in new List(sockets)) + var socketList = sockets.Values; + foreach (var sub in socketList) tasks.Add(sub.Close()); } @@ -376,6 +527,9 @@ namespace CryptoExchange.Net }).ConfigureAwait(false); } + /// + /// Dispose the client + /// public override void Dispose() { disposing = true; diff --git a/CryptoExchange.Net/Sockets/BaseSocket.cs b/CryptoExchange.Net/Sockets/BaseSocket.cs index dfda424..dd5dd4d 100644 --- a/CryptoExchange.Net/Sockets/BaseSocket.cs +++ b/CryptoExchange.Net/Sockets/BaseSocket.cs @@ -114,7 +114,7 @@ namespace CryptoExchange.Net.Sockets handle?.Invoke(data); } - protected void CheckTimeout() + protected async Task CheckTimeout() { while (true) { @@ -131,7 +131,7 @@ namespace CryptoExchange.Net.Sockets } } - Thread.Sleep(500); + await Task.Delay(500).ConfigureAwait(false); } } @@ -184,7 +184,7 @@ namespace CryptoExchange.Net.Sockets socket.Send(data); } - public virtual async Task Connect() + public virtual Task Connect() { if (socket == null) { @@ -211,7 +211,7 @@ namespace CryptoExchange.Net.Sockets socket.DataReceived += (o, s) => HandleByteData(s.Data); } - return await Task.Run(() => + return Task.Run(() => { bool connected; lock (socketLock) @@ -247,7 +247,7 @@ namespace CryptoExchange.Net.Sockets { log?.Write(LogVerbosity.Debug, $"Socket {Id} connected"); if ((timeoutTask == null || timeoutTask.IsCompleted) && Timeout != default(TimeSpan)) - timeoutTask = Task.Run(() => CheckTimeout()); + timeoutTask = Task.Run(CheckTimeout); } else log?.Write(LogVerbosity.Debug, $"Socket {Id} connection failed, state: " + socket.State); @@ -257,7 +257,7 @@ namespace CryptoExchange.Net.Sockets socket.Close(); return connected; - }).ConfigureAwait(false); + }); } public virtual void SetProxy(string host, int port) diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 37861d5..233d219 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -32,7 +32,7 @@ namespace CryptoExchange.Net.Sockets public DateTime? DisconnectTime { get; set; } public bool PausedActivity { get; set; } - private readonly List handlers; + internal readonly List handlers; private readonly object handlersLock = new object(); private bool lostTriggered; @@ -41,9 +41,9 @@ namespace CryptoExchange.Net.Sockets private readonly List pendingRequests; - public SocketConnection(SocketClient client, Log log, IWebsocket socket) + public SocketConnection(SocketClient client, IWebsocket socket) { - this.log = log; + log = client.log; socketClient = client; pendingRequests = new List(); @@ -51,7 +51,7 @@ namespace CryptoExchange.Net.Sockets handlers = new List(); Socket = socket; - Socket.Timeout = client.SocketTimeout; + Socket.Timeout = client.SocketNoDataTimeout; Socket.OnMessage += ProcessMessage; Socket.OnClose += () => { @@ -160,15 +160,12 @@ namespace CryptoExchange.Net.Sockets } } - public virtual async Task SendAndWait(T obj, TimeSpan timeout, Func handler) + public virtual Task SendAndWait(T obj, TimeSpan timeout, Func handler) { var pending = new PendingRequest(handler, timeout); pendingRequests.Add(pending); - await Task.Run(() => - { - Send(obj); - pending.Event.WaitOne(timeout); - }).ConfigureAwait(false); + Send(obj); + return pending.Event.WaitOneAsync(timeout); } /// @@ -209,7 +206,7 @@ namespace CryptoExchange.Net.Sockets { while (ShouldReconnect) { - Thread.Sleep(socketClient.ReconnectInterval); + await Task.Delay(socketClient.ReconnectInterval).ConfigureAwait(false); if (!ShouldReconnect) { // Should reconnect changed to false while waiting to reconnect @@ -282,11 +279,9 @@ namespace CryptoExchange.Net.Sockets { Connected = false; ShouldReconnect = false; - lock (socketClient.socketLock) - { - if (socketClient.sockets.Contains(this)) - socketClient.sockets.Remove(this); - } + if (socketClient.sockets.ContainsKey(Socket.Id)) + socketClient.sockets.TryRemove(Socket.Id, out _); + await Socket.Close().ConfigureAwait(false); Socket.Dispose();