mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-06-08 16:36:15 +00:00
wip
This commit is contained in:
parent
3de04e4828
commit
ac434fa2c6
@ -1,4 +1,5 @@
|
|||||||
using CryptoExchange.Net.Objects.Sockets;
|
using CryptoExchange.Net.Objects;
|
||||||
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
using CryptoExchange.Net.Sockets;
|
using CryptoExchange.Net.Sockets;
|
||||||
using Newtonsoft.Json;
|
using Newtonsoft.Json;
|
||||||
using Newtonsoft.Json.Linq;
|
using Newtonsoft.Json.Linq;
|
||||||
@ -17,27 +18,12 @@ namespace CryptoExchange.Net.Converters
|
|||||||
{
|
{
|
||||||
private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
|
private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
|
||||||
|
|
||||||
/// <summary>
|
public abstract List<StreamMessageParseCallback> InterpreterPipeline { get; }
|
||||||
/// Fields to use for the message subscription identifier
|
|
||||||
/// </summary>
|
|
||||||
public virtual string[]? SubscriptionIdFields => null;
|
|
||||||
/// <summary>
|
|
||||||
/// Fields to use for the message type identifier
|
|
||||||
/// </summary>
|
|
||||||
public abstract string[] TypeIdFields { get; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Return the type of object that the message should be parsed to based on the type id values dictionary
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="idValues"></param>
|
|
||||||
/// <param name="listeners"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public abstract Type? GetDeserializationType(Dictionary<string, string?> idValues, List<BasePendingRequest> pendingRequests, List<Subscription> listeners);
|
|
||||||
|
|
||||||
public virtual string CreateIdentifierString(Dictionary<string, string?> idValues) => string.Join("-", idValues.Values.Where(v => v != null).Select(v => v!.ToLower()));
|
public virtual string CreateIdentifierString(Dictionary<string, string?> idValues) => string.Join("-", idValues.Values.Where(v => v != null).Select(v => v!.ToLower()));
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public BaseParsedMessage? ReadJson(Stream stream, List<BasePendingRequest> pendingRequests, List<Subscription> listeners, bool outputOriginalData)
|
public BaseParsedMessage? ReadJson(Stream stream, ConcurrentList<BasePendingRequest> pendingRequests, ConcurrentList<Subscription> listeners, bool outputOriginalData)
|
||||||
{
|
{
|
||||||
// Start reading the data
|
// Start reading the data
|
||||||
// Once we reach the properties that identify the message we save those in a dict
|
// Once we reach the properties that identify the message we save those in a dict
|
||||||
@ -63,24 +49,38 @@ namespace CryptoExchange.Net.Converters
|
|||||||
token = token.First!;
|
token = token.First!;
|
||||||
}
|
}
|
||||||
|
|
||||||
var typeIdDict = new Dictionary<string, string?>();
|
Type? resultType = null;
|
||||||
foreach (var idField in TypeIdFields)
|
Dictionary<string, string> typeIdDict = new Dictionary<string, string>();
|
||||||
typeIdDict[idField] = GetValueForKey(token, idField);
|
StreamMessageParseCallback? usedParser = null;
|
||||||
|
foreach (var callback in InterpreterPipeline)
|
||||||
Dictionary<string, string?>? subIdDict = null;
|
|
||||||
if (SubscriptionIdFields != null)
|
|
||||||
{
|
{
|
||||||
subIdDict = new Dictionary<string, string?>();
|
bool allFieldsPresent = true;
|
||||||
foreach (var idField in SubscriptionIdFields)
|
foreach(var field in callback.TypeFields)
|
||||||
subIdDict[idField] = GetValueForKey(token, idField);
|
{
|
||||||
|
var value = typeIdDict.TryGetValue(field, out var cachedValue) ? cachedValue : GetValueForKey(token, field);
|
||||||
|
if (value == null)
|
||||||
|
{
|
||||||
|
allFieldsPresent = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
typeIdDict[field] = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (allFieldsPresent)
|
||||||
|
{
|
||||||
|
resultType = callback.Callback(typeIdDict, pendingRequests, listeners);
|
||||||
|
usedParser = callback;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var resultType = GetDeserializationType(typeIdDict, pendingRequests, listeners);
|
if (usedParser == null)
|
||||||
if (resultType == null)
|
throw new Exception("No parser found for message");
|
||||||
{
|
|
||||||
// ?
|
var subIdDict = new Dictionary<string, string?>();
|
||||||
return null;
|
foreach (var field in usedParser.IdFields)
|
||||||
}
|
subIdDict[field] = typeIdDict.TryGetValue(field, out var cachedValue) ? cachedValue : GetValueForKey(token, field);
|
||||||
|
|
||||||
var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(resultType);
|
var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(resultType);
|
||||||
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType, _serializer));
|
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, resultType == null ? null : token.ToObject(resultType, _serializer));
|
||||||
@ -90,7 +90,7 @@ namespace CryptoExchange.Net.Converters
|
|||||||
instance.OriginalData = sr.ReadToEnd();
|
instance.OriginalData = sr.ReadToEnd();
|
||||||
}
|
}
|
||||||
|
|
||||||
instance.Identifier = CreateIdentifierString(subIdDict ?? typeIdDict);
|
instance.Identifier = CreateIdentifierString(subIdDict);
|
||||||
instance.Parsed = resultType != null;
|
instance.Parsed = resultType != null;
|
||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
|
81
CryptoExchange.Net/Objects/ConcurrentList.cs
Normal file
81
CryptoExchange.Net/Objects/ConcurrentList.cs
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using static System.Collections.Specialized.BitVector32;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Objects
|
||||||
|
{
|
||||||
|
public class ConcurrentList<T> : IEnumerable<T>
|
||||||
|
{
|
||||||
|
private readonly object _lock = new object();
|
||||||
|
private readonly List<T> _collection = new List<T>();
|
||||||
|
|
||||||
|
public void Add(T item)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
_collection.Add(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void Remove(T item)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
_collection.Remove(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
public T? SingleOrDefault(Func<T, bool> action)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
return _collection.SingleOrDefault(action);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool All(Func<T, bool> action)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
return _collection.All(action);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool Any(Func<T, bool> action)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
return _collection.Any(action);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int Count(Func<T, bool> action)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
return _collection.Count(action);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool Contains(T item)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
return _collection.Contains(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
public T[] ToArray(Func<T, bool> predicate)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
return _collection.Where(predicate).ToArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<T> ToList()
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
return _collection.ToList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public IEnumerator<T> GetEnumerator()
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
foreach (var item in _collection)
|
||||||
|
yield return item;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
|
||||||
|
}
|
||||||
|
}
|
21
CryptoExchange.Net/Objects/Sockets/MatchingStrategy.cs
Normal file
21
CryptoExchange.Net/Objects/Sockets/MatchingStrategy.cs
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Objects.Sockets
|
||||||
|
{
|
||||||
|
public interface IMatchingStrategy
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
internal class IdMatchingStrategy : IMatchingStrategy
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
internal class FieldsMatchingStrategy : IMatchingStrategy
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,14 @@
|
|||||||
|
using CryptoExchange.Net.Sockets;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Objects.Sockets
|
||||||
|
{
|
||||||
|
public class StreamMessageParseCallback
|
||||||
|
{
|
||||||
|
public List<string> TypeFields { get; set; } = new List<string>();
|
||||||
|
public List<string> IdFields { get; set; } = new List<string>();
|
||||||
|
public Func<Dictionary<string, string>, IEnumerable<BasePendingRequest>, IEnumerable<Subscription>, Type?> Callback { get; set; }
|
||||||
|
}
|
||||||
|
}
|
@ -11,6 +11,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public abstract class BaseQuery : IMessageProcessor
|
public abstract class BaseQuery : IMessageProcessor
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Unique identifier
|
||||||
|
/// </summary>
|
||||||
public int Id { get; } = ExchangeHelpers.NextId();
|
public int Id { get; } = ExchangeHelpers.NextId();
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Strings to identify this subscription with
|
/// Strings to identify this subscription with
|
||||||
@ -32,7 +35,10 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public int Weight { get; }
|
public int Weight { get; }
|
||||||
|
|
||||||
public BasePendingRequest PendingRequest { get; private set; }
|
/// <summary>
|
||||||
|
/// The pending request for this query
|
||||||
|
/// </summary>
|
||||||
|
public BasePendingRequest? PendingRequest { get; private set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// ctor
|
/// ctor
|
||||||
@ -56,6 +62,11 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
return PendingRequest;
|
return PendingRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Create a pending request for this query
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="id"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public abstract BasePendingRequest GetPendingRequest(int id);
|
public abstract BasePendingRequest GetPendingRequest(int id);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -86,7 +97,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override async Task<CallResult> HandleMessageAsync(DataEvent<BaseParsedMessage> message)
|
public override async Task<CallResult> HandleMessageAsync(DataEvent<BaseParsedMessage> message)
|
||||||
{
|
{
|
||||||
await PendingRequest.ProcessAsync(message).ConfigureAwait(false);
|
await PendingRequest!.ProcessAsync(message).ConfigureAwait(false);
|
||||||
return await HandleMessageAsync(message.As((ParsedMessage<TResponse>)message.Data)).ConfigureAwait(false);
|
return await HandleMessageAsync(message.As((ParsedMessage<TResponse>)message.Data)).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@ using System.Net.WebSockets;
|
|||||||
using System.IO;
|
using System.IO;
|
||||||
using CryptoExchange.Net.Objects.Sockets;
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets
|
namespace CryptoExchange.Net.Sockets
|
||||||
{
|
{
|
||||||
@ -57,11 +58,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// The amount of subscriptions on this connection
|
/// The amount of subscriptions on this connection
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public int UserSubscriptionCount
|
public int UserSubscriptionCount => _subscriptions.Count(h => h.UserSubscription);
|
||||||
{
|
|
||||||
get { lock (_subscriptionLock)
|
|
||||||
return _subscriptions.Count(h => h.UserSubscription); }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Get a copy of the current message subscriptions
|
/// Get a copy of the current message subscriptions
|
||||||
@ -70,8 +67,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
{
|
{
|
||||||
get
|
get
|
||||||
{
|
{
|
||||||
lock (_subscriptionLock)
|
return _subscriptions.ToArray(h => h.UserSubscription);
|
||||||
return _subscriptions.Where(h => h.UserSubscription).ToArray();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -156,13 +152,10 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
}
|
}
|
||||||
|
|
||||||
private bool _pausedActivity;
|
private bool _pausedActivity;
|
||||||
private readonly List<BasePendingRequest> _pendingRequests;
|
private readonly ConcurrentList<BasePendingRequest> _pendingRequests;
|
||||||
private readonly List<Subscription> _subscriptions;
|
private readonly ConcurrentList<Subscription> _subscriptions;
|
||||||
private readonly Dictionary<string, IMessageProcessor> _messageIdMap;
|
private readonly ConcurrentDictionary<string, IMessageProcessor> _messageIdMap;
|
||||||
|
|
||||||
private readonly object _subscriptionLock = new();
|
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
|
|
||||||
private SocketStatus _status;
|
private SocketStatus _status;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -184,9 +177,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
Tag = tag;
|
Tag = tag;
|
||||||
Properties = new Dictionary<string, object>();
|
Properties = new Dictionary<string, object>();
|
||||||
|
|
||||||
_pendingRequests = new List<BasePendingRequest>();
|
_pendingRequests = new ConcurrentList<BasePendingRequest>();
|
||||||
_subscriptions = new List<Subscription>();
|
_subscriptions = new ConcurrentList<Subscription>();
|
||||||
_messageIdMap = new Dictionary<string, IMessageProcessor>();
|
_messageIdMap = new ConcurrentDictionary<string, IMessageProcessor>();
|
||||||
|
|
||||||
_socket = socket;
|
_socket = socket;
|
||||||
_socket.OnStreamMessage += HandleStreamMessage;
|
_socket.OnStreamMessage += HandleStreamMessage;
|
||||||
@ -215,11 +208,10 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
{
|
{
|
||||||
Status = SocketStatus.Closed;
|
Status = SocketStatus.Closed;
|
||||||
Authenticated = false;
|
Authenticated = false;
|
||||||
lock(_subscriptionLock)
|
|
||||||
{
|
foreach (var subscription in _subscriptions)
|
||||||
foreach (var subscription in _subscriptions)
|
subscription.Confirmed = false;
|
||||||
subscription.Confirmed = false;
|
|
||||||
}
|
|
||||||
Task.Run(() => ConnectionClosed?.Invoke());
|
Task.Run(() => ConnectionClosed?.Invoke());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -231,11 +223,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
Status = SocketStatus.Reconnecting;
|
Status = SocketStatus.Reconnecting;
|
||||||
DisconnectTime = DateTime.UtcNow;
|
DisconnectTime = DateTime.UtcNow;
|
||||||
Authenticated = false;
|
Authenticated = false;
|
||||||
lock (_subscriptionLock)
|
|
||||||
{
|
foreach (var subscription in _subscriptions)
|
||||||
foreach (var subscription in _subscriptions)
|
subscription.Confirmed = false;
|
||||||
subscription.Confirmed = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = Task.Run(() => ConnectionLost?.Invoke());
|
_ = Task.Run(() => ConnectionLost?.Invoke());
|
||||||
}
|
}
|
||||||
@ -255,13 +245,11 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
protected virtual async void HandleReconnected()
|
protected virtual async void HandleReconnected()
|
||||||
{
|
{
|
||||||
Status = SocketStatus.Resubscribing;
|
Status = SocketStatus.Resubscribing;
|
||||||
lock (_subscriptions)
|
|
||||||
|
foreach (var pendingRequest in _pendingRequests.ToList())
|
||||||
{
|
{
|
||||||
foreach (var pendingRequest in _pendingRequests.ToList())
|
pendingRequest.Fail("Connection interupted");
|
||||||
{
|
// Remove?
|
||||||
pendingRequest.Fail("Connection interupted");
|
|
||||||
// Remove?
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
|
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
|
||||||
@ -299,10 +287,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <param name="requestId">Id of the request sent</param>
|
/// <param name="requestId">Id of the request sent</param>
|
||||||
protected virtual void HandleRequestSent(int requestId)
|
protected virtual void HandleRequestSent(int requestId)
|
||||||
{
|
{
|
||||||
BasePendingRequest pendingRequest;
|
var pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId);
|
||||||
lock (_pendingRequests)
|
|
||||||
pendingRequest = _pendingRequests.SingleOrDefault(p => p.Id == requestId);
|
|
||||||
|
|
||||||
if (pendingRequest == null)
|
if (pendingRequest == null)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} - msg {requestId} - message sent, but not pending");
|
_logger.Log(LogLevel.Debug, $"Socket {SocketId} - msg {requestId} - message sent, but not pending");
|
||||||
@ -322,11 +307,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
var timestamp = DateTime.UtcNow;
|
var timestamp = DateTime.UtcNow;
|
||||||
TimeSpan userCodeDuration = TimeSpan.Zero;
|
TimeSpan userCodeDuration = TimeSpan.Zero;
|
||||||
|
|
||||||
List<Subscription> subscriptions;
|
var result = ApiClient.StreamConverter.ReadJson(stream, _pendingRequests, _subscriptions, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
|
||||||
lock (_subscriptionLock)
|
|
||||||
subscriptions = _subscriptions.OrderByDescending(x => !x.UserSubscription).ToList();
|
|
||||||
|
|
||||||
var result = ApiClient.StreamConverter.ReadJson(stream, _pendingRequests, subscriptions, ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
|
|
||||||
if(result == null)
|
if(result == null)
|
||||||
{
|
{
|
||||||
stream.Position = 0;
|
stream.Position = 0;
|
||||||
@ -358,12 +339,13 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stream.Dispose();
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {SocketId} Message mapped to processor {messageProcessor.Id} with identifier {result.Identifier}");
|
_logger.Log(LogLevel.Trace, $"Socket {SocketId} Message mapped to processor {messageProcessor.Id} with identifier {result.Identifier}");
|
||||||
|
|
||||||
if (messageProcessor is BaseQuery query)
|
if (messageProcessor is BaseQuery query)
|
||||||
{
|
{
|
||||||
foreach (var id in query.Identifiers)
|
foreach (var id in query.Identifiers)
|
||||||
_messageIdMap.Remove(id);
|
_messageIdMap.TryRemove(id, out _);
|
||||||
|
|
||||||
if (query.PendingRequest != null)
|
if (query.PendingRequest != null)
|
||||||
_pendingRequests.Remove(query.PendingRequest);
|
_pendingRequests.Remove(query.PendingRequest);
|
||||||
@ -412,13 +394,10 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (ApiClient.socketConnections.ContainsKey(SocketId))
|
if (ApiClient.socketConnections.ContainsKey(SocketId))
|
||||||
ApiClient.socketConnections.TryRemove(SocketId, out _);
|
ApiClient.socketConnections.TryRemove(SocketId, out _);
|
||||||
|
|
||||||
lock (_subscriptionLock)
|
foreach (var subscription in _subscriptions)
|
||||||
{
|
{
|
||||||
foreach (var subscription in _subscriptions)
|
if (subscription.CancellationTokenRegistration.HasValue)
|
||||||
{
|
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||||
if (subscription.CancellationTokenRegistration.HasValue)
|
|
||||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
await _socket.CloseAsync().ConfigureAwait(false);
|
await _socket.CloseAsync().ConfigureAwait(false);
|
||||||
@ -433,13 +412,10 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false)
|
public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false)
|
||||||
{
|
{
|
||||||
lock (_subscriptionLock)
|
if (!_subscriptions.Contains(subscription))
|
||||||
{
|
return;
|
||||||
if (!_subscriptions.Contains(subscription))
|
|
||||||
return;
|
|
||||||
|
|
||||||
subscription.Closed = true;
|
subscription.Closed = true;
|
||||||
}
|
|
||||||
|
|
||||||
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
||||||
return;
|
return;
|
||||||
@ -451,32 +427,25 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if ((unsubEvenIfNotConfirmed || subscription.Confirmed) && _socket.IsOpen)
|
if ((unsubEvenIfNotConfirmed || subscription.Confirmed) && _socket.IsOpen)
|
||||||
await UnsubscribeAsync(subscription).ConfigureAwait(false);
|
await UnsubscribeAsync(subscription).ConfigureAwait(false);
|
||||||
|
|
||||||
bool shouldCloseConnection;
|
if (Status == SocketStatus.Closing)
|
||||||
lock (_subscriptionLock)
|
|
||||||
{
|
{
|
||||||
if (Status == SocketStatus.Closing)
|
_logger.Log(LogLevel.Debug, $"Socket {SocketId} already closing");
|
||||||
{
|
return;
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} already closing");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
shouldCloseConnection = _subscriptions.All(r => !r.UserSubscription || r.Closed);
|
|
||||||
if (shouldCloseConnection)
|
|
||||||
Status = SocketStatus.Closing;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var shouldCloseConnection = _subscriptions.All(r => !r.UserSubscription || r.Closed);
|
||||||
|
if (shouldCloseConnection)
|
||||||
|
Status = SocketStatus.Closing;
|
||||||
|
|
||||||
if (shouldCloseConnection)
|
if (shouldCloseConnection)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more subscriptions");
|
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more subscriptions");
|
||||||
await CloseAsync().ConfigureAwait(false);
|
await CloseAsync().ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
lock (_subscriptionLock)
|
_subscriptions.Remove(subscription);
|
||||||
{
|
foreach (var id in subscription.Identifiers)
|
||||||
_subscriptions.Remove(subscription);
|
_messageIdMap.TryRemove(id, out _);
|
||||||
foreach (var id in subscription.Identifiers)
|
|
||||||
_messageIdMap.Remove(id);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -494,44 +463,36 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <param name="subscription"></param>
|
/// <param name="subscription"></param>
|
||||||
public bool AddSubscription(Subscription subscription)
|
public bool AddSubscription(Subscription subscription)
|
||||||
{
|
{
|
||||||
lock (_subscriptionLock)
|
if (Status != SocketStatus.None && Status != SocketStatus.Connected)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
_subscriptions.Add(subscription);
|
||||||
|
if (subscription.Identifiers != null)
|
||||||
{
|
{
|
||||||
if (Status != SocketStatus.None && Status != SocketStatus.Connected)
|
foreach (var id in subscription.Identifiers)
|
||||||
return false;
|
|
||||||
|
|
||||||
_subscriptions.Add(subscription);
|
|
||||||
if (subscription.Identifiers != null)
|
|
||||||
{
|
{
|
||||||
foreach (var id in subscription.Identifiers)
|
if (!_messageIdMap.TryAdd(id.ToLowerInvariant(), subscription))
|
||||||
_messageIdMap.Add(id.ToLowerInvariant(), subscription);
|
throw new InvalidOperationException($"Failed to register subscription id {id}, already registered");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (subscription.UserSubscription)
|
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}");
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (subscription.UserSubscription)
|
||||||
|
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {_subscriptions.Count(s => s.UserSubscription)}");
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Get a subscription on this connection by id
|
/// Get a subscription on this connection by id
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="id"></param>
|
/// <param name="id"></param>
|
||||||
public Subscription? GetSubscription(int id)
|
public Subscription? GetSubscription(int id) => _subscriptions.SingleOrDefault(s => s.Id == id);
|
||||||
{
|
|
||||||
lock (_subscriptionLock)
|
|
||||||
return _subscriptions.SingleOrDefault(s => s.Id == id);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Get a subscription on this connection by its subscribe request
|
/// Get a subscription on this connection by its subscribe request
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="predicate">Filter for a request</param>
|
/// <param name="predicate">Filter for a request</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public Subscription? GetSubscriptionByRequest(Func<object?, bool> predicate)
|
public Subscription? GetSubscriptionByRequest(Func<object?, bool> predicate) => _subscriptions.SingleOrDefault(s => predicate(s));
|
||||||
{
|
|
||||||
lock(_subscriptionLock)
|
|
||||||
return _subscriptions.SingleOrDefault(s => predicate(s));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Send a query request and wait for an answer
|
/// Send a query request and wait for an answer
|
||||||
@ -544,7 +505,10 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (query.Identifiers != null)
|
if (query.Identifiers != null)
|
||||||
{
|
{
|
||||||
foreach (var id in query.Identifiers)
|
foreach (var id in query.Identifiers)
|
||||||
_messageIdMap.Add(id.ToLowerInvariant(), query);
|
{
|
||||||
|
if(!_messageIdMap.TryAdd(id.ToLowerInvariant(), query))
|
||||||
|
throw new InvalidOperationException($"Failed to register subscription id {id}, already registered");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
|
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
|
||||||
@ -563,7 +527,10 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (query.Identifiers != null)
|
if (query.Identifiers != null)
|
||||||
{
|
{
|
||||||
foreach (var id in query.Identifiers)
|
foreach (var id in query.Identifiers)
|
||||||
_messageIdMap.Add(id.ToLowerInvariant(), query);
|
{
|
||||||
|
if (!_messageIdMap.TryAdd(id.ToLowerInvariant(), query))
|
||||||
|
throw new InvalidOperationException($"Failed to register subscription id {id}, already registered");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
|
await SendAndWaitAsync(pendingRequest, query.Weight).ConfigureAwait(false);
|
||||||
@ -641,10 +608,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (!_socket.IsOpen)
|
if (!_socket.IsOpen)
|
||||||
return new CallResult<bool>(new WebError("Socket not connected"));
|
return new CallResult<bool>(new WebError("Socket not connected"));
|
||||||
|
|
||||||
bool anySubscriptions = false;
|
var anySubscriptions = _subscriptions.Any(s => s.UserSubscription);
|
||||||
lock (_subscriptionLock)
|
|
||||||
anySubscriptions = _subscriptions.Any(s => s.UserSubscription);
|
|
||||||
|
|
||||||
if (!anySubscriptions)
|
if (!anySubscriptions)
|
||||||
{
|
{
|
||||||
// No need to resubscribe anything
|
// No need to resubscribe anything
|
||||||
@ -653,10 +617,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
return new CallResult<bool>(true);
|
return new CallResult<bool>(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool anyAuthenticated = false;
|
var anyAuthenticated = _subscriptions.Any(s => s.Authenticated);
|
||||||
lock (_subscriptionLock)
|
|
||||||
anyAuthenticated = _subscriptions.Any(s => s.Authenticated);
|
|
||||||
|
|
||||||
if (anyAuthenticated)
|
if (anyAuthenticated)
|
||||||
{
|
{
|
||||||
// If we reconnected a authenticated connection we need to re-authenticate
|
// If we reconnected a authenticated connection we need to re-authenticate
|
||||||
@ -672,9 +633,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get a list of all subscriptions on the socket
|
// Get a list of all subscriptions on the socket
|
||||||
List<Subscription> subList = new List<Subscription>();
|
var subList = _subscriptions.ToList();
|
||||||
lock (_subscriptionLock)
|
|
||||||
subList = _subscriptions.ToList();
|
|
||||||
|
|
||||||
foreach(var subscription in subList)
|
foreach(var subscription in subList)
|
||||||
{
|
{
|
||||||
|
@ -64,10 +64,12 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="logger"></param>
|
/// <param name="logger"></param>
|
||||||
/// <param name="authenticated"></param>
|
/// <param name="authenticated"></param>
|
||||||
public Subscription(ILogger logger, bool authenticated)
|
/// <param name="userSubscription"></param>
|
||||||
|
public Subscription(ILogger logger, bool authenticated, bool userSubscription = true)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
Authenticated = authenticated;
|
Authenticated = authenticated;
|
||||||
|
UserSubscription = userSubscription;
|
||||||
Id = ExchangeHelpers.NextId();
|
Id = ExchangeHelpers.NextId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="logger"></param>
|
/// <param name="logger"></param>
|
||||||
/// <param name="authenticated"></param>
|
/// <param name="authenticated"></param>
|
||||||
public SystemSubscription(ILogger logger, bool authenticated = false) : base(logger, authenticated)
|
public SystemSubscription(ILogger logger, bool authenticated = false) : base(logger, authenticated, false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user