diff --git a/CryptoExchange.Net/CryptoExchange.Net.xml b/CryptoExchange.Net/CryptoExchange.Net.xml
index edc332b..889f56b 100644
--- a/CryptoExchange.Net/CryptoExchange.Net.xml
+++ b/CryptoExchange.Net/CryptoExchange.Net.xml
@@ -1353,6 +1353,11 @@
+
+
+ The current kilobytes per second of data being received by all connection from this client, averaged over the last 3 seconds
+
+
Unsubscribe from a stream
@@ -1548,6 +1553,11 @@
The max amount of outgoing messages per second
+
+
+ The current kilobytes per second of data being received, averaged over the last 3 seconds
+
+
Handler for byte data
@@ -1843,6 +1853,14 @@
+
+
+ Copy the WebCallResult to a new data type
+
+ The new type
+ The data of the new type
+
+
The result of a request
@@ -3129,6 +3147,11 @@
The max amount of outgoing messages per socket per second
+
+
+ The current kilobytes per second of data being received by all connection from this client, averaged over the last 3 seconds
+
+
ctor
@@ -3454,6 +3477,11 @@
The timespan no data is received on the socket. If no data is received within this time an error is generated
+
+
+ The current kilobytes per second of data being received, averaged over the last 3 seconds
+
+
Socket closed event
@@ -3969,148 +3997,5 @@
-
-
- Specifies that is allowed as an input even if the
- corresponding type disallows it.
-
-
-
-
- Initializes a new instance of the class.
-
-
-
-
- Specifies that is disallowed as an input even if the
- corresponding type allows it.
-
-
-
-
- Initializes a new instance of the class.
-
-
-
-
- Specifies that a method that will never return under any circumstance.
-
-
-
-
- Initializes a new instance of the class.
-
-
-
-
- Specifies that the method will not return if the associated
- parameter is passed the specified value.
-
-
-
-
- Gets the condition parameter value.
- Code after the method is considered unreachable by diagnostics if the argument
- to the associated parameter matches this value.
-
-
-
-
- Initializes a new instance of the
- class with the specified parameter value.
-
-
- The condition parameter value.
- Code after the method is considered unreachable by diagnostics if the argument
- to the associated parameter matches this value.
-
-
-
-
- Specifies that an output may be even if the
- corresponding type disallows it.
-
-
-
-
- Initializes a new instance of the class.
-
-
-
-
- Specifies that when a method returns ,
- the parameter may be even if the corresponding type disallows it.
-
-
-
-
- Gets the return value condition.
- If the method returns this value, the associated parameter may be .
-
-
-
-
- Initializes the attribute with the specified return value condition.
-
-
- The return value condition.
- If the method returns this value, the associated parameter may be .
-
-
-
-
- Specifies that an output is not even if the
- corresponding type allows it.
-
-
-
-
- Initializes a new instance of the class.
-
-
-
-
- Specifies that the output will be non- if the
- named parameter is non-.
-
-
-
-
- Gets the associated parameter name.
- The output will be non- if the argument to the
- parameter specified is non-.
-
-
-
-
- Initializes the attribute with the associated parameter name.
-
-
- The associated parameter name.
- The output will be non- if the argument to the
- parameter specified is non-.
-
-
-
-
- Specifies that when a method returns ,
- the parameter will not be even if the corresponding type allows it.
-
-
-
-
- Gets the return value condition.
- If the method returns this value, the associated parameter will not be .
-
-
-
-
- Initializes the attribute with the specified return value condition.
-
-
- The return value condition.
- If the method returns this value, the associated parameter will not be .
-
-
diff --git a/CryptoExchange.Net/Interfaces/ISocketClient.cs b/CryptoExchange.Net/Interfaces/ISocketClient.cs
index fb1658a..3b0b5c8 100644
--- a/CryptoExchange.Net/Interfaces/ISocketClient.cs
+++ b/CryptoExchange.Net/Interfaces/ISocketClient.cs
@@ -49,6 +49,10 @@ namespace CryptoExchange.Net.Interfaces
int? MaxResubscribeTries { get; }
///
int MaxConcurrentResubscriptionsPerSocket { get; }
+ ///
+ /// The current kilobytes per second of data being received by all connection from this client, averaged over the last 3 seconds
+ ///
+ double IncomingKbps { get; }
///
/// Unsubscribe from a stream
diff --git a/CryptoExchange.Net/Interfaces/IWebsocket.cs b/CryptoExchange.Net/Interfaces/IWebsocket.cs
index b75b7b6..9e05ac6 100644
--- a/CryptoExchange.Net/Interfaces/IWebsocket.cs
+++ b/CryptoExchange.Net/Interfaces/IWebsocket.cs
@@ -47,7 +47,11 @@ namespace CryptoExchange.Net.Interfaces
///
/// The max amount of outgoing messages per second
///
- public int? RatelimitPerSecond { get; set; }
+ int? RatelimitPerSecond { get; set; }
+ ///
+ /// The current kilobytes per second of data being received, averaged over the last 3 seconds
+ ///
+ double IncomingKbps { get; }
///
/// Handler for byte data
///
diff --git a/CryptoExchange.Net/Objects/CallResult.cs b/CryptoExchange.Net/Objects/CallResult.cs
index 4ce9b75..2607204 100644
--- a/CryptoExchange.Net/Objects/CallResult.cs
+++ b/CryptoExchange.Net/Objects/CallResult.cs
@@ -120,6 +120,17 @@ namespace CryptoExchange.Net.Objects
{
return new WebCallResult(null, null, default, error);
}
+
+ ///
+ /// Copy the WebCallResult to a new data type
+ ///
+ /// The new type
+ /// The data of the new type
+ ///
+ public CallResult As([AllowNull] K data)
+ {
+ return new CallResult(data, Error);
+ }
}
///
diff --git a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs
index 4a7c942..3787e90 100644
--- a/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs
+++ b/CryptoExchange.Net/OrderBook/SymbolOrderBook.cs
@@ -447,7 +447,8 @@ namespace CryptoExchange.Net.OrderBook
if (asks.First().Key < bids.First().Key)
{
- log.Write(LogLevel.Warning, $"{Id} order book {Symbol} detected out of sync order book. Resyncing");
+ log.Write(LogLevel.Warning, $"{Id} order book {Symbol} detected out of sync order book. First ask: {asks.First().Key}, first bid: {bids.First().Key}. Resyncing");
+ _stopProcessing = true;
Resubscribe();
return;
}
@@ -636,6 +637,7 @@ namespace CryptoExchange.Net.OrderBook
{
// Out of sync
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync (expected { LastSequenceNumber + 1}, was {sequence}), reconnecting");
+ _stopProcessing = true;
Resubscribe();
return false;
}
diff --git a/CryptoExchange.Net/SocketClient.cs b/CryptoExchange.Net/SocketClient.cs
index f277c43..c0f2a41 100644
--- a/CryptoExchange.Net/SocketClient.cs
+++ b/CryptoExchange.Net/SocketClient.cs
@@ -96,6 +96,20 @@ namespace CryptoExchange.Net
/// The max amount of outgoing messages per socket per second
///
protected internal int? RateLimitPerSocketPerSecond { get; set; }
+
+ ///
+ /// The current kilobytes per second of data being received by all connection from this client, averaged over the last 3 seconds
+ ///
+ public double IncomingKbps
+ {
+ get
+ {
+ if (!sockets.Any())
+ return 0;
+
+ return sockets.Sum(s => s.Value.Socket.IncomingKbps);
+ }
+ }
#endregion
///
@@ -193,7 +207,7 @@ namespace CryptoExchange.Net
if (socketConnection.PausedActivity)
{
- log.Write(LogLevel.Information, "Socket has been paused, can't subscribe at this moment");
+ log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} has been paused, can't subscribe at this moment");
return new CallResult(default, new ServerError("Socket is paused"));
}
@@ -284,7 +298,7 @@ namespace CryptoExchange.Net
if (socketConnection.PausedActivity)
{
- log.Write(LogLevel.Information, "Socket has been paused, can't send query at this moment");
+ log.Write(LogLevel.Information, $"Socket {socketConnection.Socket.Id} has been paused, can't send query at this moment");
return new CallResult(default, new ServerError("Socket is paused"));
}
@@ -334,7 +348,7 @@ namespace CryptoExchange.Net
var result = await AuthenticateSocketAsync(socket).ConfigureAwait(false);
if (!result)
{
- log.Write(LogLevel.Warning, "Socket authentication failed");
+ log.Write(LogLevel.Warning, $"Socket {socket.Socket.Id} authentication failed");
result.Error!.Message = "Authentication failed: " + result.Error.Message;
return new CallResult(false, result.Error);
}
@@ -435,7 +449,7 @@ namespace CryptoExchange.Net
var desResult = Deserialize(messageEvent.JsonData, false);
if (!desResult)
{
- log.Write(LogLevel.Warning, $"Failed to deserialize data into type {typeof(T)}: {desResult.Error}");
+ log.Write(LogLevel.Warning, $"Socket {connection.Socket.Id} Failed to deserialize data into type {typeof(T)}: {desResult.Error}");
return;
}
@@ -528,7 +542,7 @@ namespace CryptoExchange.Net
protected virtual IWebsocket CreateSocket(string address)
{
var socket = SocketFactory.CreateWebsocket(log, address);
- log.Write(LogLevel.Debug, "Created new socket for " + address);
+ log.Write(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address);
if (apiProxy != null)
socket.SetProxy(apiProxy);
@@ -566,9 +580,6 @@ namespace CryptoExchange.Net
if (disposing)
break;
- if (sockets.Any())
- log.Write(LogLevel.Debug, "Sending periodic");
-
foreach (var socket in sockets.Values)
{
if (disposing)
@@ -578,13 +589,15 @@ namespace CryptoExchange.Net
if (obj == null)
continue;
+ log.Write(LogLevel.Trace, $"Socket {socket.Socket.Id} sending periodic");
+
try
{
socket.Send(obj);
}
catch (Exception ex)
{
- log.Write(LogLevel.Warning, "Periodic send failed: " + ex);
+ log.Write(LogLevel.Warning, $"Socket {socket.Socket.Id} Periodic send failed: " + ex);
}
}
}
diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
index 2ab4723..da7ecc1 100644
--- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
+++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
@@ -7,6 +7,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
+using System.Linq;
using System.Net;
using System.Net.WebSockets;
using System.Security.Authentication;
@@ -36,7 +37,11 @@ namespace CryptoExchange.Net.Sockets
private bool _closing;
private bool _startedSent;
private bool _startedReceive;
- private readonly List outgoingMessages;
+
+ private readonly List _outgoingMessages;
+ protected readonly Dictionary _receivedMessages;
+ private DateTime _lastReceivedMessagesUpdate;
+ protected readonly object _receivedMessagesLock;
///
/// Log
@@ -126,6 +131,25 @@ namespace CryptoExchange.Net.Sockets
///
public TimeSpan Timeout { get; set; }
+ ///
+ /// The current kilobytes per second of data being received, averaged over the last 3 seconds
+ ///
+ public double IncomingKbps
+ {
+ get
+ {
+ UpdateReceivedMessages();
+
+ lock (_receivedMessagesLock)
+ {
+ if (!_receivedMessages.Any())
+ return 0;
+
+ return Math.Round(_receivedMessages.Values.Sum(v => v) / 1000 / 3d);
+ }
+ }
+ }
+
///
/// Socket closed event
///
@@ -183,10 +207,12 @@ namespace CryptoExchange.Net.Sockets
this.cookies = cookies;
this.headers = headers;
- outgoingMessages = new List();
+ _outgoingMessages = new List();
+ _receivedMessages = new Dictionary();
_sendEvent = new AutoResetEvent(false);
_sendBuffer = new ConcurrentQueue();
_ctsSource = new CancellationTokenSource();
+ _receivedMessagesLock = new object();
_socket = CreateSocket();
}
@@ -244,7 +270,7 @@ namespace CryptoExchange.Net.Sockets
public virtual void Send(string data)
{
if (_closing)
- throw new InvalidOperationException("Can't send data when socket is not connected");
+ throw new InvalidOperationException($"Socket {Id} Can't send data when socket is not connected");
var bytes = _encoding.GetBytes(data);
log.Write(LogLevel.Trace, $"Socket {Id} Adding {bytes.Length} to sent buffer");
@@ -371,14 +397,14 @@ namespace CryptoExchange.Net.Sockets
}
if (start != null)
- log.Write(LogLevel.Trace, $"Websocket sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit");
-
- outgoingMessages.Add(DateTime.UtcNow);
+ log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit");
}
try
{
await _socket.SendAsync(new ArraySegment(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
+ _outgoingMessages.Add(DateTime.UtcNow);
+ log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes");
}
catch (OperationCanceledException)
{
@@ -427,6 +453,8 @@ namespace CryptoExchange.Net.Sockets
{
receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
received += receiveResult.Count;
+ lock(_receivedMessagesLock)
+ _receivedMessages.Add(DateTime.UtcNow, receiveResult.Count);
}
catch (OperationCanceledException)
{
@@ -462,20 +490,30 @@ namespace CryptoExchange.Net.Sockets
multiPartMessage = true;
if (memoryStream == null)
memoryStream = new MemoryStream();
+ log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
}
else
{
if (!multiPartMessage)
+ {
// Received a complete message and it's not multi part
+ log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
HandleMessage(buffer.Array, buffer.Offset, receiveResult.Count, receiveResult.MessageType);
+ }
else
+ {
// Received the end of a multipart message, write to memory stream for reassembling
+ log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
+ }
break;
}
}
+ lock (_receivedMessagesLock)
+ UpdateReceivedMessages();
+
if (receiveResult?.MessageType == WebSocketMessageType.Close)
{
// Received close message
@@ -491,6 +529,7 @@ namespace CryptoExchange.Net.Sockets
if (multiPartMessage)
{
// Reassemble complete message from memory stream
+ log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType);
memoryStream.Dispose();
}
@@ -514,7 +553,9 @@ namespace CryptoExchange.Net.Sockets
try
{
- strData = DataInterpreterBytes(data);
+ var relevantData = new byte[count];
+ Array.Copy(data, offset, relevantData, 0, count);
+ strData = DataInterpreterBytes(relevantData);
}
catch(Exception e)
{
@@ -619,8 +660,21 @@ namespace CryptoExchange.Net.Sockets
private int MessagesSentLastSecond()
{
var testTime = DateTime.UtcNow;
- outgoingMessages.RemoveAll(r => testTime - r > TimeSpan.FromSeconds(1));
- return outgoingMessages.Count;
+ _outgoingMessages.RemoveAll(r => testTime - r > TimeSpan.FromSeconds(1));
+ return _outgoingMessages.Count;
+ }
+
+ protected void UpdateReceivedMessages()
+ {
+ var checkTime = DateTime.UtcNow;
+ if (checkTime - _lastReceivedMessagesUpdate > TimeSpan.FromSeconds(1))
+ {
+ foreach (var msgTime in _receivedMessages.Keys)
+ if (checkTime - msgTime > TimeSpan.FromSeconds(3))
+ _receivedMessages.Remove(msgTime);
+
+ _lastReceivedMessagesUpdate = checkTime;
+ }
}
}
}
diff --git a/CryptoExchange.Net/Sockets/SocketConnection.cs b/CryptoExchange.Net/Sockets/SocketConnection.cs
index 530c4cc..0fdce68 100644
--- a/CryptoExchange.Net/Sockets/SocketConnection.cs
+++ b/CryptoExchange.Net/Sockets/SocketConnection.cs
@@ -97,7 +97,7 @@ namespace CryptoExchange.Net.Sockets
if (pausedActivity != value)
{
pausedActivity = value;
- log.Write(LogLevel.Debug, "Paused activity: " + value);
+ log.Write(LogLevel.Debug, $"Socket {Socket.Id} Paused activity: " + value);
if(pausedActivity) ActivityPaused?.Invoke();
else ActivityUnpaused?.Invoke();
}