diff --git a/CryptoExchange.Net.UnitTests/SocketClientTests.cs b/CryptoExchange.Net.UnitTests/SocketClientTests.cs index 89bf014..51c157a 100644 --- a/CryptoExchange.Net.UnitTests/SocketClientTests.cs +++ b/CryptoExchange.Net.UnitTests/SocketClientTests.cs @@ -58,9 +58,7 @@ namespace CryptoExchange.Net.UnitTests options.ReconnectInterval = TimeSpan.Zero; }); var socket = client.CreateSocket(); - socket.ShouldReconnect = true; socket.CanConnect = true; - socket.DisconnectTime = DateTime.UtcNow; var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); var rstEvent = new ManualResetEvent(false); Dictionary result = null; @@ -75,7 +73,7 @@ namespace CryptoExchange.Net.UnitTests sub.AddSubscription(subObj); // act - socket.InvokeMessage("{\"property\": \"123\", \"topic\": \"topic\"}"); + socket.InvokeMessage("{\"property\": \"123\", \"action\": \"update\", \"topic\": \"topic\"}"); rstEvent.WaitOne(1000); // assert @@ -93,9 +91,7 @@ namespace CryptoExchange.Net.UnitTests options.SubOptions.OutputOriginalData = enabled; }); var socket = client.CreateSocket(); - socket.ShouldReconnect = true; socket.CanConnect = true; - socket.DisconnectTime = DateTime.UtcNow; var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); var rstEvent = new ManualResetEvent(false); string original = null; @@ -107,7 +103,7 @@ namespace CryptoExchange.Net.UnitTests rstEvent.Set(); }); sub.AddSubscription(subObj); - var msgToSend = JsonConvert.SerializeObject(new { topic = "topic", property = 123 }); + var msgToSend = JsonConvert.SerializeObject(new { topic = "topic", action = "update", property = 123 }); // act socket.InvokeMessage(msgToSend); @@ -202,7 +198,7 @@ namespace CryptoExchange.Net.UnitTests // act var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); - socket.InvokeMessage(JsonConvert.SerializeObject(new { channel, status = "error" })); + socket.InvokeMessage(JsonConvert.SerializeObject(new { channel, action = "subscribe", status = "error" })); await sub; // assert @@ -225,7 +221,7 @@ namespace CryptoExchange.Net.UnitTests // act var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); - socket.InvokeMessage(JsonConvert.SerializeObject(new { channel, status = "confirmed" })); + socket.InvokeMessage(JsonConvert.SerializeObject(new { channel, action = "subscribe", status = "confirmed" })); await sub; // assert diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestChannelQuery.cs b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestChannelQuery.cs index d0d62e3..12ff600 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestChannelQuery.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestChannelQuery.cs @@ -10,6 +10,10 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets { internal class SubResponse { + + [JsonProperty("action")] + public string Action { get; set; } = null!; + [JsonProperty("channel")] public string Channel { get; set; } = null!; @@ -19,6 +23,9 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets internal class UnsubResponse { + [JsonProperty("action")] + public string Action { get; set; } = null!; + [JsonProperty("status")] public string Status { get; set; } = null!; } @@ -29,7 +36,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets public TestChannelQuery(string channel, string request, bool authenticated, int weight = 1) : base(request, authenticated, weight) { - ListenerIdentifiers = new HashSet { channel }; + ListenerIdentifiers = new HashSet { request + "-" + channel }; } public override CallResult HandleMessage(SocketConnection connection, DataEvent message) diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs index d5a634e..f8f1afd 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/Sockets/TestSubscription.cs @@ -15,7 +15,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets { private readonly Action> _handler; - public override HashSet ListenerIdentifiers { get; set; } = new HashSet { "topic" }; + public override HashSet ListenerIdentifiers { get; set; } = new HashSet { "update-topic" }; public TestSubscription(ILogger logger, Action> handler) : base(logger, false) { diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs index 3862cab..c3408de 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocket.cs @@ -1,131 +1,132 @@ -using System; -using System.IO; -using System.Net.WebSockets; -using System.Security.Authentication; -using System.Text; -using System.Threading.Tasks; -using CryptoExchange.Net.Interfaces; -using CryptoExchange.Net.Objects; +//using System; +//using System.IO; +//using System.Net.WebSockets; +//using System.Security.Authentication; +//using System.Text; +//using System.Threading.Tasks; +//using CryptoExchange.Net.Interfaces; +//using CryptoExchange.Net.Objects; -namespace CryptoExchange.Net.UnitTests.TestImplementations -{ - public class TestSocket: IWebsocket - { - public bool CanConnect { get; set; } - public bool Connected { get; set; } +//namespace CryptoExchange.Net.UnitTests.TestImplementations +//{ +// public class TestSocket: IWebsocket +// { +// public bool CanConnect { get; set; } +// public bool Connected { get; set; } - public event Func OnClose; -#pragma warning disable 0067 - public event Func OnReconnected; - public event Func OnReconnecting; - public event Func OnRequestRateLimited; -#pragma warning restore 0067 - public event Func OnRequestSent; - public event Func, Task> OnStreamMessage; - public event Func OnError; - public event Func OnOpen; - public Func> GetReconnectionUrl { get; set; } +// public event Func OnClose; +//#pragma warning disable 0067 +// public event Func OnReconnected; +// public event Func OnReconnecting; +// public event Func OnRequestRateLimited; +//#pragma warning restore 0067 +// public event Func OnRequestSent; +// public event Func, Task> OnStreamMessage; +// public event Func OnError; +// public event Func OnOpen; +// public Func> GetReconnectionUrl { get; set; } - public int Id { get; } - public bool ShouldReconnect { get; set; } - public TimeSpan Timeout { get; set; } - public Func DataInterpreterString { get; set; } - public Func DataInterpreterBytes { get; set; } - public DateTime? DisconnectTime { get; set; } - public string Url { get; } - public bool IsClosed => !Connected; - public bool IsOpen => Connected; - public bool PingConnection { get; set; } - public TimeSpan PingInterval { get; set; } - public SslProtocols SSLProtocols { get; set; } - public Encoding Encoding { get; set; } +// public int Id { get; } +// public bool ShouldReconnect { get; set; } +// public TimeSpan Timeout { get; set; } +// public Func DataInterpreterString { get; set; } +// public Func DataInterpreterBytes { get; set; } +// public DateTime? DisconnectTime { get; set; } +// public string Url { get; } +// public bool IsClosed => !Connected; +// public bool IsOpen => Connected; +// public bool PingConnection { get; set; } +// public TimeSpan PingInterval { get; set; } +// public SslProtocols SSLProtocols { get; set; } +// public Encoding Encoding { get; set; } - public int ConnectCalls { get; private set; } - public bool Reconnecting { get; set; } - public string Origin { get; set; } - public int? RatelimitPerSecond { get; set; } +// public int ConnectCalls { get; private set; } +// public bool Reconnecting { get; set; } +// public string Origin { get; set; } +// public int? RatelimitPerSecond { get; set; } - public double IncomingKbps => throw new NotImplementedException(); +// public double IncomingKbps => throw new NotImplementedException(); - public Uri Uri => new Uri(""); +// public Uri Uri => new Uri(""); - public TimeSpan KeepAliveInterval { get; set; } +// public TimeSpan KeepAliveInterval { get; set; } - public static int lastId = 0; - public static object lastIdLock = new object(); +// public static int lastId = 0; +// public static object lastIdLock = new object(); - public TestSocket() - { - lock (lastIdLock) - { - Id = lastId + 1; - lastId++; - } - } +// public TestSocket() +// { +// lock (lastIdLock) +// { +// Id = lastId + 1; +// lastId++; +// } +// } - public Task ConnectAsync() - { - Connected = CanConnect; - ConnectCalls++; - if (CanConnect) - InvokeOpen(); - return Task.FromResult(CanConnect ? new CallResult(null) : new CallResult(new CantConnectError())); - } +// public Task ConnectAsync() +// { +// Connected = CanConnect; +// ConnectCalls++; +// if (CanConnect) +// InvokeOpen(); +// return Task.FromResult(CanConnect ? new CallResult(null) : new CallResult(new CantConnectError())); +// } - public void Send(int requestId, string data, int weight) - { - if(!Connected) - throw new Exception("Socket not connected"); - OnRequestSent?.Invoke(requestId); - } +// public bool Send(int requestId, string data, int weight) +// { +// if(!Connected) +// throw new Exception("Socket not connected"); +// OnRequestSent?.Invoke(requestId); +// return true; +// } - public void Reset() - { - } +// public void Reset() +// { +// } - public Task CloseAsync() - { - Connected = false; - DisconnectTime = DateTime.UtcNow; - OnClose?.Invoke(); - return Task.FromResult(0); - } +// public Task CloseAsync() +// { +// Connected = false; +// DisconnectTime = DateTime.UtcNow; +// OnClose?.Invoke(); +// return Task.FromResult(0); +// } - public void SetProxy(string host, int port) - { - throw new NotImplementedException(); - } - public void Dispose() - { - } +// public void SetProxy(string host, int port) +// { +// throw new NotImplementedException(); +// } +// public void Dispose() +// { +// } - public void InvokeClose() - { - Connected = false; - DisconnectTime = DateTime.UtcNow; - Reconnecting = true; - OnClose?.Invoke(); - } +// public void InvokeClose() +// { +// Connected = false; +// DisconnectTime = DateTime.UtcNow; +// Reconnecting = true; +// OnClose?.Invoke(); +// } - public void InvokeOpen() - { - OnOpen?.Invoke(); - } +// public void InvokeOpen() +// { +// OnOpen?.Invoke(); +// } - public void InvokeMessage(string data) - { - OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory(Encoding.UTF8.GetBytes(data))).Wait(); - } +// public void InvokeMessage(string data) +// { +// OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory(Encoding.UTF8.GetBytes(data))).Wait(); +// } - public void SetProxy(ApiProxy proxy) - { - throw new NotImplementedException(); - } +// public void SetProxy(ApiProxy proxy) +// { +// throw new NotImplementedException(); +// } - public void InvokeError(Exception error) - { - OnError?.Invoke(error); - } - public Task ReconnectAsync() => Task.CompletedTask; - } -} +// public void InvokeError(Exception error) +// { +// OnError?.Invoke(error); +// } +// public Task ReconnectAsync() => Task.CompletedTask; +// } +//} diff --git a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs index 1df4635..661f6d7 100644 --- a/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs +++ b/CryptoExchange.Net.UnitTests/TestImplementations/TestSocketClient.cs @@ -13,11 +13,11 @@ using CryptoExchange.Net.Sockets; using CryptoExchange.Net.UnitTests.TestImplementations.Sockets; using Microsoft.Extensions.Logging; using Moq; -using Newtonsoft.Json.Linq; +using CryptoExchange.Net.Testing.Implementations; namespace CryptoExchange.Net.UnitTests.TestImplementations { - public class TestSocketClient: BaseSocketClient + internal class TestSocketClient: BaseSocketClient { public TestSubSocketClient SubClient { get; } @@ -41,12 +41,12 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations SubClient = AddApiClient(new TestSubSocketClient(options, options.SubOptions)); SubClient.SocketFactory = new Mock().Object; - Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket()); + Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket("https://test.com")); } public TestSocket CreateSocket() { - Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket()); + Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny(), It.IsAny())).Returns(new TestSocket("https://test.com")); return (TestSocket)SubClient.CreateSocketInternal("https://localhost:123/"); } @@ -75,6 +75,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations public class TestSubSocketClient : SocketApiClient { private MessagePath _channelPath = MessagePath.Get().Property("channel"); + private MessagePath _actionPath = MessagePath.Get().Property("action"); private MessagePath _topicPath = MessagePath.Get().Property("topic"); public Subscription TestSubscription { get; private set; } = null; @@ -110,7 +111,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations var id = message.GetValue(_channelPath); id ??= message.GetValue(_topicPath); - return id; + return message.GetValue(_actionPath) + "-" + id; } public Task> SubscribeToSomethingAsync(string channel, Action> onUpdate, CancellationToken ct) diff --git a/CryptoExchange.Net/Converters/SystemTextJson/ArrayConverter.cs b/CryptoExchange.Net/Converters/SystemTextJson/ArrayConverter.cs index ec6f0ed..fb48322 100644 --- a/CryptoExchange.Net/Converters/SystemTextJson/ArrayConverter.cs +++ b/CryptoExchange.Net/Converters/SystemTextJson/ArrayConverter.cs @@ -32,7 +32,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson public ArrayPropertyAttribute ArrayProperty { get; set; } = null!; public Type? JsonConverterType { get; set; } public bool DefaultDeserialization { get; set; } - public Type TargetType { get; set; } + public Type TargetType { get; set; } = null!; } private class ArrayConverterInner : JsonConverter diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index 09bffae..f38f917 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -78,7 +78,7 @@ namespace CryptoExchange.Net.Interfaces /// /// /// - void Send(int id, string data, int weight); + bool Send(int id, string data, int weight); /// /// Reconnect the socket /// diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 6de3abe..c8f90b5 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -322,15 +322,16 @@ namespace CryptoExchange.Net.Sockets } /// - public virtual void Send(int id, string data, int weight) + public virtual bool Send(int id, string data, int weight) { - if (_ctsSource.IsCancellationRequested) - return; + if (_ctsSource.IsCancellationRequested || _processState != ProcessState.Processing) + return false; var bytes = Parameters.Encoding.GetBytes(data); _logger.SocketAddingBytesToSendBuffer(Id, id, bytes); _sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes }); _sendEvent.Set(); + return true; } /// @@ -389,9 +390,7 @@ namespace CryptoExchange.Net.Sockets if (_disposed) return; - //_closeState = CloseState.Closing; _ctsSource.Cancel(); - _sendEvent.Set(); if (_socket.State == WebSocketState.Open) { @@ -436,6 +435,7 @@ namespace CryptoExchange.Net.Sockets _disposed = true; _socket.Dispose(); _ctsSource?.Dispose(); + _sendEvent.Dispose(); _logger.SocketDisposed(Id); } @@ -450,10 +450,15 @@ namespace CryptoExchange.Net.Sockets { while (true) { - if (_ctsSource.IsCancellationRequested) + try + { + if (!_sendBuffer.Any()) + await _sendEvent.WaitAsync(ct: _ctsSource.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { break; - - await _sendEvent.WaitAsync().ConfigureAwait(false); + } if (_ctsSource.IsCancellationRequested) break; @@ -507,7 +512,8 @@ namespace CryptoExchange.Net.Sockets // Make sure we at least let the owner know there was an error _logger.SocketSendLoopStoppedWithException(Id, e.Message, e); await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); - throw; + if (_closeTask?.IsCompleted != false) + _closeTask = CloseInternalAsync(); } finally { @@ -583,7 +589,6 @@ namespace CryptoExchange.Net.Sockets // Received a complete message and it's not multi part _logger.SocketReceivedSingleMessage(Id, receiveResult.Count); await ProcessData(receiveResult.MessageType, new ReadOnlyMemory(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); - _logger.LogTrace($"[Sckt {Id}] process completed"); } else { @@ -693,7 +698,6 @@ namespace CryptoExchange.Net.Sockets // any exception here will stop the timeout checking, but do so silently unless the socket get's stopped. // Make sure we at least let the owner know there was an error await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); - throw; } } @@ -718,10 +722,14 @@ namespace CryptoExchange.Net.Sockets var checkTime = DateTime.UtcNow; if (checkTime - _lastReceivedMessagesUpdate > TimeSpan.FromSeconds(1)) { - foreach (var msg in _receivedMessages.ToList()) // To list here because we're removing from the list + for (var i = 0; i < _receivedMessages.Count; i++) { + var msg = _receivedMessages[i]; if (checkTime - msg.Timestamp > TimeSpan.FromSeconds(3)) + { _receivedMessages.Remove(msg); + i--; + } } _lastReceivedMessagesUpdate = checkTime; diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 88d7352..7b6014f 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -802,7 +802,9 @@ namespace CryptoExchange.Net.Sockets _logger.SendingData(SocketId, requestId, data); try { - _socket.Send(requestId, data, weight); + if (!_socket.Send(requestId, data, weight)) + return new CallResult(new WebError("Failed to send message, connection not open")); + return new CallResult(null); } catch(Exception ex) diff --git a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs index 8a09184..f2806e6 100644 --- a/CryptoExchange.Net/Testing/Implementations/TestSocket.cs +++ b/CryptoExchange.Net/Testing/Implementations/TestSocket.cs @@ -33,9 +33,17 @@ namespace CryptoExchange.Net.Testing.Implementations public Uri Uri { get; set; } public Func>? GetReconnectionUrl { get; set; } + public static int lastId = 0; + public static object lastIdLock = new object(); + public TestSocket(string address) { Uri = new Uri(address); + lock (lastIdLock) + { + Id = lastId + 1; + lastId++; + } } public Task ConnectAsync() @@ -44,13 +52,14 @@ namespace CryptoExchange.Net.Testing.Implementations return Task.FromResult(CanConnect ? new CallResult(null) : new CallResult(new CantConnectError())); } - public void Send(int requestId, string data, int weight) + public bool Send(int requestId, string data, int weight) { if (!Connected) throw new Exception("Socket not connected"); OnRequestSent?.Invoke(requestId); OnMessageSend?.Invoke(data); + return true; } public Task CloseAsync()