1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-12 02:16:23 +00:00

Resolved some code issues

This commit is contained in:
Jkorf 2021-11-17 10:23:01 +01:00
parent 3784b0c62b
commit 7ac7a11dfe
12 changed files with 68 additions and 112 deletions

View File

@ -1,16 +1,12 @@
using CryptoExchange.Net.Attributes; using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Logging; using CryptoExchange.Net.Logging;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using System; using System;
using System.Collections.Generic;
using System.Globalization; using System.Globalization;
using System.IO; using System.IO;
using System.Linq;
using System.Reflection;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -152,8 +148,7 @@ namespace CryptoExchange.Net
/// <returns></returns> /// <returns></returns>
protected CallResult<T> Deserialize<T>(JToken obj, JsonSerializer? serializer = null, int? requestId = null) protected CallResult<T> Deserialize<T>(JToken obj, JsonSerializer? serializer = null, int? requestId = null)
{ {
if (serializer == null) serializer ??= defaultSerializer;
serializer = defaultSerializer;
try try
{ {
@ -191,8 +186,7 @@ namespace CryptoExchange.Net
/// <returns></returns> /// <returns></returns>
protected async Task<CallResult<T>> DeserializeAsync<T>(Stream stream, JsonSerializer? serializer = null, int? requestId = null, long? elapsedMilliseconds = null) protected async Task<CallResult<T>> DeserializeAsync<T>(Stream stream, JsonSerializer? serializer = null, int? requestId = null, long? elapsedMilliseconds = null)
{ {
if (serializer == null) serializer ??= defaultSerializer;
serializer = defaultSerializer;
try try
{ {

View File

@ -36,7 +36,7 @@ namespace CryptoExchange.Net.Converters
return ParseObject(arr, result, objectType); return ParseObject(arr, result, objectType);
} }
private static object? ParseObject(JArray arr, object result, Type objectType) private static object ParseObject(JArray arr, object result, Type objectType)
{ {
foreach (var property in objectType.GetProperties()) foreach (var property in objectType.GetProperties())
{ {
@ -63,8 +63,8 @@ namespace CryptoExchange.Net.Converters
var arrayResult = (IList)Activator.CreateInstance(property.PropertyType, new [] { innerArray.Count }); var arrayResult = (IList)Activator.CreateInstance(property.PropertyType, new [] { innerArray.Count });
foreach (var obj in innerArray) foreach (var obj in innerArray)
{ {
var innerObj = Activator.CreateInstance(objType); var innerObj = Activator.CreateInstance(objType!);
arrayResult[count] = ParseObject((JArray)obj, innerObj, objType); arrayResult[count] = ParseObject((JArray)obj, innerObj, objType!);
count++; count++;
} }
property.SetValue(result, arrayResult); property.SetValue(result, arrayResult);
@ -72,8 +72,8 @@ namespace CryptoExchange.Net.Converters
else else
{ {
var arrayResult = (IList)Activator.CreateInstance(property.PropertyType, new [] { 1 }); var arrayResult = (IList)Activator.CreateInstance(property.PropertyType, new [] { 1 });
var innerObj = Activator.CreateInstance(objType); var innerObj = Activator.CreateInstance(objType!);
arrayResult[0] = ParseObject(innerArray, innerObj, objType); arrayResult[0] = ParseObject(innerArray, innerObj, objType!);
property.SetValue(result, arrayResult); property.SetValue(result, arrayResult);
} }
continue; continue;

View File

@ -5,8 +5,6 @@ using System.Linq;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Security; using System.Security;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Logging; using CryptoExchange.Net.Logging;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -333,7 +331,7 @@ namespace CryptoExchange.Net
/// </summary> /// </summary>
/// <param name="exception"></param> /// <param name="exception"></param>
/// <returns></returns> /// <returns></returns>
public static string ToLogString(this Exception exception) public static string ToLogString(this Exception? exception)
{ {
var message = new StringBuilder(); var message = new StringBuilder();
var indent = 0; var indent = 0;

View File

@ -1,6 +1,5 @@
using CryptoExchange.Net.Logging; using CryptoExchange.Net.Logging;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
using System;
using System.Net.Http; using System.Net.Http;
using System.Security; using System.Security;
using System.Threading; using System.Threading;

View File

@ -1,7 +1,4 @@
using System; using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using CryptoExchange.Net.Objects; using CryptoExchange.Net.Objects;
namespace CryptoExchange.Net.Interfaces namespace CryptoExchange.Net.Interfaces
@ -21,17 +18,6 @@ namespace CryptoExchange.Net.Interfaces
/// </summary> /// </summary>
int TotalRequestsMade { get; } int TotalRequestsMade { get; }
/// <summary>
/// Adds a rate limiter to the client. There are 2 choices, the <see cref="RateLimiterTotal"/> and the <see cref="RateLimiterPerEndpoint"/>.
/// </summary>
/// <param name="limiter">The limiter to add</param>
void AddRateLimiter(IRateLimiter limiter);
/// <summary>
/// Removes all rate limiters from this client
/// </summary>
void RemoveRateLimiters();
/// <summary> /// <summary>
/// The options provided for this client /// The options provided for this client
/// </summary> /// </summary>

View File

@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -12,10 +11,10 @@ namespace CryptoExchange.Net.Objects
/// </summary> /// </summary>
public class AsyncResetEvent : IDisposable public class AsyncResetEvent : IDisposable
{ {
private readonly static Task<bool> _completed = Task.FromResult(true); private static readonly Task<bool> _completed = Task.FromResult(true);
private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>(); private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
private bool _signaled; private bool _signaled;
private bool _reset; private readonly bool _reset;
/// <summary> /// <summary>
/// New AsyncResetEvent /// New AsyncResetEvent

View File

@ -1,6 +1,5 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging; using CryptoExchange.Net.Logging;
using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
@ -106,7 +105,7 @@ namespace CryptoExchange.Net.Objects
{ {
int totalWaitTime = 0; int totalWaitTime = 0;
EndpointRateLimiter endpointLimit; EndpointRateLimiter? endpointLimit;
lock (_limiterLock) lock (_limiterLock)
endpointLimit = Limiters.OfType<EndpointRateLimiter>().SingleOrDefault(h => h.Endpoints.Contains(endpoint) && (h.Method == null || h.Method == method)); endpointLimit = Limiters.OfType<EndpointRateLimiter>().SingleOrDefault(h => h.Endpoints.Contains(endpoint) && (h.Method == null || h.Method == method));
if(endpointLimit != null) if(endpointLimit != null)
@ -128,7 +127,7 @@ namespace CryptoExchange.Net.Objects
{ {
if (partialEndpointLimit.CountPerEndpoint) if (partialEndpointLimit.CountPerEndpoint)
{ {
SingleTopicRateLimiter thisEndpointLimit; SingleTopicRateLimiter? thisEndpointLimit;
lock (_limiterLock) lock (_limiterLock)
{ {
thisEndpointLimit = Limiters.OfType<SingleTopicRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.PartialEndpoint && (string)h.Topic == endpoint); thisEndpointLimit = Limiters.OfType<SingleTopicRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.PartialEndpoint && (string)h.Topic == endpoint);
@ -158,7 +157,7 @@ namespace CryptoExchange.Net.Objects
if(partialEndpointLimits.Any(p => p.IgnoreOtherRateLimits)) if(partialEndpointLimits.Any(p => p.IgnoreOtherRateLimits))
return new CallResult<int>(totalWaitTime, null); return new CallResult<int>(totalWaitTime, null);
ApiKeyRateLimiter apiLimit; ApiKeyRateLimiter? apiLimit;
lock (_limiterLock) lock (_limiterLock)
apiLimit = Limiters.OfType<ApiKeyRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.ApiKey); apiLimit = Limiters.OfType<ApiKeyRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.ApiKey);
if (apiLimit != null) if (apiLimit != null)
@ -176,7 +175,7 @@ namespace CryptoExchange.Net.Objects
} }
else if (signed || !apiLimit.OnlyForSignedRequests) else if (signed || !apiLimit.OnlyForSignedRequests)
{ {
SingleTopicRateLimiter thisApiLimit; SingleTopicRateLimiter? thisApiLimit;
lock (_limiterLock) lock (_limiterLock)
{ {
thisApiLimit = Limiters.OfType<SingleTopicRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.ApiKey && ((SecureString)h.Topic).IsEqualTo(apiKey)); thisApiLimit = Limiters.OfType<SingleTopicRateLimiter>().SingleOrDefault(h => h.Type == RateLimitType.ApiKey && ((SecureString)h.Topic).IsEqualTo(apiKey));
@ -198,7 +197,7 @@ namespace CryptoExchange.Net.Objects
if ((signed || apiLimit?.OnlyForSignedRequests == false) && apiLimit?.IgnoreTotalRateLimit == true) if ((signed || apiLimit?.OnlyForSignedRequests == false) && apiLimit?.IgnoreTotalRateLimit == true)
return new CallResult<int>(totalWaitTime, null); return new CallResult<int>(totalWaitTime, null);
TotalRateLimiter totalLimit; TotalRateLimiter? totalLimit;
lock (_limiterLock) lock (_limiterLock)
totalLimit = Limiters.OfType<TotalRateLimiter>().SingleOrDefault(); totalLimit = Limiters.OfType<TotalRateLimiter>().SingleOrDefault();
if (totalLimit != null) if (totalLimit != null)

View File

@ -18,22 +18,24 @@ namespace CryptoExchange.Net.OrderBook
/// </summary> /// </summary>
public abstract class SymbolOrderBook : ISymbolOrderBook, IDisposable public abstract class SymbolOrderBook : ISymbolOrderBook, IDisposable
{ {
private readonly object bookLock = new object(); private readonly object _bookLock = new object();
private OrderBookStatus status; private OrderBookStatus _status;
private UpdateSubscription? subscription; private UpdateSubscription? _subscription;
private bool _stopProcessing; private bool _stopProcessing;
private Task? _processTask; private Task? _processTask;
private readonly AutoResetEvent _queueEvent; private readonly AutoResetEvent _queueEvent;
private readonly ConcurrentQueue<object> _processQueue; private readonly ConcurrentQueue<object> _processQueue;
private readonly bool validateChecksum; private readonly bool _validateChecksum;
private class EmptySymbolOrderBookEntry : ISymbolOrderBookEntry private class EmptySymbolOrderBookEntry : ISymbolOrderBookEntry
{ {
public decimal Quantity { get { return 0m; } set {; } } public decimal Quantity { get => 0m;
public decimal Price { get { return 0m; } set {; } } set { } }
public decimal Price { get => 0m;
set { } }
} }
private static readonly ISymbolOrderBookEntry emptySymbolOrderBookEntry = new EmptySymbolOrderBookEntry(); private static readonly ISymbolOrderBookEntry emptySymbolOrderBookEntry = new EmptySymbolOrderBookEntry();
@ -90,16 +92,16 @@ namespace CryptoExchange.Net.OrderBook
/// <inheritdoc/> /// <inheritdoc/>
public OrderBookStatus Status public OrderBookStatus Status
{ {
get => status; get => _status;
set set
{ {
if (value == status) if (value == _status)
return; return;
var old = status; var old = _status;
status = value; _status = value;
log.Write(LogLevel.Information, $"{Id} order book {Symbol} status changed: {old} => {value}"); log.Write(LogLevel.Information, $"{Id} order book {Symbol} status changed: {old} => {value}");
OnStatusChange?.Invoke(old, status); OnStatusChange?.Invoke(old, _status);
} }
} }
@ -132,7 +134,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
get get
{ {
lock (bookLock) lock (_bookLock)
return asks.Select(a => a.Value).ToList(); return asks.Select(a => a.Value).ToList();
} }
} }
@ -142,7 +144,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
get get
{ {
lock (bookLock) lock (_bookLock)
return bids.Select(a => a.Value).ToList(); return bids.Select(a => a.Value).ToList();
} }
} }
@ -152,7 +154,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
get get
{ {
lock (bookLock) lock (_bookLock)
return (Bids, Asks); return (Bids, Asks);
} }
} }
@ -162,7 +164,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
get get
{ {
lock (bookLock) lock (_bookLock)
return bids.FirstOrDefault().Value ?? emptySymbolOrderBookEntry; return bids.FirstOrDefault().Value ?? emptySymbolOrderBookEntry;
} }
} }
@ -172,7 +174,7 @@ namespace CryptoExchange.Net.OrderBook
{ {
get get
{ {
lock (bookLock) lock (_bookLock)
return asks.FirstOrDefault().Value ?? emptySymbolOrderBookEntry; return asks.FirstOrDefault().Value ?? emptySymbolOrderBookEntry;
} }
} }
@ -180,7 +182,7 @@ namespace CryptoExchange.Net.OrderBook
/// <inheritdoc/> /// <inheritdoc/>
public (ISymbolOrderBookEntry Bid, ISymbolOrderBookEntry Ask) BestOffers { public (ISymbolOrderBookEntry Bid, ISymbolOrderBookEntry Ask) BestOffers {
get { get {
lock (bookLock) lock (_bookLock)
return (BestBid,BestAsk); return (BestBid,BestAsk);
} }
} }
@ -204,7 +206,7 @@ namespace CryptoExchange.Net.OrderBook
_processQueue = new ConcurrentQueue<object>(); _processQueue = new ConcurrentQueue<object>();
_queueEvent = new AutoResetEvent(false); _queueEvent = new AutoResetEvent(false);
validateChecksum = options.ChecksumValidationEnabled; _validateChecksum = options.ChecksumValidationEnabled;
Symbol = symbol; Symbol = symbol;
Status = OrderBookStatus.Disconnected; Status = OrderBookStatus.Disconnected;
@ -233,21 +235,21 @@ namespace CryptoExchange.Net.OrderBook
return new CallResult<bool>(false, startResult.Error); return new CallResult<bool>(false, startResult.Error);
} }
subscription = startResult.Data; _subscription = startResult.Data;
subscription.ConnectionLost += () => _subscription.ConnectionLost += () =>
{ {
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} connection lost"); log.Write(LogLevel.Warning, $"{Id} order book {Symbol} connection lost");
Status = OrderBookStatus.Reconnecting; Status = OrderBookStatus.Reconnecting;
Reset(); Reset();
}; };
subscription.ConnectionClosed += () => _subscription.ConnectionClosed += () =>
{ {
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} disconnected"); log.Write(LogLevel.Warning, $"{Id} order book {Symbol} disconnected");
Status = OrderBookStatus.Disconnected; Status = OrderBookStatus.Disconnected;
_ = StopAsync(); _ = StopAsync();
}; };
subscription.ConnectionRestored += async time => await ResyncAsync().ConfigureAwait(false); _subscription.ConnectionRestored += async time => await ResyncAsync().ConfigureAwait(false);
Status = OrderBookStatus.Synced; Status = OrderBookStatus.Synced;
return new CallResult<bool>(true, null); return new CallResult<bool>(true, null);
} }
@ -261,8 +263,8 @@ namespace CryptoExchange.Net.OrderBook
if (_processTask != null) if (_processTask != null)
await _processTask.ConfigureAwait(false); await _processTask.ConfigureAwait(false);
if (subscription != null) if (_subscription != null)
await subscription.CloseAsync().ConfigureAwait(false); await _subscription.CloseAsync().ConfigureAwait(false);
log.Write(LogLevel.Debug, $"{Id} order book {Symbol} stopped"); log.Write(LogLevel.Debug, $"{Id} order book {Symbol} stopped");
} }
@ -275,7 +277,7 @@ namespace CryptoExchange.Net.OrderBook
var totalCost = 0m; var totalCost = 0m;
var totalAmount = 0m; var totalAmount = 0m;
var amountLeft = quantity; var amountLeft = quantity;
lock (bookLock) lock (_bookLock)
{ {
var list = type == OrderBookEntryType.Ask ? asks : bids; var list = type == OrderBookEntryType.Ask ? asks : bids;
@ -283,7 +285,7 @@ namespace CryptoExchange.Net.OrderBook
while (amountLeft > 0) while (amountLeft > 0)
{ {
if (step == list.Count) if (step == list.Count)
return new CallResult<decimal>(0, new InvalidOperationError($"Quantity is larger than order in the order book")); return new CallResult<decimal>(0, new InvalidOperationError("Quantity is larger than order in the order book"));
var element = list.ElementAt(step); var element = list.ElementAt(step);
var stepAmount = Math.Min(element.Value.Quantity, amountLeft); var stepAmount = Math.Min(element.Value.Quantity, amountLeft);
@ -550,7 +552,7 @@ namespace CryptoExchange.Net.OrderBook
if (_stopProcessing) if (_stopProcessing)
{ {
log.Write(LogLevel.Trace, $"Skipping message because of resubscribing"); log.Write(LogLevel.Trace, "Skipping message because of resubscribing");
continue; continue;
} }
@ -566,7 +568,7 @@ namespace CryptoExchange.Net.OrderBook
private void ProcessInitialOrderBookItem(InitialOrderBookItem item) private void ProcessInitialOrderBookItem(InitialOrderBookItem item)
{ {
lock (bookLock) lock (_bookLock)
{ {
bookSet = true; bookSet = true;
asks.Clear(); asks.Clear();
@ -591,7 +593,7 @@ namespace CryptoExchange.Net.OrderBook
private void ProcessQueueItem(ProcessQueueItem item) private void ProcessQueueItem(ProcessQueueItem item)
{ {
lock (bookLock) lock (_bookLock)
{ {
if (!bookSet) if (!bookSet)
{ {
@ -629,9 +631,9 @@ namespace CryptoExchange.Net.OrderBook
private void ProcessChecksum(ChecksumItem ci) private void ProcessChecksum(ChecksumItem ci)
{ {
lock (bookLock) lock (_bookLock)
{ {
if (!validateChecksum) if (!_validateChecksum)
return; return;
bool checksumResult = false; bool checksumResult = false;
@ -652,7 +654,6 @@ namespace CryptoExchange.Net.OrderBook
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync. Resyncing"); log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync. Resyncing");
_stopProcessing = true; _stopProcessing = true;
Resubscribe(); Resubscribe();
return;
} }
} }
} }
@ -662,15 +663,15 @@ namespace CryptoExchange.Net.OrderBook
Status = OrderBookStatus.Syncing; Status = OrderBookStatus.Syncing;
_ = Task.Run(async () => _ = Task.Run(async () =>
{ {
await subscription!.UnsubscribeAsync().ConfigureAwait(false); await _subscription!.UnsubscribeAsync().ConfigureAwait(false);
Reset(); Reset();
_stopProcessing = false; _stopProcessing = false;
if (!await subscription!.ResubscribeAsync().ConfigureAwait(false)) if (!await _subscription!.ResubscribeAsync().ConfigureAwait(false))
{ {
// Resubscribing failed, reconnect the socket // Resubscribing failed, reconnect the socket
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} resync failed, reconnecting socket"); log.Write(LogLevel.Warning, $"{Id} order book {Symbol} resync failed, reconnecting socket");
Status = OrderBookStatus.Reconnecting; Status = OrderBookStatus.Reconnecting;
_ = subscription!.ReconnectAsync(); _ = _subscription!.ReconnectAsync();
} }
else else
await ResyncAsync().ConfigureAwait(false); await ResyncAsync().ConfigureAwait(false);

View File

@ -5,8 +5,6 @@ using System.Diagnostics.CodeAnalysis;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Net.Http; using System.Net.Http;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Web; using System.Web;
@ -64,7 +62,7 @@ namespace CryptoExchange.Net
/// <summary> /// <summary>
/// List of rate limiters /// List of rate limiters
/// </summary> /// </summary>
protected IEnumerable<IRateLimiter> RateLimiters { get; private set; } protected IEnumerable<IRateLimiter> RateLimiters { get; }
/// <inheritdoc /> /// <inheritdoc />
public int TotalRequestsMade { get; private set; } public int TotalRequestsMade { get; private set; }
@ -99,28 +97,6 @@ namespace CryptoExchange.Net
RateLimiters = rateLimiters; RateLimiters = rateLimiters;
} }
/// <summary>
/// Adds a rate limiter to the client. There are 2 choices, the <see cref="RateLimiterTotal"/> and the <see cref="RateLimiterPerEndpoint"/>.
/// </summary>
/// <param name="limiter">The limiter to add</param>
public void AddRateLimiter(IRateLimiter limiter)
{
if (limiter == null)
throw new ArgumentNullException(nameof(limiter));
var rateLimiters = RateLimiters.ToList();
rateLimiters.Add(limiter);
RateLimiters = rateLimiters;
}
/// <summary>
/// Removes all rate limiters from this client
/// </summary>
public void RemoveRateLimiters()
{
RateLimiters = new List<IRateLimiter>();
}
/// <summary> /// <summary>
/// Execute a request to the uri and deserialize the response into the provided type parameter /// Execute a request to the uri and deserialize the response into the provided type parameter
/// </summary> /// </summary>
@ -308,8 +284,7 @@ namespace CryptoExchange.Net
int requestId, int requestId,
Dictionary<string, string>? additionalHeaders) Dictionary<string, string>? additionalHeaders)
{ {
if (parameters == null) parameters ??= new Dictionary<string, object>();
parameters = new Dictionary<string, object>();
var uriString = uri.ToString(); var uriString = uri.ToString();
if (authProvider != null) if (authProvider != null)

View File

@ -157,7 +157,15 @@ namespace CryptoExchange.Net
var released = false; var released = false;
// Wait for a semaphore here, so we only connect 1 socket at a time. // Wait for a semaphore here, so we only connect 1 socket at a time.
// This is necessary for being able to see if connections can be combined // This is necessary for being able to see if connections can be combined
await semaphoreSlim.WaitAsync().ConfigureAwait(false); try
{
await semaphoreSlim.WaitAsync(ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return new CallResult<UpdateSubscription>(null, new CancellationRequestedError());
}
try try
{ {
// Get a new or existing socket connection // Get a new or existing socket connection

View File

@ -142,7 +142,7 @@ namespace CryptoExchange.Net.Sockets
if (!_receivedMessages.Any()) if (!_receivedMessages.Any())
return 0; return 0;
return Math.Round(_receivedMessages.Sum(v => v.Bytes) / 1000 / 3d); return Math.Round(_receivedMessages.Sum(v => v.Bytes) / 1000d / 3d);
} }
} }
} }
@ -371,8 +371,7 @@ namespace CryptoExchange.Net.Sockets
DateTime? start = null; DateTime? start = null;
while (MessagesSentLastSecond() >= RatelimitPerSecond) while (MessagesSentLastSecond() >= RatelimitPerSecond)
{ {
if (start == null) start ??= DateTime.UtcNow;
start = DateTime.UtcNow;
await Task.Delay(10).ConfigureAwait(false); await Task.Delay(10).ConfigureAwait(false);
} }
@ -479,8 +478,7 @@ namespace CryptoExchange.Net.Sockets
{ {
// We received data, but it is not complete, write it to a memory stream for reassembling // We received data, but it is not complete, write it to a memory stream for reassembling
multiPartMessage = true; multiPartMessage = true;
if (memoryStream == null) memoryStream ??= new MemoryStream();
memoryStream = new MemoryStream();
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
} }
@ -490,7 +488,7 @@ namespace CryptoExchange.Net.Sockets
{ {
// Received a complete message and it's not multi part // Received a complete message and it's not multi part
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
HandleMessage(buffer.Array, buffer.Offset, receiveResult.Count, receiveResult.MessageType); HandleMessage(buffer.Array!, buffer.Offset, receiveResult.Count, receiveResult.MessageType);
} }
else else
{ {
@ -586,7 +584,6 @@ namespace CryptoExchange.Net.Sockets
catch(Exception e) catch(Exception e)
{ {
log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString()); log.Write(LogLevel.Error, $"Socket {Id} unhandled exception during message processing: " + e.ToLogString());
return;
} }
} }

View File

@ -562,7 +562,7 @@ namespace CryptoExchange.Net.Sockets
if (subscription.Confirmed) if (subscription.Confirmed)
await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false); await socketClient.UnsubscribeAsync(this, subscription).ConfigureAwait(false);
var shouldCloseConnection = false; bool shouldCloseConnection;
lock (subscriptionLock) lock (subscriptionLock)
shouldCloseConnection = !subscriptions.Any(r => r.UserSubscription && subscription != r); shouldCloseConnection = !subscriptions.Any(r => r.UserSubscription && subscription != r);