1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-04-13 00:22:22 +00:00

Compare commits

..

No commits in common. "204bda86226710c09f5741c7f7f1a8cc718d5b16" and "89a73747b026f0ea866aa36380138801de61ccd8" have entirely different histories.

3 changed files with 323 additions and 146 deletions

View File

@ -6,9 +6,9 @@
<PackageId>CryptoExchange.Net</PackageId>
<Authors>JKorf</Authors>
<Description>CryptoExchange.Net is a base library which is used to implement different cryptocurrency (exchange) API's. It provides a standardized way of implementing different API's, which results in a very similar experience for users of the API implementations.</Description>
<PackageVersion>10.7.1</PackageVersion>
<AssemblyVersion>10.7.1</AssemblyVersion>
<FileVersion>10.7.1</FileVersion>
<PackageVersion>10.7.0</PackageVersion>
<AssemblyVersion>10.7.0</AssemblyVersion>
<FileVersion>10.7.0</FileVersion>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<PackageTags>OKX;OKX.Net;Mexc;Mexc.Net;Kucoin;Kucoin.Net;Kraken;Kraken.Net;Huobi;Huobi.Net;CoinEx;CoinEx.Net;Bybit;Bybit.Net;Bitget;Bitget.Net;Bitfinex;Bitfinex.Net;Binance;Binance.Net;CryptoCurrency;CryptoCurrency Exchange;CryptoExchange.Net</PackageTags>
<RepositoryType>git</RepositoryType>

View File

@ -8,9 +8,7 @@ using CryptoExchange.Net.Sockets.Default.Interfaces;
using CryptoExchange.Net.Sockets.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Linq;
using System.Net.WebSockets;
@ -125,7 +123,15 @@ namespace CryptoExchange.Net.Sockets.Default
{
get
{
return _listeners.OfType<Subscription>().Count(h => h.UserSubscription);
_listenersLock.EnterReadLock();
try
{
return _listeners.OfType<Subscription>().Count(h => h.UserSubscription);
}
finally
{
_listenersLock.ExitReadLock();
}
}
}
@ -136,7 +142,16 @@ namespace CryptoExchange.Net.Sockets.Default
{
get
{
return _listeners.OfType<Subscription>().Where(h => h.UserSubscription).ToArray();
_listenersLock.EnterReadLock();
try
{
return _listeners.OfType<Subscription>().Where(h => h.UserSubscription).ToArray();
}
finally
{
_listenersLock.ExitReadLock();
}
}
}
@ -240,7 +255,15 @@ namespace CryptoExchange.Net.Sockets.Default
{
get
{
return _listeners.OfType<Subscription>().Select(x => x.Topic).Where(t => t != null).ToArray()!;
_listenersLock.EnterReadLock();
try
{
return _listeners.OfType<Subscription>().Select(x => x.Topic).Where(t => t != null).ToArray()!;
}
finally
{
_listenersLock.ExitReadLock();
}
}
}
@ -251,18 +274,22 @@ namespace CryptoExchange.Net.Sockets.Default
{
get
{
return _listeners.OfType<Query>().Where(x => !x.Completed).Count();
_listenersLock.EnterReadLock();
try
{
return _listeners.OfType<Query>().Where(x => !x.Completed).Count();
}
finally
{
_listenersLock.ExitReadLock();
}
}
}
private bool _pausedActivity;
#if NET9_0_OR_GREATER
private readonly Lock _listenersLock = new Lock();
#else
private readonly object _listenersLock = new object();
#endif
private ReadOnlyCollection<IMessageProcessor> _listeners;
private readonly ReaderWriterLockSlim _listenersLock = new ReaderWriterLockSlim();
private readonly List<IMessageProcessor> _listeners;
private readonly ILogger _logger;
private SocketStatus _status;
@ -311,7 +338,7 @@ namespace CryptoExchange.Net.Sockets.Default
_socket.OnError += HandleErrorAsync;
_socket.GetReconnectionUrl = GetReconnectionUrlAsync;
_listeners = new ReadOnlyCollection<IMessageProcessor>([]);
_listeners = new List<IMessageProcessor>();
_serializer = apiClient.CreateSerializer();
}
@ -338,17 +365,25 @@ namespace CryptoExchange.Net.Sockets.Default
if (ApiClient._socketConnections.ContainsKey(SocketId))
ApiClient._socketConnections.TryRemove(SocketId, out _);
foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription && !l.IsClosingConnection))
_listenersLock.EnterWriteLock();
try
{
subscription.IsClosingConnection = true;
subscription.Reset();
foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription && !l.IsClosingConnection))
{
subscription.IsClosingConnection = true;
subscription.Reset();
}
foreach (var query in _listeners.OfType<Query>().ToList())
{
query.Fail(new WebError("Connection interrupted"));
_listeners.Remove(query);
}
}
finally
{
_listenersLock.ExitWriteLock();
}
var queryList = _listeners.OfType<Query>().ToList();
foreach (var query in queryList)
query.Fail(new WebError("Connection interrupted"));
RemoveMessageProcessors(queryList);
_ = Task.Run(() => ConnectionClosed?.Invoke());
return Task.CompletedTask;
@ -364,14 +399,22 @@ namespace CryptoExchange.Net.Sockets.Default
Authenticated = false;
_lastSequenceNumber = 0;
foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription))
subscription.Reset();
_listenersLock.EnterWriteLock();
try
{
foreach (var subscription in _listeners.OfType<Subscription>().Where(l => l.UserSubscription))
subscription.Reset();
var queryList = _listeners.OfType<Query>().ToList();
foreach (var query in queryList)
query.Fail(new WebError("Connection interrupted"));
RemoveMessageProcessors(queryList);
foreach (var query in _listeners.OfType<Query>().ToList())
{
query.Fail(new WebError("Connection interrupted"));
_listeners.Remove(query);
}
}
finally
{
_listenersLock.ExitWriteLock();
}
_ = Task.Run(() => ConnectionLost?.Invoke());
return Task.CompletedTask;
@ -393,11 +436,19 @@ namespace CryptoExchange.Net.Sockets.Default
{
Status = SocketStatus.Resubscribing;
var queryList = _listeners.OfType<Query>().ToList();
foreach (var query in queryList)
query.Fail(new WebError("Connection interrupted"));
RemoveMessageProcessors(queryList);
_listenersLock.EnterWriteLock();
try
{
foreach (var query in _listeners.OfType<Query>().ToList())
{
query.Fail(new WebError("Connection interrupted"));
_listeners.Remove(query);
}
}
finally
{
_listenersLock.ExitWriteLock();
}
// Can't wait for this as it would cause a deadlock
_ = Task.Run(async () =>
@ -452,7 +503,17 @@ namespace CryptoExchange.Net.Sockets.Default
/// <returns></returns>
protected virtual Task HandleRequestRateLimitedAsync(int requestId)
{
var query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId);
Query? query;
_listenersLock.EnterReadLock();
try
{
query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId);
}
finally
{
_listenersLock.ExitReadLock();
}
if (query == null)
return Task.CompletedTask;
@ -476,7 +537,17 @@ namespace CryptoExchange.Net.Sockets.Default
/// <param name="requestId">Id of the request sent</param>
protected virtual Task HandleRequestSentAsync(int requestId)
{
var query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId);
Query? query;
_listenersLock.EnterReadLock();
try
{
query = _listeners.OfType<Query>().FirstOrDefault(x => x.Id == requestId);
}
finally
{
_listenersLock.ExitReadLock();
}
if (query == null)
return Task.CompletedTask;
@ -522,19 +593,27 @@ namespace CryptoExchange.Net.Sockets.Default
}
Type? deserializationType = null;
foreach (var subscription in _listeners)
_listenersLock.EnterReadLock();
try
{
foreach (var route in subscription.MessageRouter.Routes)
foreach (var subscription in _listeners)
{
if (!route.TypeIdentifier.Equals(typeIdentifier, StringComparison.Ordinal))
continue;
foreach (var route in subscription.MessageRouter.Routes)
{
if (!route.TypeIdentifier.Equals(typeIdentifier, StringComparison.Ordinal))
continue;
deserializationType = route.DeserializationType;
break;
deserializationType = route.DeserializationType;
break;
}
if (deserializationType != null)
break;
}
if (deserializationType != null)
break;
}
finally
{
_listenersLock.ExitReadLock();
}
if (deserializationType == null)
@ -581,69 +660,94 @@ namespace CryptoExchange.Net.Sockets.Default
var topicFilter = messageConverter.GetTopicFilter(result);
bool processed = false;
foreach (var processor in _listeners)
_listenersLock.EnterReadLock();
try
{
bool isQuery = false;
Query? query = null;
if (processor is Query cquery)
var currentCount = _listeners.Count;
for(var i = 0; i < _listeners.Count; i++)
{
isQuery = true;
query = cquery;
}
var complete = false;
foreach (var route in processor.MessageRouter.Routes)
{
if (route.TypeIdentifier != typeIdentifier)
continue;
// Forward message rules:
// | Message Topic | Route Topic Filter | Topics Match | Forward | Description
// | N | N | - | Y | No topic filter applied
// | N | Y | - | N | Route only listens to specific topic
// | Y | N | - | Y | Route listens to all message regardless of topic
// | Y | Y | Y | Y | Route listens to specific message topic
// | Y | Y | N | N | Route listens to different topic
if (topicFilter == null)
if (_listeners.Count != currentCount)
{
if (route.TopicFilter != null)
// No topic on message, but route is filtering on topic
continue;
}
else
{
if (route.TopicFilter != null && !route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal))
// Message has a topic, and the route has a filter for another topic
continue;
// Possible a query added or removed. If added it's not a problem, if removed it is
if (_listeners.Count < currentCount)
throw new Exception("Listeners list adjusted, can't continue processing");
}
processed = true;
if (isQuery && query!.Completed)
continue;
processor.Handle(this, receiveTime, originalData, result, route);
if (isQuery && !route.MultipleReaders)
var processor = _listeners[i];
bool isQuery = false;
Query? query = null;
if (processor is Query cquery)
{
complete = true;
isQuery = true;
query = cquery;
}
var complete = false;
foreach (var route in processor.MessageRouter.Routes)
{
if (route.TypeIdentifier != typeIdentifier)
continue;
// Forward message rules:
// | Message Topic | Route Topic Filter | Topics Match | Forward | Description
// | N | N | - | Y | No topic filter applied
// | N | Y | - | N | Route only listens to specific topic
// | Y | N | - | Y | Route listens to all message regardless of topic
// | Y | Y | Y | Y | Route listens to specific message topic
// | Y | Y | N | N | Route listens to different topic
if (topicFilter == null)
{
if (route.TopicFilter != null)
// No topic on message, but route is filtering on topic
continue;
}
else
{
if (route.TopicFilter != null && !route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal))
// Message has a topic, and the route has a filter for another topic
continue;
}
processed = true;
if (isQuery && query!.Completed)
continue;
processor.Handle(this, receiveTime, originalData, result, route);
if (isQuery && !route.MultipleReaders)
{
complete = true;
break;
}
}
if (complete)
break;
}
}
if (complete)
break;
}
finally
{
_listenersLock.ExitReadLock();
}
if (!processed)
{
if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data))
{
_logger.ReceivedMessageNotMatchedToAnyListener(
SocketId,
typeIdentifier,
topicFilter!,
string.Join(",", _listeners.Select(x => string.Join(",", x.MessageRouter.Routes.Where(x => x.TypeIdentifier == typeIdentifier).Select(x => x.TopicFilter != null ? string.Join(",", x.TopicFilter) : "[null]")))));
_listenersLock.EnterReadLock();
try
{
_logger.ReceivedMessageNotMatchedToAnyListener(
SocketId,
typeIdentifier,
topicFilter!,
string.Join(",", _listeners.Select(x => string.Join(",", x.MessageRouter.Routes.Where(x => x.TypeIdentifier == typeIdentifier).Select(x => x.TopicFilter != null ? string.Join(",", x.TopicFilter) : "[null]")))));
}
finally
{
_listenersLock.ExitReadLock();
}
}
}
}
@ -688,10 +792,18 @@ namespace CryptoExchange.Net.Sockets.Default
if (ApiClient._socketConnections.ContainsKey(SocketId))
ApiClient._socketConnections.TryRemove(SocketId, out _);
foreach (var subscription in _listeners.OfType<Subscription>())
_listenersLock.EnterReadLock();
try
{
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
foreach (var subscription in _listeners.OfType<Subscription>())
{
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
}
}
finally
{
_listenersLock.ExitReadLock();
}
await _socket.CloseAsync().ConfigureAwait(false);
@ -721,12 +833,32 @@ namespace CryptoExchange.Net.Sockets.Default
if (subscription.CancellationTokenRegistration.HasValue)
subscription.CancellationTokenRegistration.Value.Dispose();
bool anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l)));
bool shouldCloseConnection = _listeners.OfType<Subscription>().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection;
bool anyDuplicateSubscription;
bool shouldCloseConnection;
_listenersLock.EnterReadLock();
try
{
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l)));
shouldCloseConnection = _listeners.OfType<Subscription>().All(r => !r.UserSubscription || r.Status == SubscriptionStatus.Closing || r.Status == SubscriptionStatus.Closed) && !DedicatedRequestConnection.IsDedicatedRequestConnection;
}
finally
{
_listenersLock.ExitReadLock();
}
if (!anyDuplicateSubscription)
{
var needUnsub = _listeners.Contains(subscription) && !shouldCloseConnection;
bool needUnsub;
_listenersLock.EnterReadLock();
try
{
needUnsub = _listeners.Contains(subscription) && !shouldCloseConnection;
}
finally
{
_listenersLock.ExitReadLock();
}
if (needUnsub && _socket.IsOpen)
await UnsubscribeAsync(subscription).ConfigureAwait(false);
}
@ -750,7 +882,15 @@ namespace CryptoExchange.Net.Sockets.Default
await CloseAsync().ConfigureAwait(false);
}
RemoveMessageProcessor(subscription);
_listenersLock.EnterWriteLock();
try
{
_listeners.Remove(subscription);
}
finally
{
_listenersLock.ExitWriteLock();
}
subscription.Status = SubscriptionStatus.Closed;
}
@ -774,7 +914,15 @@ namespace CryptoExchange.Net.Sockets.Default
if (Status != SocketStatus.None && Status != SocketStatus.Connected)
return false;
AddMessageProcessor(subscription);
_listenersLock.EnterWriteLock();
try
{
_listeners.Add(subscription);
}
finally
{
_listenersLock.ExitWriteLock();
}
if (subscription.UserSubscription)
_logger.AddingNewSubscription(SocketId, subscription.Id, UserSubscriptionCount);
@ -787,7 +935,15 @@ namespace CryptoExchange.Net.Sockets.Default
/// <param name="id"></param>
public Subscription? GetSubscription(int id)
{
return _listeners.OfType<Subscription>().SingleOrDefault(s => s.Id == id);
_listenersLock.EnterReadLock();
try
{
return _listeners.OfType<Subscription>().SingleOrDefault(s => s.Id == id);
}
finally
{
_listenersLock.ExitReadLock();
}
}
/// <summary>
@ -835,12 +991,29 @@ namespace CryptoExchange.Net.Sockets.Default
private async Task SendAndWaitIntAsync(Query query, CancellationToken ct = default)
{
AddMessageProcessor(query);
_listenersLock.EnterWriteLock();
try
{
_listeners.Add(query);
}
finally
{
_listenersLock.ExitWriteLock();
}
var sendResult = await SendAsync(query.Id, query.Request, query.Weight).ConfigureAwait(false);
if (!sendResult)
{
query.Fail(sendResult.Error!);
RemoveMessageProcessor(query);
_listenersLock.EnterWriteLock();
try
{
_listeners.Remove(query);
}
finally
{
_listenersLock.ExitWriteLock();
}
return;
}
@ -871,7 +1044,15 @@ namespace CryptoExchange.Net.Sockets.Default
}
finally
{
RemoveMessageProcessor(query);
_listenersLock.EnterWriteLock();
try
{
_listeners.Remove(query);
}
finally
{
_listenersLock.ExitWriteLock();
}
}
}
@ -977,7 +1158,17 @@ namespace CryptoExchange.Net.Sockets.Default
if (!DedicatedRequestConnection.IsDedicatedRequestConnection)
{
var anySubscriptions = _listeners.OfType<Subscription>().Any(s => s.UserSubscription);
bool anySubscriptions;
_listenersLock.EnterReadLock();
try
{
anySubscriptions = _listeners.OfType<Subscription>().Any(s => s.UserSubscription);
}
finally
{
_listenersLock.ExitReadLock();
}
if (!anySubscriptions)
{
// No need to resubscribe anything
@ -987,8 +1178,18 @@ namespace CryptoExchange.Net.Sockets.Default
}
}
bool anyAuthenticated = _listeners.OfType<Subscription>().Any(s => s.Authenticated)
bool anyAuthenticated;
_listenersLock.EnterReadLock();
try
{
anyAuthenticated = _listeners.OfType<Subscription>().Any(s => s.Authenticated)
|| DedicatedRequestConnection.IsDedicatedRequestConnection && DedicatedRequestConnection.Authenticated;
}
finally
{
_listenersLock.ExitReadLock();
}
if (anyAuthenticated)
{
// If we reconnected a authenticated connection we need to re-authenticate
@ -1011,7 +1212,17 @@ namespace CryptoExchange.Net.Sockets.Default
if (!_socket.IsOpen)
return new CallResult(new WebError("Socket not connected"));
var subList = _listeners.OfType<Subscription>().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList();
List<Subscription> subList;
_listenersLock.EnterReadLock();
try
{
subList = _listeners.OfType<Subscription>().Where(x => x.Active).Skip(batch * batchSize).Take(batchSize).ToList();
}
finally
{
_listenersLock.ExitReadLock();
}
if (subList.Count == 0)
break;
@ -1193,37 +1404,6 @@ namespace CryptoExchange.Net.Sockets.Default
});
}
private void AddMessageProcessor(IMessageProcessor processor)
{
lock (_listenersLock)
{
var updatedList = new List<IMessageProcessor>(_listeners);
updatedList.Add(processor);
_listeners = updatedList.AsReadOnly();
}
}
private void RemoveMessageProcessor(IMessageProcessor processor)
{
lock (_listenersLock)
{
var updatedList = new List<IMessageProcessor>(_listeners);
updatedList.Remove(processor);
_listeners = updatedList.AsReadOnly();
}
}
private void RemoveMessageProcessors(IEnumerable<IMessageProcessor> processors)
{
lock (_listenersLock)
{
var updatedList = new List<IMessageProcessor>(_listeners);
foreach (var processor in processors)
updatedList.Remove(processor);
_listeners = updatedList.AsReadOnly();
}
}
}
}

View File

@ -67,9 +67,6 @@ Make a one time donation in a crypto currency of your choice. If you prefer to d
Alternatively, sponsor me on Github using [Github Sponsors](https://github.com/sponsors/JKorf).
## Release notes
* Version 10.7.1 - 25 Feb 2026
* Fixed deadlock scenario in websocket connection when subscribe and handling message concurrently
* Version 10.7.0 - 24 Feb 2026
* Added parsing of REST response data up to 128 characters for error responses
* Added check for invalid json in JsonSocketMessageHandler