1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-07 16:06:15 +00:00

Added some checks socket connections

This commit is contained in:
JKorf 2024-07-10 14:12:07 +02:00
parent 28da93af9d
commit f287ec1fa4
10 changed files with 167 additions and 143 deletions

View File

@ -58,9 +58,7 @@ namespace CryptoExchange.Net.UnitTests
options.ReconnectInterval = TimeSpan.Zero; options.ReconnectInterval = TimeSpan.Zero;
}); });
var socket = client.CreateSocket(); var socket = client.CreateSocket();
socket.ShouldReconnect = true;
socket.CanConnect = true; socket.CanConnect = true;
socket.DisconnectTime = DateTime.UtcNow;
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
var rstEvent = new ManualResetEvent(false); var rstEvent = new ManualResetEvent(false);
Dictionary<string, string> result = null; Dictionary<string, string> result = null;
@ -75,7 +73,7 @@ namespace CryptoExchange.Net.UnitTests
sub.AddSubscription(subObj); sub.AddSubscription(subObj);
// act // act
socket.InvokeMessage("{\"property\": \"123\", \"topic\": \"topic\"}"); socket.InvokeMessage("{\"property\": \"123\", \"action\": \"update\", \"topic\": \"topic\"}");
rstEvent.WaitOne(1000); rstEvent.WaitOne(1000);
// assert // assert
@ -93,9 +91,7 @@ namespace CryptoExchange.Net.UnitTests
options.SubOptions.OutputOriginalData = enabled; options.SubOptions.OutputOriginalData = enabled;
}); });
var socket = client.CreateSocket(); var socket = client.CreateSocket();
socket.ShouldReconnect = true;
socket.CanConnect = true; socket.CanConnect = true;
socket.DisconnectTime = DateTime.UtcNow;
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null); var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
var rstEvent = new ManualResetEvent(false); var rstEvent = new ManualResetEvent(false);
string original = null; string original = null;
@ -107,7 +103,7 @@ namespace CryptoExchange.Net.UnitTests
rstEvent.Set(); rstEvent.Set();
}); });
sub.AddSubscription(subObj); sub.AddSubscription(subObj);
var msgToSend = JsonConvert.SerializeObject(new { topic = "topic", property = 123 }); var msgToSend = JsonConvert.SerializeObject(new { topic = "topic", action = "update", property = 123 });
// act // act
socket.InvokeMessage(msgToSend); socket.InvokeMessage(msgToSend);
@ -202,7 +198,7 @@ namespace CryptoExchange.Net.UnitTests
// act // act
var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default);
socket.InvokeMessage(JsonConvert.SerializeObject(new { channel, status = "error" })); socket.InvokeMessage(JsonConvert.SerializeObject(new { channel, action = "subscribe", status = "error" }));
await sub; await sub;
// assert // assert
@ -225,7 +221,7 @@ namespace CryptoExchange.Net.UnitTests
// act // act
var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default); var sub = client.SubClient.SubscribeToSomethingAsync(channel, onUpdate => {}, ct: default);
socket.InvokeMessage(JsonConvert.SerializeObject(new { channel, status = "confirmed" })); socket.InvokeMessage(JsonConvert.SerializeObject(new { channel, action = "subscribe", status = "confirmed" }));
await sub; await sub;
// assert // assert

View File

@ -10,6 +10,10 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
{ {
internal class SubResponse internal class SubResponse
{ {
[JsonProperty("action")]
public string Action { get; set; } = null!;
[JsonProperty("channel")] [JsonProperty("channel")]
public string Channel { get; set; } = null!; public string Channel { get; set; } = null!;
@ -19,6 +23,9 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
internal class UnsubResponse internal class UnsubResponse
{ {
[JsonProperty("action")]
public string Action { get; set; } = null!;
[JsonProperty("status")] [JsonProperty("status")]
public string Status { get; set; } = null!; public string Status { get; set; } = null!;
} }
@ -29,7 +36,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
public TestChannelQuery(string channel, string request, bool authenticated, int weight = 1) : base(request, authenticated, weight) public TestChannelQuery(string channel, string request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
{ {
ListenerIdentifiers = new HashSet<string> { channel }; ListenerIdentifiers = new HashSet<string> { request + "-" + channel };
} }
public override CallResult<SubResponse> HandleMessage(SocketConnection connection, DataEvent<SubResponse> message) public override CallResult<SubResponse> HandleMessage(SocketConnection connection, DataEvent<SubResponse> message)

View File

@ -15,7 +15,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
{ {
private readonly Action<DataEvent<T>> _handler; private readonly Action<DataEvent<T>> _handler;
public override HashSet<string> ListenerIdentifiers { get; set; } = new HashSet<string> { "topic" }; public override HashSet<string> ListenerIdentifiers { get; set; } = new HashSet<string> { "update-topic" };
public TestSubscription(ILogger logger, Action<DataEvent<T>> handler) : base(logger, false) public TestSubscription(ILogger logger, Action<DataEvent<T>> handler) : base(logger, false)
{ {

View File

@ -1,131 +1,132 @@
using System; //using System;
using System.IO; //using System.IO;
using System.Net.WebSockets; //using System.Net.WebSockets;
using System.Security.Authentication; //using System.Security.Authentication;
using System.Text; //using System.Text;
using System.Threading.Tasks; //using System.Threading.Tasks;
using CryptoExchange.Net.Interfaces; //using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects; //using CryptoExchange.Net.Objects;
namespace CryptoExchange.Net.UnitTests.TestImplementations //namespace CryptoExchange.Net.UnitTests.TestImplementations
{ //{
public class TestSocket: IWebsocket // public class TestSocket: IWebsocket
{ // {
public bool CanConnect { get; set; } // public bool CanConnect { get; set; }
public bool Connected { get; set; } // public bool Connected { get; set; }
public event Func<Task> OnClose; // public event Func<Task> OnClose;
#pragma warning disable 0067 //#pragma warning disable 0067
public event Func<Task> OnReconnected; // public event Func<Task> OnReconnected;
public event Func<Task> OnReconnecting; // public event Func<Task> OnReconnecting;
public event Func<int, Task> OnRequestRateLimited; // public event Func<int, Task> OnRequestRateLimited;
#pragma warning restore 0067 //#pragma warning restore 0067
public event Func<int, Task> OnRequestSent; // public event Func<int, Task> OnRequestSent;
public event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task> OnStreamMessage; // public event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task> OnStreamMessage;
public event Func<Exception, Task> OnError; // public event Func<Exception, Task> OnError;
public event Func<Task> OnOpen; // public event Func<Task> OnOpen;
public Func<Task<Uri>> GetReconnectionUrl { get; set; } // public Func<Task<Uri>> GetReconnectionUrl { get; set; }
public int Id { get; } // public int Id { get; }
public bool ShouldReconnect { get; set; } // public bool ShouldReconnect { get; set; }
public TimeSpan Timeout { get; set; } // public TimeSpan Timeout { get; set; }
public Func<string, string> DataInterpreterString { get; set; } // public Func<string, string> DataInterpreterString { get; set; }
public Func<byte[], string> DataInterpreterBytes { get; set; } // public Func<byte[], string> DataInterpreterBytes { get; set; }
public DateTime? DisconnectTime { get; set; } // public DateTime? DisconnectTime { get; set; }
public string Url { get; } // public string Url { get; }
public bool IsClosed => !Connected; // public bool IsClosed => !Connected;
public bool IsOpen => Connected; // public bool IsOpen => Connected;
public bool PingConnection { get; set; } // public bool PingConnection { get; set; }
public TimeSpan PingInterval { get; set; } // public TimeSpan PingInterval { get; set; }
public SslProtocols SSLProtocols { get; set; } // public SslProtocols SSLProtocols { get; set; }
public Encoding Encoding { get; set; } // public Encoding Encoding { get; set; }
public int ConnectCalls { get; private set; } // public int ConnectCalls { get; private set; }
public bool Reconnecting { get; set; } // public bool Reconnecting { get; set; }
public string Origin { get; set; } // public string Origin { get; set; }
public int? RatelimitPerSecond { get; set; } // public int? RatelimitPerSecond { get; set; }
public double IncomingKbps => throw new NotImplementedException(); // public double IncomingKbps => throw new NotImplementedException();
public Uri Uri => new Uri(""); // public Uri Uri => new Uri("");
public TimeSpan KeepAliveInterval { get; set; } // public TimeSpan KeepAliveInterval { get; set; }
public static int lastId = 0; // public static int lastId = 0;
public static object lastIdLock = new object(); // public static object lastIdLock = new object();
public TestSocket() // public TestSocket()
{ // {
lock (lastIdLock) // lock (lastIdLock)
{ // {
Id = lastId + 1; // Id = lastId + 1;
lastId++; // lastId++;
} // }
} // }
public Task<CallResult> ConnectAsync() // public Task<CallResult> ConnectAsync()
{ // {
Connected = CanConnect; // Connected = CanConnect;
ConnectCalls++; // ConnectCalls++;
if (CanConnect) // if (CanConnect)
InvokeOpen(); // InvokeOpen();
return Task.FromResult(CanConnect ? new CallResult(null) : new CallResult(new CantConnectError())); // return Task.FromResult(CanConnect ? new CallResult(null) : new CallResult(new CantConnectError()));
} // }
public void Send(int requestId, string data, int weight) // public bool Send(int requestId, string data, int weight)
{ // {
if(!Connected) // if(!Connected)
throw new Exception("Socket not connected"); // throw new Exception("Socket not connected");
OnRequestSent?.Invoke(requestId); // OnRequestSent?.Invoke(requestId);
} // return true;
// }
public void Reset() // public void Reset()
{ // {
} // }
public Task CloseAsync() // public Task CloseAsync()
{ // {
Connected = false; // Connected = false;
DisconnectTime = DateTime.UtcNow; // DisconnectTime = DateTime.UtcNow;
OnClose?.Invoke(); // OnClose?.Invoke();
return Task.FromResult(0); // return Task.FromResult(0);
} // }
public void SetProxy(string host, int port) // public void SetProxy(string host, int port)
{ // {
throw new NotImplementedException(); // throw new NotImplementedException();
} // }
public void Dispose() // public void Dispose()
{ // {
} // }
public void InvokeClose() // public void InvokeClose()
{ // {
Connected = false; // Connected = false;
DisconnectTime = DateTime.UtcNow; // DisconnectTime = DateTime.UtcNow;
Reconnecting = true; // Reconnecting = true;
OnClose?.Invoke(); // OnClose?.Invoke();
} // }
public void InvokeOpen() // public void InvokeOpen()
{ // {
OnOpen?.Invoke(); // OnOpen?.Invoke();
} // }
public void InvokeMessage(string data) // public void InvokeMessage(string data)
{ // {
OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(data))).Wait(); // OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(data))).Wait();
} // }
public void SetProxy(ApiProxy proxy) // public void SetProxy(ApiProxy proxy)
{ // {
throw new NotImplementedException(); // throw new NotImplementedException();
} // }
public void InvokeError(Exception error) // public void InvokeError(Exception error)
{ // {
OnError?.Invoke(error); // OnError?.Invoke(error);
} // }
public Task ReconnectAsync() => Task.CompletedTask; // public Task ReconnectAsync() => Task.CompletedTask;
} // }
} //}

View File

@ -13,11 +13,11 @@ using CryptoExchange.Net.Sockets;
using CryptoExchange.Net.UnitTests.TestImplementations.Sockets; using CryptoExchange.Net.UnitTests.TestImplementations.Sockets;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Moq; using Moq;
using Newtonsoft.Json.Linq; using CryptoExchange.Net.Testing.Implementations;
namespace CryptoExchange.Net.UnitTests.TestImplementations namespace CryptoExchange.Net.UnitTests.TestImplementations
{ {
public class TestSocketClient: BaseSocketClient internal class TestSocketClient: BaseSocketClient
{ {
public TestSubSocketClient SubClient { get; } public TestSubSocketClient SubClient { get; }
@ -41,12 +41,12 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
SubClient = AddApiClient(new TestSubSocketClient(options, options.SubOptions)); SubClient = AddApiClient(new TestSubSocketClient(options, options.SubOptions));
SubClient.SocketFactory = new Mock<IWebsocketFactory>().Object; SubClient.SocketFactory = new Mock<IWebsocketFactory>().Object;
Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<ILogger>(), It.IsAny<WebSocketParameters>())).Returns(new TestSocket()); Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<ILogger>(), It.IsAny<WebSocketParameters>())).Returns(new TestSocket("https://test.com"));
} }
public TestSocket CreateSocket() public TestSocket CreateSocket()
{ {
Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<ILogger>(), It.IsAny<WebSocketParameters>())).Returns(new TestSocket()); Mock.Get(SubClient.SocketFactory).Setup(f => f.CreateWebsocket(It.IsAny<ILogger>(), It.IsAny<WebSocketParameters>())).Returns(new TestSocket("https://test.com"));
return (TestSocket)SubClient.CreateSocketInternal("https://localhost:123/"); return (TestSocket)SubClient.CreateSocketInternal("https://localhost:123/");
} }
@ -75,6 +75,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
public class TestSubSocketClient : SocketApiClient public class TestSubSocketClient : SocketApiClient
{ {
private MessagePath _channelPath = MessagePath.Get().Property("channel"); private MessagePath _channelPath = MessagePath.Get().Property("channel");
private MessagePath _actionPath = MessagePath.Get().Property("action");
private MessagePath _topicPath = MessagePath.Get().Property("topic"); private MessagePath _topicPath = MessagePath.Get().Property("topic");
public Subscription TestSubscription { get; private set; } = null; public Subscription TestSubscription { get; private set; } = null;
@ -110,7 +111,7 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
var id = message.GetValue<string>(_channelPath); var id = message.GetValue<string>(_channelPath);
id ??= message.GetValue<string>(_topicPath); id ??= message.GetValue<string>(_topicPath);
return id; return message.GetValue<string>(_actionPath) + "-" + id;
} }
public Task<CallResult<UpdateSubscription>> SubscribeToSomethingAsync(string channel, Action<DataEvent<string>> onUpdate, CancellationToken ct) public Task<CallResult<UpdateSubscription>> SubscribeToSomethingAsync(string channel, Action<DataEvent<string>> onUpdate, CancellationToken ct)

View File

@ -32,7 +32,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
public ArrayPropertyAttribute ArrayProperty { get; set; } = null!; public ArrayPropertyAttribute ArrayProperty { get; set; } = null!;
public Type? JsonConverterType { get; set; } public Type? JsonConverterType { get; set; }
public bool DefaultDeserialization { get; set; } public bool DefaultDeserialization { get; set; }
public Type TargetType { get; set; } public Type TargetType { get; set; } = null!;
} }
private class ArrayConverterInner<T> : JsonConverter<T> private class ArrayConverterInner<T> : JsonConverter<T>

View File

@ -78,7 +78,7 @@ namespace CryptoExchange.Net.Interfaces
/// <param name="id"></param> /// <param name="id"></param>
/// <param name="data"></param> /// <param name="data"></param>
/// <param name="weight"></param> /// <param name="weight"></param>
void Send(int id, string data, int weight); bool Send(int id, string data, int weight);
/// <summary> /// <summary>
/// Reconnect the socket /// Reconnect the socket
/// </summary> /// </summary>

View File

@ -322,15 +322,16 @@ namespace CryptoExchange.Net.Sockets
} }
/// <inheritdoc /> /// <inheritdoc />
public virtual void Send(int id, string data, int weight) public virtual bool Send(int id, string data, int weight)
{ {
if (_ctsSource.IsCancellationRequested) if (_ctsSource.IsCancellationRequested || _processState != ProcessState.Processing)
return; return false;
var bytes = Parameters.Encoding.GetBytes(data); var bytes = Parameters.Encoding.GetBytes(data);
_logger.SocketAddingBytesToSendBuffer(Id, id, bytes); _logger.SocketAddingBytesToSendBuffer(Id, id, bytes);
_sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes }); _sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes });
_sendEvent.Set(); _sendEvent.Set();
return true;
} }
/// <inheritdoc /> /// <inheritdoc />
@ -389,9 +390,7 @@ namespace CryptoExchange.Net.Sockets
if (_disposed) if (_disposed)
return; return;
//_closeState = CloseState.Closing;
_ctsSource.Cancel(); _ctsSource.Cancel();
_sendEvent.Set();
if (_socket.State == WebSocketState.Open) if (_socket.State == WebSocketState.Open)
{ {
@ -436,6 +435,7 @@ namespace CryptoExchange.Net.Sockets
_disposed = true; _disposed = true;
_socket.Dispose(); _socket.Dispose();
_ctsSource?.Dispose(); _ctsSource?.Dispose();
_sendEvent.Dispose();
_logger.SocketDisposed(Id); _logger.SocketDisposed(Id);
} }
@ -450,10 +450,15 @@ namespace CryptoExchange.Net.Sockets
{ {
while (true) while (true)
{ {
if (_ctsSource.IsCancellationRequested) try
{
if (!_sendBuffer.Any())
await _sendEvent.WaitAsync(ct: _ctsSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break; break;
}
await _sendEvent.WaitAsync().ConfigureAwait(false);
if (_ctsSource.IsCancellationRequested) if (_ctsSource.IsCancellationRequested)
break; break;
@ -507,7 +512,8 @@ namespace CryptoExchange.Net.Sockets
// Make sure we at least let the owner know there was an error // Make sure we at least let the owner know there was an error
_logger.SocketSendLoopStoppedWithException(Id, e.Message, e); _logger.SocketSendLoopStoppedWithException(Id, e.Message, e);
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
throw; if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
} }
finally finally
{ {
@ -583,7 +589,6 @@ namespace CryptoExchange.Net.Sockets
// Received a complete message and it's not multi part // Received a complete message and it's not multi part
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count); _logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false); await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
_logger.LogTrace($"[Sckt {Id}] process completed");
} }
else else
{ {
@ -693,7 +698,6 @@ namespace CryptoExchange.Net.Sockets
// any exception here will stop the timeout checking, but do so silently unless the socket get's stopped. // any exception here will stop the timeout checking, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error // Make sure we at least let the owner know there was an error
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false); await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
throw;
} }
} }
@ -718,10 +722,14 @@ namespace CryptoExchange.Net.Sockets
var checkTime = DateTime.UtcNow; var checkTime = DateTime.UtcNow;
if (checkTime - _lastReceivedMessagesUpdate > TimeSpan.FromSeconds(1)) if (checkTime - _lastReceivedMessagesUpdate > TimeSpan.FromSeconds(1))
{ {
foreach (var msg in _receivedMessages.ToList()) // To list here because we're removing from the list for (var i = 0; i < _receivedMessages.Count; i++)
{ {
var msg = _receivedMessages[i];
if (checkTime - msg.Timestamp > TimeSpan.FromSeconds(3)) if (checkTime - msg.Timestamp > TimeSpan.FromSeconds(3))
{
_receivedMessages.Remove(msg); _receivedMessages.Remove(msg);
i--;
}
} }
_lastReceivedMessagesUpdate = checkTime; _lastReceivedMessagesUpdate = checkTime;

View File

@ -802,7 +802,9 @@ namespace CryptoExchange.Net.Sockets
_logger.SendingData(SocketId, requestId, data); _logger.SendingData(SocketId, requestId, data);
try try
{ {
_socket.Send(requestId, data, weight); if (!_socket.Send(requestId, data, weight))
return new CallResult(new WebError("Failed to send message, connection not open"));
return new CallResult(null); return new CallResult(null);
} }
catch(Exception ex) catch(Exception ex)

View File

@ -33,9 +33,17 @@ namespace CryptoExchange.Net.Testing.Implementations
public Uri Uri { get; set; } public Uri Uri { get; set; }
public Func<Task<Uri?>>? GetReconnectionUrl { get; set; } public Func<Task<Uri?>>? GetReconnectionUrl { get; set; }
public static int lastId = 0;
public static object lastIdLock = new object();
public TestSocket(string address) public TestSocket(string address)
{ {
Uri = new Uri(address); Uri = new Uri(address);
lock (lastIdLock)
{
Id = lastId + 1;
lastId++;
}
} }
public Task<CallResult> ConnectAsync() public Task<CallResult> ConnectAsync()
@ -44,13 +52,14 @@ namespace CryptoExchange.Net.Testing.Implementations
return Task.FromResult(CanConnect ? new CallResult(null) : new CallResult(new CantConnectError())); return Task.FromResult(CanConnect ? new CallResult(null) : new CallResult(new CantConnectError()));
} }
public void Send(int requestId, string data, int weight) public bool Send(int requestId, string data, int weight)
{ {
if (!Connected) if (!Connected)
throw new Exception("Socket not connected"); throw new Exception("Socket not connected");
OnRequestSent?.Invoke(requestId); OnRequestSent?.Invoke(requestId);
OnMessageSend?.Invoke(data); OnMessageSend?.Invoke(data);
return true;
} }
public Task CloseAsync() public Task CloseAsync()