From aa07029c218df028ff9638c053358132657d7b2c Mon Sep 17 00:00:00 2001 From: Jkorf Date: Wed, 29 Sep 2021 13:28:35 +0200 Subject: [PATCH] added incoming kbps property for sockets, updated logging --- CryptoExchange.Net/CryptoExchange.Net.xml | 171 +++--------------- .../Interfaces/ISocketClient.cs | 4 + CryptoExchange.Net/Interfaces/IWebsocket.cs | 6 +- CryptoExchange.Net/Objects/CallResult.cs | 11 ++ .../OrderBook/SymbolOrderBook.cs | 4 +- CryptoExchange.Net/SocketClient.cs | 31 +++- .../Sockets/CryptoExchangeWebSocketClient.cs | 72 +++++++- .../Sockets/SocketConnection.cs | 2 +- 8 files changed, 137 insertions(+), 164 deletions(-) diff --git a/CryptoExchange.Net/CryptoExchange.Net.xml b/CryptoExchange.Net/CryptoExchange.Net.xml index edc332b..889f56b 100644 --- a/CryptoExchange.Net/CryptoExchange.Net.xml +++ b/CryptoExchange.Net/CryptoExchange.Net.xml @@ -1353,6 +1353,11 @@ + + + The current kilobytes per second of data being received by all connection from this client, averaged over the last 3 seconds + + Unsubscribe from a stream @@ -1548,6 +1553,11 @@ The max amount of outgoing messages per second + + + The current kilobytes per second of data being received, averaged over the last 3 seconds + + Handler for byte data @@ -1843,6 +1853,14 @@ + + + Copy the WebCallResult to a new data type + + The new type + The data of the new type + + The result of a request @@ -3129,6 +3147,11 @@ The max amount of outgoing messages per socket per second + + + The current kilobytes per second of data being received by all connection from this client, averaged over the last 3 seconds + + ctor @@ -3454,6 +3477,11 @@ The timespan no data is received on the socket. If no data is received within this time an error is generated + + + The current kilobytes per second of data being received, averaged over the last 3 seconds + + Socket closed event @@ -3969,148 +3997,5 @@ - - - Specifies that is allowed as an input even if the - corresponding type disallows it. - - - - - Initializes a new instance of the class. - - - - - Specifies that is disallowed as an input even if the - corresponding type allows it. - - - - - Initializes a new instance of the class. - - - - - Specifies that a method that will never return under any circumstance. - - - - - Initializes a new instance of the class. - - - - - Specifies that the method will not return if the associated - parameter is passed the specified value. - - - - - Gets the condition parameter value. - Code after the method is considered unreachable by diagnostics if the argument - to the associated parameter matches this value. - - - - - Initializes a new instance of the - class with the specified parameter value. - - - The condition parameter value. - Code after the method is considered unreachable by diagnostics if the argument - to the associated parameter matches this value. - - - - - Specifies that an output may be even if the - corresponding type disallows it. - - - - - Initializes a new instance of the class. - - - - - Specifies that when a method returns , - the parameter may be even if the corresponding type disallows it. - - - - - Gets the return value condition. - If the method returns this value, the associated parameter may be . - - - - - Initializes the attribute with the specified return value condition. - - - The return value condition. - If the method returns this value, the associated parameter may be . - - - - - Specifies that an output is not even if the - corresponding type allows it. - - - - - Initializes a new instance of the class. - - - - - Specifies that the output will be non- if the - named parameter is non-. - - - - - Gets the associated parameter name. - The output will be non- if the argument to the - parameter specified is non-. - - - - - Initializes the attribute with the associated parameter name. - - - The associated parameter name. - The output will be non- if the argument to the - parameter specified is non-. - - - - - Specifies that when a method returns , - the parameter will not be even if the corresponding type allows it. - - - - - Gets the return value condition. - If the method returns this value, the associated parameter will not be . - - - - - Initializes the attribute with the specified return value condition. - - - The return value condition. - If the method returns this value, the associated parameter will not be . - - diff --git a/CryptoExchange.Net/Interfaces/ISocketClient.cs b/CryptoExchange.Net/Interfaces/ISocketClient.cs index fb1658a..3b0b5c8 100644 --- a/CryptoExchange.Net/Interfaces/ISocketClient.cs +++ b/CryptoExchange.Net/Interfaces/ISocketClient.cs @@ -49,6 +49,10 @@ namespace CryptoExchange.Net.Interfaces int? MaxResubscribeTries { get; } /// int MaxConcurrentResubscriptionsPerSocket { get; } + /// + /// The current kilobytes per second of data being received by all connection from this client, averaged over the last 3 seconds + /// + double IncomingKbps { get; } /// /// Unsubscribe from a stream diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs index b75b7b6..9e05ac6 100644 --- a/CryptoExchange.Net/Interfaces/IWebsocket.cs +++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs @@ -47,7 +47,11 @@ namespace CryptoExchange.Net.Interfaces /// /// The max amount of outgoing messages per second /// - public int? RatelimitPerSecond { get; set; } + int? RatelimitPerSecond { get; set; } + /// + /// The current kilobytes per second of data being received, averaged over the last 3 seconds + /// + double IncomingKbps { get; } /// /// Handler for byte data /// diff --git a/CryptoExchange.Net/Objects/CallResult.cs b/CryptoExchange.Net/Objects/CallResult.cs index 4ce9b75..2607204 100644 --- a/CryptoExchange.Net/Objects/CallResult.cs +++ b/CryptoExchange.Net/Objects/CallResult.cs @@ -120,6 +120,17 @@ namespace CryptoExchange.Net.Objects { return new WebCallResult(null, null, default, error); } + + /// + /// Copy the WebCallResult to a new data type + /// + /// The new type + /// The data of the new type + /// + public CallResult As([AllowNull] K data) + { + return new CallResult(data, Error); + } } /// diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs index 4a7c942..3787e90 100644 --- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs +++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs @@ -447,7 +447,8 @@ namespace CryptoExchange.Net.OrderBook if (asks.First().Key < bids.First().Key) { - log.Write(LogLevel.Warning, $"{Id} order book {Symbol} detected out of sync order book. Resyncing"); + log.Write(LogLevel.Warning, $"{Id} order book {Symbol} detected out of sync order book. First ask: {asks.First().Key}, first bid: {bids.First().Key}. Resyncing"); + _stopProcessing = true; Resubscribe(); return; } @@ -636,6 +637,7 @@ namespace CryptoExchange.Net.OrderBook { // Out of sync log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync (expected { LastSequenceNumber + 1}, was {sequence}), reconnecting"); + _stopProcessing = true; Resubscribe(); return false; } diff --git a/CryptoExchange.Net/SocketClient.cs b/CryptoExchange.Net/SocketClient.cs index f277c43..c0f2a41 100644 --- a/CryptoExchange.Net/SocketClient.cs +++ b/CryptoExchange.Net/SocketClient.cs @@ -96,6 +96,20 @@ namespace CryptoExchange.Net /// The max amount of outgoing messages per socket per second /// protected internal int? RateLimitPerSocketPerSecond { get; set; } + + /// + /// The current kilobytes per second of data being received by all connection from this client, averaged over the last 3 seconds + /// + public double IncomingKbps + { + get + { + if (!sockets.Any()) + return 0; + + return sockets.Sum(s => s.Value.Socket.IncomingKbps); + } + } #endregion /// @@ -193,7 +207,7 @@ namespace CryptoExchange.Net if (socketConnection.PausedActivity) { - log.Write(LogLevel.Information, "Socket has been paused, can't subscribe at this moment"); + log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} has been paused, can't subscribe at this moment"); return new CallResult(default, new ServerError("Socket is paused")); } @@ -284,7 +298,7 @@ namespace CryptoExchange.Net if (socketConnection.PausedActivity) { - log.Write(LogLevel.Information, "Socket has been paused, can't send query at this moment"); + log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} has been paused, can't send query at this moment"); return new CallResult(default, new ServerError("Socket is paused")); } @@ -334,7 +348,7 @@ namespace CryptoExchange.Net var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false); if (!result) { - log.Write(LogLevel.Warning, "Socket authentication failed"); + log.Write(LogLevel.Warning, $"Socket {socket.Socket.Id} authentication failed"); result.Error!.Message = "Authentication failed: " + result.Error.Message; return new CallResult(false, result.Error); } @@ -435,7 +449,7 @@ namespace CryptoExchange.Net var desResult = Deserialize(messageEvent.JsonData, false); if (!desResult) { - log.Write(LogLevel.Warning, $"Failed to deserialize data into type {typeof(T)}: {desResult.Error}"); + log.Write(LogLevel.Warning, $"Socket {connection.Socket.Id} Failed to deserialize data into type {typeof(T)}: {desResult.Error}"); return; } @@ -528,7 +542,7 @@ namespace CryptoExchange.Net protected virtual IWebsocket CreateSocket(string address) { var socket = SocketFactory.CreateWebsocket(log, address); - log.Write(LogLevel.Debug, "Created new socket for " + address); + log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address); if (apiProxy != null) socket.SetProxy(apiProxy); @@ -566,9 +580,6 @@ namespace CryptoExchange.Net if (disposing) break; - if (sockets.Any()) - log.Write(LogLevel.Debug, "Sending periodic"); - foreach (var socket in sockets.Values) { if (disposing) @@ -578,13 +589,15 @@ namespace CryptoExchange.Net if (obj == null) continue; + log.Write(LogLevel.Trace, $"Socket {socket.Socket.Id} sending periodic"); + try { socket.Send(obj); } catch (Exception ex) { - log.Write(LogLevel.Warning, "Periodic send failed: " + ex); + log.Write(LogLevel.Warning, $"Socket {socket.Socket.Id} Periodic send failed: " + ex); } } } diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 2ab4723..da7ecc1 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -7,6 +7,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Net; using System.Net.WebSockets; using System.Security.Authentication; @@ -36,7 +37,11 @@ namespace CryptoExchange.Net.Sockets private bool _closing; private bool _startedSent; private bool _startedReceive; - private readonly List outgoingMessages; + + private readonly List _outgoingMessages; + protected readonly Dictionary _receivedMessages; + private DateTime _lastReceivedMessagesUpdate; + protected readonly object _receivedMessagesLock; /// /// Log @@ -126,6 +131,25 @@ namespace CryptoExchange.Net.Sockets /// public TimeSpan Timeout { get; set; } + /// + /// The current kilobytes per second of data being received, averaged over the last 3 seconds + /// + public double IncomingKbps + { + get + { + UpdateReceivedMessages(); + + lock (_receivedMessagesLock) + { + if (!_receivedMessages.Any()) + return 0; + + return Math.Round(_receivedMessages.Values.Sum(v => v) / 1000 / 3d); + } + } + } + /// /// Socket closed event /// @@ -183,10 +207,12 @@ namespace CryptoExchange.Net.Sockets this.cookies = cookies; this.headers = headers; - outgoingMessages = new List(); + _outgoingMessages = new List(); + _receivedMessages = new Dictionary(); _sendEvent = new AutoResetEvent(false); _sendBuffer = new ConcurrentQueue(); _ctsSource = new CancellationTokenSource(); + _receivedMessagesLock = new object(); _socket = CreateSocket(); } @@ -244,7 +270,7 @@ namespace CryptoExchange.Net.Sockets public virtual void Send(string data) { if (_closing) - throw new InvalidOperationException("Can't send data when socket is not connected"); + throw new InvalidOperationException($"Socket {Id} Can't send data when socket is not connected"); var bytes = _encoding.GetBytes(data); log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer"); @@ -371,14 +397,14 @@ namespace CryptoExchange.Net.Sockets } if (start != null) - log.Write(LogLevel.Trace, $"Websocket sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit"); - - outgoingMessages.Add(DateTime.UtcNow); + log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit"); } try { await _socket.SendAsync(new ArraySegment(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false); + _outgoingMessages.Add(DateTime.UtcNow); + log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes"); } catch (OperationCanceledException) { @@ -427,6 +453,8 @@ namespace CryptoExchange.Net.Sockets { receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false); received += receiveResult.Count; + lock(_receivedMessagesLock) + _receivedMessages.Add(DateTime.UtcNow, receiveResult.Count); } catch (OperationCanceledException) { @@ -462,20 +490,30 @@ namespace CryptoExchange.Net.Sockets multiPartMessage = true; if (memoryStream == null) memoryStream = new MemoryStream(); + log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); } else { if (!multiPartMessage) + { // Received a complete message and it's not multi part + log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); HandleMessage(buffer.Array, buffer.Offset, receiveResult.Count, receiveResult.MessageType); + } else + { // Received the end of a multipart message, write to memory stream for reassembling + log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); + } break; } } + lock (_receivedMessagesLock) + UpdateReceivedMessages(); + if (receiveResult?.MessageType == WebSocketMessageType.Close) { // Received close message @@ -491,6 +529,7 @@ namespace CryptoExchange.Net.Sockets if (multiPartMessage) { // Reassemble complete message from memory stream + log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType); memoryStream.Dispose(); } @@ -514,7 +553,9 @@ namespace CryptoExchange.Net.Sockets try { - strData = DataInterpreterBytes(data); + var relevantData = new byte[count]; + Array.Copy(data, offset, relevantData, 0, count); + strData = DataInterpreterBytes(relevantData); } catch(Exception e) { @@ -619,8 +660,21 @@ namespace CryptoExchange.Net.Sockets private int MessagesSentLastSecond() { var testTime = DateTime.UtcNow; - outgoingMessages.RemoveAll(r => testTime - r > TimeSpan.FromSeconds(1)); - return outgoingMessages.Count; + _outgoingMessages.RemoveAll(r => testTime - r > TimeSpan.FromSeconds(1)); + return _outgoingMessages.Count; + } + + protected void UpdateReceivedMessages() + { + var checkTime = DateTime.UtcNow; + if (checkTime - _lastReceivedMessagesUpdate > TimeSpan.FromSeconds(1)) + { + foreach (var msgTime in _receivedMessages.Keys) + if (checkTime - msgTime > TimeSpan.FromSeconds(3)) + _receivedMessages.Remove(msgTime); + + _lastReceivedMessagesUpdate = checkTime; + } } } } diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs index 530c4cc..0fdce68 100644 --- a/CryptoExchange.Net/Sockets/SocketConnection.cs +++ b/CryptoExchange.Net/Sockets/SocketConnection.cs @@ -97,7 +97,7 @@ namespace CryptoExchange.Net.Sockets if (pausedActivity != value) { pausedActivity = value; - log.Write(LogLevel.Debug, "Paused activity: " + value); + log.Write(LogLevel.Debug, $"Socket {Socket.Id} Paused activity: " + value); if(pausedActivity) ActivityPaused?.Invoke(); else ActivityUnpaused?.Invoke(); }