mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2026-02-16 14:13:46 +00:00
Compare commits
6 Commits
96b3904266
...
ce3fa5f186
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ce3fa5f186 | ||
|
|
fff70a9c65 | ||
|
|
cff33bb5ac | ||
|
|
21c8133292 | ||
|
|
76772e91ba | ||
|
|
218e0260ce |
@ -19,20 +19,6 @@ namespace CryptoExchange.Net.UnitTests
|
||||
Assert.That(result.Success);
|
||||
}
|
||||
|
||||
[TestCase]
|
||||
public void DeserializingInvalidJson_Should_GiveErrorResult()
|
||||
{
|
||||
// arrange
|
||||
var client = new TestBaseClient();
|
||||
|
||||
// act
|
||||
var result = client.SubClient.Deserialize<object>("{\"testProperty\": 123");
|
||||
|
||||
// assert
|
||||
ClassicAssert.IsFalse(result.Success);
|
||||
Assert.That(result.Error != null);
|
||||
}
|
||||
|
||||
[TestCase("https://api.test.com/api", new[] { "path1", "path2" }, "https://api.test.com/api/path1/path2")]
|
||||
[TestCase("https://api.test.com/api", new[] { "path1", "/path2" }, "https://api.test.com/api/path1/path2")]
|
||||
[TestCase("https://api.test.com/api", new[] { "path1/", "path2" }, "https://api.test.com/api/path1/path2")]
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Threading.Tasks;
|
||||
using CryptoExchange.Net.Authentication;
|
||||
using CryptoExchange.Net.Clients;
|
||||
@ -51,19 +52,11 @@ namespace CryptoExchange.Net.UnitTests
|
||||
|
||||
public CallResult<T> Deserialize<T>(string data)
|
||||
{
|
||||
var stream = new MemoryStream(Encoding.UTF8.GetBytes(data));
|
||||
var accessor = CreateAccessor();
|
||||
var valid = accessor.Read(stream, true).Result;
|
||||
if (!valid)
|
||||
return new CallResult<T>(new ServerError(ErrorInfo.Unknown with { Message = data }));
|
||||
|
||||
var deserializeResult = accessor.Deserialize<T>();
|
||||
return deserializeResult;
|
||||
return new CallResult<T>(JsonSerializer.Deserialize<T>(data));
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string FormatSymbol(string baseAsset, string quoteAsset, TradingMode futuresType, DateTime? deliverDate = null) => $"{baseAsset.ToUpperInvariant()}{quoteAsset.ToUpperInvariant()}";
|
||||
protected override IStreamMessageAccessor CreateAccessor() => new SystemTextJsonStreamMessageAccessor(new System.Text.Json.JsonSerializerOptions());
|
||||
protected override IMessageSerializer CreateSerializer() => new SystemTextJsonMessageSerializer(new System.Text.Json.JsonSerializerOptions());
|
||||
protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials) => throw new NotImplementedException();
|
||||
protected override Task<WebCallResult<DateTime>> GetServerTimestampAsync() => throw new NotImplementedException();
|
||||
|
||||
@ -142,7 +142,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
||||
/// <inheritdoc />
|
||||
public override string FormatSymbol(string baseAsset, string quoteAsset, TradingMode futuresType, DateTime? deliverDate = null) => $"{baseAsset.ToUpperInvariant()}{quoteAsset.ToUpperInvariant()}";
|
||||
|
||||
protected override IStreamMessageAccessor CreateAccessor() => new SystemTextJsonStreamMessageAccessor(new System.Text.Json.JsonSerializerOptions() { TypeInfoResolver = new TestSerializerContext() });
|
||||
protected override IMessageSerializer CreateSerializer() => new SystemTextJsonMessageSerializer(new System.Text.Json.JsonSerializerOptions());
|
||||
|
||||
public async Task<CallResult<T>> Request<T>(CancellationToken ct = default) where T : class
|
||||
@ -178,7 +177,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
||||
RequestFactory = new Mock<IRequestFactory>().Object;
|
||||
}
|
||||
|
||||
protected override IStreamMessageAccessor CreateAccessor() => new SystemTextJsonStreamMessageAccessor(new System.Text.Json.JsonSerializerOptions());
|
||||
protected override IMessageSerializer CreateSerializer() => new SystemTextJsonMessageSerializer(new System.Text.Json.JsonSerializerOptions());
|
||||
|
||||
/// <inheritdoc />
|
||||
|
||||
@ -114,12 +114,6 @@ namespace CryptoExchange.Net.Clients
|
||||
RequestFactory.Configure(options, httpClient);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create a message accessor instance
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
protected abstract IStreamMessageAccessor CreateAccessor();
|
||||
|
||||
/// <summary>
|
||||
/// Create a serializer instance
|
||||
/// </summary>
|
||||
@ -727,7 +721,16 @@ namespace CryptoExchange.Net.Clients
|
||||
return;
|
||||
|
||||
var localTime = DateTime.UtcNow;
|
||||
var result = await GetServerTimestampAsync().ConfigureAwait(false);
|
||||
WebCallResult<DateTime> result;
|
||||
try
|
||||
{
|
||||
result = await GetServerTimestampAsync().ConfigureAwait(false);
|
||||
}
|
||||
catch (NotImplementedException)
|
||||
{
|
||||
throw new ArgumentException("AutoTimestamp is not available for this API");
|
||||
}
|
||||
|
||||
if (!result)
|
||||
{
|
||||
_logger.LogWarning("Failed to determine time offset between client and server, timestamping might fail");
|
||||
|
||||
@ -99,11 +99,6 @@ namespace CryptoExchange.Net.Clients
|
||||
/// </summary>
|
||||
protected bool AllowTopicsOnTheSameConnection { get; set; } = true;
|
||||
|
||||
/// <summary>
|
||||
/// Whether to continue processing and forward unparsable messages to handlers
|
||||
/// </summary>
|
||||
protected internal bool ProcessUnparsableMessages { get; set; } = false;
|
||||
|
||||
/// <inheritdoc />
|
||||
public double IncomingKbps
|
||||
{
|
||||
@ -165,12 +160,6 @@ namespace CryptoExchange.Net.Clients
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create a message accessor instance
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
protected internal abstract IByteMessageAccessor CreateAccessor(WebSocketMessageType messageType);
|
||||
|
||||
/// <summary>
|
||||
/// Create a serializer instance
|
||||
/// </summary>
|
||||
@ -754,7 +743,6 @@ namespace CryptoExchange.Net.Clients
|
||||
|
||||
// Create new socket connection
|
||||
var socketConnection = new SocketConnection(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, address);
|
||||
socketConnection.UnhandledMessage += HandleUnhandledMessage;
|
||||
socketConnection.ConnectRateLimitedAsync += HandleConnectRateLimitedAsync;
|
||||
if (dedicatedRequestConnection)
|
||||
{
|
||||
@ -805,14 +793,6 @@ namespace CryptoExchange.Net.Clients
|
||||
return new CallResult<HighPerfSocketConnection<TUpdateType>>(socketConnection);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Process an unhandled message
|
||||
/// </summary>
|
||||
/// <param name="message">The message that wasn't processed</param>
|
||||
protected virtual void HandleUnhandledMessage(IMessageAccessor message)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Process an unhandled message
|
||||
/// </summary>
|
||||
@ -873,7 +853,6 @@ namespace CryptoExchange.Net.Clients
|
||||
Proxy = ClientOptions.Proxy,
|
||||
Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout,
|
||||
ReceiveBufferSize = ClientOptions.ReceiveBufferSize,
|
||||
UseUpdatedDeserialization = ClientOptions.UseUpdatedDeserialization
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
@ -1066,7 +1045,6 @@ namespace CryptoExchange.Net.Clients
|
||||
sb.AppendLine($"\t\t\tId: {subState.Id}");
|
||||
sb.AppendLine($"\t\t\tStatus: {subState.Status}");
|
||||
sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}");
|
||||
sb.AppendLine($"\t\t\tIdentifiers: [{subState.ListenMatcher.ToString()}]");
|
||||
});
|
||||
}
|
||||
});
|
||||
@ -1097,21 +1075,10 @@ namespace CryptoExchange.Net.Clients
|
||||
base.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the listener identifier for the message
|
||||
/// </summary>
|
||||
/// <param name="messageAccessor"></param>
|
||||
/// <returns></returns>
|
||||
public abstract string? GetListenerIdentifier(IMessageAccessor messageAccessor);
|
||||
|
||||
/// <summary>
|
||||
/// Preprocess a stream message
|
||||
/// </summary>
|
||||
public virtual ReadOnlySpan<byte> PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlySpan<byte> data) => data;
|
||||
/// <summary>
|
||||
/// Preprocess a stream message
|
||||
/// </summary>
|
||||
public virtual ReadOnlyMemory<byte> PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlyMemory<byte> data) => data;
|
||||
|
||||
/// <summary>
|
||||
/// Create a new message converter instance
|
||||
|
||||
@ -1,49 +0,0 @@
|
||||
namespace CryptoExchange.Net.Converters.MessageParsing
|
||||
{
|
||||
/// <summary>
|
||||
/// Node accessor
|
||||
/// </summary>
|
||||
public readonly struct NodeAccessor
|
||||
{
|
||||
/// <summary>
|
||||
/// Index
|
||||
/// </summary>
|
||||
public int? Index { get; }
|
||||
/// <summary>
|
||||
/// Property name
|
||||
/// </summary>
|
||||
public string? Property { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Type (0 = int, 1 = string, 2 = prop name)
|
||||
/// </summary>
|
||||
public int Type { get; }
|
||||
|
||||
private NodeAccessor(int? index, string? property, int type)
|
||||
{
|
||||
Index = index;
|
||||
Property = property;
|
||||
Type = type;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create an int node accessor
|
||||
/// </summary>
|
||||
/// <param name="value"></param>
|
||||
/// <returns></returns>
|
||||
public static NodeAccessor Int(int value) { return new NodeAccessor(value, null, 0); }
|
||||
|
||||
/// <summary>
|
||||
/// Create a string node accessor
|
||||
/// </summary>
|
||||
/// <param name="value"></param>
|
||||
/// <returns></returns>
|
||||
public static NodeAccessor String(string value) { return new NodeAccessor(null, value, 1); }
|
||||
|
||||
/// <summary>
|
||||
/// Create a property name node accessor
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public static NodeAccessor PropertyName() { return new NodeAccessor(null, null, 2); }
|
||||
}
|
||||
}
|
||||
@ -1,50 +0,0 @@
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace CryptoExchange.Net.Converters.MessageParsing
|
||||
{
|
||||
/// <summary>
|
||||
/// Message access definition
|
||||
/// </summary>
|
||||
public readonly struct MessagePath : IEnumerable<NodeAccessor>
|
||||
{
|
||||
private readonly List<NodeAccessor> _path;
|
||||
|
||||
internal void Add(NodeAccessor node)
|
||||
{
|
||||
_path.Add(node);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public MessagePath()
|
||||
{
|
||||
_path = new List<NodeAccessor>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create a new message path
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public static MessagePath Get()
|
||||
{
|
||||
return new MessagePath();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// IEnumerable implementation
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public IEnumerator<NodeAccessor> GetEnumerator()
|
||||
{
|
||||
for (var i = 0; i < _path.Count; i++)
|
||||
yield return _path[i];
|
||||
}
|
||||
|
||||
IEnumerator IEnumerable.GetEnumerator()
|
||||
{
|
||||
return GetEnumerator();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,43 +0,0 @@
|
||||
namespace CryptoExchange.Net.Converters.MessageParsing
|
||||
{
|
||||
/// <summary>
|
||||
/// Message path extension methods
|
||||
/// </summary>
|
||||
public static class MessagePathExtension
|
||||
{
|
||||
/// <summary>
|
||||
/// Add a string node accessor
|
||||
/// </summary>
|
||||
/// <param name="path"></param>
|
||||
/// <param name="propName"></param>
|
||||
/// <returns></returns>
|
||||
public static MessagePath Property(this MessagePath path, string propName)
|
||||
{
|
||||
path.Add(NodeAccessor.String(propName));
|
||||
return path;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a property name node accessor
|
||||
/// </summary>
|
||||
/// <param name="path"></param>
|
||||
/// <returns></returns>
|
||||
public static MessagePath PropertyName(this MessagePath path)
|
||||
{
|
||||
path.Add(NodeAccessor.PropertyName());
|
||||
return path;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a int node accessor
|
||||
/// </summary>
|
||||
/// <param name="path"></param>
|
||||
/// <param name="index"></param>
|
||||
/// <returns></returns>
|
||||
public static MessagePath Index(this MessagePath path, int index)
|
||||
{
|
||||
path.Add(NodeAccessor.Int(index));
|
||||
return path;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,21 +0,0 @@
|
||||
namespace CryptoExchange.Net.Converters.MessageParsing
|
||||
{
|
||||
/// <summary>
|
||||
/// Message node type
|
||||
/// </summary>
|
||||
public enum NodeType
|
||||
{
|
||||
/// <summary>
|
||||
/// Array node
|
||||
/// </summary>
|
||||
Array,
|
||||
/// <summary>
|
||||
/// Object node
|
||||
/// </summary>
|
||||
Object,
|
||||
/// <summary>
|
||||
/// Value node
|
||||
/// </summary>
|
||||
Value
|
||||
}
|
||||
}
|
||||
@ -1,373 +0,0 @@
|
||||
using CryptoExchange.Net.Converters.MessageParsing;
|
||||
using CryptoExchange.Net.Interfaces;
|
||||
using CryptoExchange.Net.Objects;
|
||||
using System;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.IO;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CryptoExchange.Net.Converters.SystemTextJson
|
||||
{
|
||||
/// <summary>
|
||||
/// System.Text.Json message accessor
|
||||
/// </summary>
|
||||
public abstract class SystemTextJsonMessageAccessor : IMessageAccessor
|
||||
{
|
||||
/// <summary>
|
||||
/// The JsonDocument loaded
|
||||
/// </summary>
|
||||
protected JsonDocument? _document;
|
||||
|
||||
private readonly JsonSerializerOptions? _customSerializerOptions;
|
||||
|
||||
/// <inheritdoc />
|
||||
public bool IsValid { get; set; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract bool OriginalDataAvailable { get; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public object? Underlying => throw new NotImplementedException();
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public SystemTextJsonMessageAccessor(JsonSerializerOptions options)
|
||||
{
|
||||
_customSerializerOptions = options;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
#if NET5_0_OR_GREATER
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2026:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
#endif
|
||||
public CallResult<object> Deserialize(Type type, MessagePath? path = null)
|
||||
{
|
||||
if (!IsValid)
|
||||
return new CallResult<object>(GetOriginalString());
|
||||
|
||||
if (_document == null)
|
||||
throw new InvalidOperationException("No json document loaded");
|
||||
|
||||
try
|
||||
{
|
||||
var result = _document.Deserialize(type, _customSerializerOptions);
|
||||
return new CallResult<object>(result!);
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
var info = $"Json deserialization failed: {ex.Message}, Path: {ex.Path}, LineNumber: {ex.LineNumber}, LinePosition: {ex.BytePositionInLine}";
|
||||
return new CallResult<object>(new DeserializeError(info, ex));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return new CallResult<object>(new DeserializeError($"Json deserialization failed: {ex.Message}", ex));
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
#if NET5_0_OR_GREATER
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2026:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
#endif
|
||||
public CallResult<T> Deserialize<T>(MessagePath? path = null)
|
||||
{
|
||||
if (_document == null)
|
||||
throw new InvalidOperationException("No json document loaded");
|
||||
|
||||
try
|
||||
{
|
||||
var result = _document.Deserialize<T>(_customSerializerOptions);
|
||||
return new CallResult<T>(result!);
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
var info = $"Json deserialization failed: {ex.Message}, Path: {ex.Path}, LineNumber: {ex.LineNumber}, LinePosition: {ex.BytePositionInLine}";
|
||||
return new CallResult<T>(new DeserializeError(info, ex));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return new CallResult<T>(new DeserializeError($"Json deserialization failed: {ex.Message}", ex));
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public NodeType? GetNodeType()
|
||||
{
|
||||
if (!IsValid)
|
||||
throw new InvalidOperationException("Can't access json data on non-json message");
|
||||
|
||||
if (_document == null)
|
||||
throw new InvalidOperationException("No json document loaded");
|
||||
|
||||
return _document.RootElement.ValueKind switch
|
||||
{
|
||||
JsonValueKind.Object => NodeType.Object,
|
||||
JsonValueKind.Array => NodeType.Array,
|
||||
_ => NodeType.Value
|
||||
};
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public NodeType? GetNodeType(MessagePath path)
|
||||
{
|
||||
if (!IsValid)
|
||||
throw new InvalidOperationException("Can't access json data on non-json message");
|
||||
|
||||
var node = GetPathNode(path);
|
||||
if (!node.HasValue)
|
||||
return null;
|
||||
|
||||
return node.Value.ValueKind switch
|
||||
{
|
||||
JsonValueKind.Object => NodeType.Object,
|
||||
JsonValueKind.Array => NodeType.Array,
|
||||
_ => NodeType.Value
|
||||
};
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
#if NET5_0_OR_GREATER
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2026:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
#endif
|
||||
public T? GetValue<T>(MessagePath path)
|
||||
{
|
||||
if (!IsValid)
|
||||
throw new InvalidOperationException("Can't access json data on non-json message");
|
||||
|
||||
var value = GetPathNode(path);
|
||||
if (value == null)
|
||||
return default;
|
||||
|
||||
if (value.Value.ValueKind == JsonValueKind.Object || value.Value.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
try
|
||||
{
|
||||
return value.Value.Deserialize<T>(_customSerializerOptions);
|
||||
}
|
||||
catch { }
|
||||
|
||||
return default;
|
||||
}
|
||||
|
||||
if (typeof(T) == typeof(string))
|
||||
{
|
||||
if (value.Value.ValueKind == JsonValueKind.Number)
|
||||
return (T)(object)value.Value.GetInt64().ToString();
|
||||
}
|
||||
|
||||
return value.Value.Deserialize<T>(_customSerializerOptions);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
#if NET5_0_OR_GREATER
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2026:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
#endif
|
||||
public T?[]? GetValues<T>(MessagePath path)
|
||||
{
|
||||
if (!IsValid)
|
||||
throw new InvalidOperationException("Can't access json data on non-json message");
|
||||
|
||||
var value = GetPathNode(path);
|
||||
if (value == null)
|
||||
return default;
|
||||
|
||||
if (value.Value.ValueKind != JsonValueKind.Array)
|
||||
return default;
|
||||
|
||||
return value.Value.Deserialize<T[]>(_customSerializerOptions)!;
|
||||
}
|
||||
|
||||
private JsonElement? GetPathNode(MessagePath path)
|
||||
{
|
||||
if (!IsValid)
|
||||
throw new InvalidOperationException("Can't access json data on non-json message");
|
||||
|
||||
if (_document == null)
|
||||
throw new InvalidOperationException("No json document loaded");
|
||||
|
||||
JsonElement? currentToken = _document.RootElement;
|
||||
foreach (var node in path)
|
||||
{
|
||||
if (node.Type == 0)
|
||||
{
|
||||
// Int value
|
||||
var val = node.Index!.Value;
|
||||
if (currentToken!.Value.ValueKind != JsonValueKind.Array || currentToken.Value.GetArrayLength() <= val)
|
||||
return null;
|
||||
|
||||
currentToken = currentToken.Value[val];
|
||||
}
|
||||
else if (node.Type == 1)
|
||||
{
|
||||
// String value
|
||||
if (currentToken!.Value.ValueKind != JsonValueKind.Object)
|
||||
return null;
|
||||
|
||||
if (!currentToken.Value.TryGetProperty(node.Property!, out var token))
|
||||
return null;
|
||||
currentToken = token;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Property name
|
||||
if (currentToken!.Value.ValueKind != JsonValueKind.Object)
|
||||
return null;
|
||||
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
if (currentToken == null)
|
||||
return null;
|
||||
}
|
||||
|
||||
return currentToken;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract string GetOriginalString();
|
||||
|
||||
/// <inheritdoc />
|
||||
public abstract void Clear();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// System.Text.Json stream message accessor
|
||||
/// </summary>
|
||||
public class SystemTextJsonStreamMessageAccessor : SystemTextJsonMessageAccessor, IStreamMessageAccessor
|
||||
{
|
||||
private Stream? _stream;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool OriginalDataAvailable => _stream?.CanSeek == true;
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public SystemTextJsonStreamMessageAccessor(JsonSerializerOptions options): base(options)
|
||||
{
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<CallResult> Read(Stream stream, bool bufferStream)
|
||||
{
|
||||
if (bufferStream && stream is not MemoryStream)
|
||||
{
|
||||
// We need to be buffer the stream, and it's not currently a seekable stream, so copy it to a new memory stream
|
||||
_stream = new MemoryStream();
|
||||
stream.CopyTo(_stream);
|
||||
_stream.Position = 0;
|
||||
}
|
||||
else if (bufferStream)
|
||||
{
|
||||
// We need to buffer the stream, and the current stream is seekable, store as is
|
||||
_stream = stream;
|
||||
}
|
||||
else
|
||||
{
|
||||
// We don't need to buffer the stream, so don't bother keeping the reference
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
_document = await JsonDocument.ParseAsync(_stream ?? stream).ConfigureAwait(false);
|
||||
IsValid = true;
|
||||
return CallResult.SuccessResult;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Not a json message
|
||||
IsValid = false;
|
||||
return new CallResult(new DeserializeError($"Json deserialization failed: {ex.Message}", ex));
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string GetOriginalString()
|
||||
{
|
||||
if (_stream is null)
|
||||
throw new NullReferenceException("Stream not initialized");
|
||||
|
||||
_stream.Position = 0;
|
||||
using var textReader = new StreamReader(_stream, Encoding.UTF8, false, 1024, true);
|
||||
return textReader.ReadToEnd();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Clear()
|
||||
{
|
||||
_stream?.Dispose();
|
||||
_stream = null;
|
||||
_document?.Dispose();
|
||||
_document = null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// System.Text.Json byte message accessor
|
||||
/// </summary>
|
||||
public class SystemTextJsonByteMessageAccessor : SystemTextJsonMessageAccessor, IByteMessageAccessor
|
||||
{
|
||||
private ReadOnlyMemory<byte> _bytes;
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public SystemTextJsonByteMessageAccessor(JsonSerializerOptions options) : base(options)
|
||||
{
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public CallResult Read(ReadOnlyMemory<byte> data)
|
||||
{
|
||||
_bytes = data;
|
||||
|
||||
try
|
||||
{
|
||||
var firstByte = data.Span[0];
|
||||
if (firstByte != 0x7b && firstByte != 0x5b)
|
||||
{
|
||||
// Value doesn't start with `{` or `[`, prevent deserialization attempt as it's slow
|
||||
IsValid = false;
|
||||
return new CallResult(new DeserializeError("Not a json value"));
|
||||
}
|
||||
|
||||
_document = JsonDocument.Parse(data);
|
||||
IsValid = true;
|
||||
return CallResult.SuccessResult;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Not a json message
|
||||
IsValid = false;
|
||||
return new CallResult(new DeserializeError($"Json deserialization failed: {ex.Message}", ex));
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string GetOriginalString() =>
|
||||
// NetStandard 2.0 doesn't support GetString from a ReadonlySpan<byte>, so use ToArray there instead
|
||||
#if NETSTANDARD2_0
|
||||
Encoding.UTF8.GetString(_bytes.ToArray());
|
||||
#else
|
||||
Encoding.UTF8.GetString(_bytes.Span);
|
||||
#endif
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool OriginalDataAvailable => true;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Clear()
|
||||
{
|
||||
_bytes = null;
|
||||
_document?.Dispose();
|
||||
_document = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -6,9 +6,9 @@
|
||||
<PackageId>CryptoExchange.Net</PackageId>
|
||||
<Authors>JKorf</Authors>
|
||||
<Description>CryptoExchange.Net is a base library which is used to implement different cryptocurrency (exchange) API's. It provides a standardized way of implementing different API's, which results in a very similar experience for users of the API implementations.</Description>
|
||||
<PackageVersion>10.2.5</PackageVersion>
|
||||
<AssemblyVersion>10.2.5</AssemblyVersion>
|
||||
<FileVersion>10.2.5</FileVersion>
|
||||
<PackageVersion>10.3.0</PackageVersion>
|
||||
<AssemblyVersion>10.3.0</AssemblyVersion>
|
||||
<FileVersion>10.3.0</FileVersion>
|
||||
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
|
||||
<PackageTags>OKX;OKX.Net;Mexc;Mexc.Net;Kucoin;Kucoin.Net;Kraken;Kraken.Net;Huobi;Huobi.Net;CoinEx;CoinEx.Net;Bybit;Bybit.Net;Bitget;Bitget.Net;Bitfinex;Bitfinex.Net;Binance;Binance.Net;CryptoCurrency;CryptoCurrency Exchange;CryptoExchange.Net</PackageTags>
|
||||
<RepositoryType>git</RepositoryType>
|
||||
|
||||
@ -292,122 +292,6 @@ namespace CryptoExchange.Net
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create a new uri with the provided parameters as query
|
||||
/// </summary>
|
||||
/// <param name="parameters"></param>
|
||||
/// <param name="baseUri"></param>
|
||||
/// <param name="arraySerialization"></param>
|
||||
/// <returns></returns>
|
||||
public static Uri SetParameters(this Uri baseUri, IDictionary<string, object> parameters, ArrayParametersSerialization arraySerialization)
|
||||
{
|
||||
var uriBuilder = new UriBuilder();
|
||||
uriBuilder.Scheme = baseUri.Scheme;
|
||||
uriBuilder.Host = baseUri.Host;
|
||||
uriBuilder.Port = baseUri.Port;
|
||||
uriBuilder.Path = baseUri.AbsolutePath;
|
||||
var httpValueCollection = HttpUtility.ParseQueryString(string.Empty);
|
||||
foreach (var parameter in parameters)
|
||||
{
|
||||
if (parameter.Value.GetType().IsArray)
|
||||
{
|
||||
if (arraySerialization == ArrayParametersSerialization.JsonArray)
|
||||
{
|
||||
httpValueCollection.Add(parameter.Key, $"[{string.Join(",", (object[])parameter.Value)}]");
|
||||
}
|
||||
else
|
||||
{
|
||||
foreach (var item in (object[])parameter.Value)
|
||||
{
|
||||
if (arraySerialization == ArrayParametersSerialization.Array)
|
||||
{
|
||||
httpValueCollection.Add(parameter.Key + "[]", item.ToString());
|
||||
}
|
||||
else
|
||||
{
|
||||
httpValueCollection.Add(parameter.Key, item.ToString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
httpValueCollection.Add(parameter.Key, parameter.Value.ToString());
|
||||
}
|
||||
}
|
||||
|
||||
uriBuilder.Query = httpValueCollection.ToString();
|
||||
return uriBuilder.Uri;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create a new uri with the provided parameters as query
|
||||
/// </summary>
|
||||
/// <param name="parameters"></param>
|
||||
/// <param name="baseUri"></param>
|
||||
/// <param name="arraySerialization"></param>
|
||||
/// <returns></returns>
|
||||
public static Uri SetParameters(this Uri baseUri, IOrderedEnumerable<KeyValuePair<string, object>> parameters, ArrayParametersSerialization arraySerialization)
|
||||
{
|
||||
var uriBuilder = new UriBuilder();
|
||||
uriBuilder.Scheme = baseUri.Scheme;
|
||||
uriBuilder.Host = baseUri.Host;
|
||||
uriBuilder.Port = baseUri.Port;
|
||||
uriBuilder.Path = baseUri.AbsolutePath;
|
||||
var httpValueCollection = HttpUtility.ParseQueryString(string.Empty);
|
||||
foreach (var parameter in parameters)
|
||||
{
|
||||
if (parameter.Value.GetType().IsArray)
|
||||
{
|
||||
if (arraySerialization == ArrayParametersSerialization.JsonArray)
|
||||
{
|
||||
httpValueCollection.Add(parameter.Key, $"[{string.Join(",", (object[])parameter.Value)}]");
|
||||
}
|
||||
else
|
||||
{
|
||||
foreach (var item in (object[])parameter.Value)
|
||||
{
|
||||
if (arraySerialization == ArrayParametersSerialization.Array)
|
||||
{
|
||||
httpValueCollection.Add(parameter.Key + "[]", item.ToString());
|
||||
}
|
||||
else
|
||||
{
|
||||
httpValueCollection.Add(parameter.Key, item.ToString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
httpValueCollection.Add(parameter.Key, parameter.Value.ToString());
|
||||
}
|
||||
}
|
||||
|
||||
uriBuilder.Query = httpValueCollection.ToString();
|
||||
return uriBuilder.Uri;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add parameter to URI
|
||||
/// </summary>
|
||||
/// <param name="uri"></param>
|
||||
/// <param name="name"></param>
|
||||
/// <param name="value"></param>
|
||||
/// <returns></returns>
|
||||
public static Uri AddQueryParameter(this Uri uri, string name, string value)
|
||||
{
|
||||
var httpValueCollection = HttpUtility.ParseQueryString(uri.Query);
|
||||
|
||||
httpValueCollection.Remove(name);
|
||||
httpValueCollection.Add(name, value);
|
||||
|
||||
var ub = new UriBuilder(uri);
|
||||
ub.Query = httpValueCollection.ToString();
|
||||
|
||||
return ub.Uri;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decompress using GzipStream
|
||||
/// </summary>
|
||||
@ -419,20 +303,6 @@ namespace CryptoExchange.Net
|
||||
return new ReadOnlySpan<byte>(decompressedStream.GetBuffer(), 0, (int)decompressedStream.Length);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decompress using GzipStream
|
||||
/// </summary>
|
||||
public static ReadOnlyMemory<byte> DecompressGzip(this ReadOnlyMemory<byte> data)
|
||||
{
|
||||
using var decompressedStream = new MemoryStream();
|
||||
using var dataStream = MemoryMarshal.TryGetArray(data, out var arraySegment)
|
||||
? new MemoryStream(arraySegment.Array!, arraySegment.Offset, arraySegment.Count)
|
||||
: new MemoryStream(data.ToArray());
|
||||
using var deflateStream = new GZipStream(dataStream, CompressionMode.Decompress);
|
||||
deflateStream.CopyTo(decompressedStream);
|
||||
return new ReadOnlyMemory<byte>(decompressedStream.GetBuffer(), 0, (int)decompressedStream.Length);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decompress using GzipStream
|
||||
/// </summary>
|
||||
@ -445,22 +315,6 @@ namespace CryptoExchange.Net
|
||||
return new ReadOnlySpan<byte>(output.GetBuffer(), 0, (int)output.Length);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decompress using DeflateStream
|
||||
/// </summary>
|
||||
/// <param name="input"></param>
|
||||
/// <returns></returns>
|
||||
public static ReadOnlyMemory<byte> Decompress(this ReadOnlyMemory<byte> input)
|
||||
{
|
||||
var output = new MemoryStream();
|
||||
|
||||
using var compressStream = new MemoryStream(input.ToArray());
|
||||
using var decompressor = new DeflateStream(compressStream, CompressionMode.Decompress);
|
||||
decompressor.CopyTo(output);
|
||||
output.Position = 0;
|
||||
return new ReadOnlyMemory<byte>(output.GetBuffer(), 0, (int)output.Length);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Whether the trading mode is linear
|
||||
/// </summary>
|
||||
|
||||
@ -1,109 +0,0 @@
|
||||
using CryptoExchange.Net.Converters.MessageParsing;
|
||||
using CryptoExchange.Net.Objects;
|
||||
using System;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CryptoExchange.Net.Interfaces
|
||||
{
|
||||
/// <summary>
|
||||
/// Message accessor
|
||||
/// </summary>
|
||||
public interface IMessageAccessor
|
||||
{
|
||||
/// <summary>
|
||||
/// Is this a valid message
|
||||
/// </summary>
|
||||
bool IsValid { get; }
|
||||
/// <summary>
|
||||
/// Is the original data available for retrieval
|
||||
/// </summary>
|
||||
bool OriginalDataAvailable { get; }
|
||||
/// <summary>
|
||||
/// The underlying data object
|
||||
/// </summary>
|
||||
object? Underlying { get; }
|
||||
/// <summary>
|
||||
/// Clear internal data structure
|
||||
/// </summary>
|
||||
void Clear();
|
||||
/// <summary>
|
||||
/// Get the type of node
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
NodeType? GetNodeType();
|
||||
/// <summary>
|
||||
/// Get the type of node
|
||||
/// </summary>
|
||||
/// <param name="path">Access path</param>
|
||||
/// <returns></returns>
|
||||
NodeType? GetNodeType(MessagePath path);
|
||||
/// <summary>
|
||||
/// Get the value of a path
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="path"></param>
|
||||
/// <returns></returns>
|
||||
T? GetValue<T>(MessagePath path);
|
||||
/// <summary>
|
||||
/// Get the values of an array
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="path"></param>
|
||||
/// <returns></returns>
|
||||
T?[]? GetValues<T>(MessagePath path);
|
||||
/// <summary>
|
||||
/// Deserialize the message into this type
|
||||
/// </summary>
|
||||
/// <param name="type"></param>
|
||||
/// <param name="path"></param>
|
||||
/// <returns></returns>
|
||||
#if NET5_0_OR_GREATER
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2092:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2095:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
#endif
|
||||
CallResult<object> Deserialize(Type type, MessagePath? path = null);
|
||||
/// <summary>
|
||||
/// Deserialize the message into this type
|
||||
/// </summary>
|
||||
/// <param name="path"></param>
|
||||
/// <returns></returns>
|
||||
#if NET5_0_OR_GREATER
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2092:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2095:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
|
||||
#endif
|
||||
CallResult<T> Deserialize<T>(MessagePath? path = null);
|
||||
|
||||
/// <summary>
|
||||
/// Get the original string value
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
string GetOriginalString();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stream message accessor
|
||||
/// </summary>
|
||||
public interface IStreamMessageAccessor : IMessageAccessor
|
||||
{
|
||||
/// <summary>
|
||||
/// Load a stream message
|
||||
/// </summary>
|
||||
/// <param name="stream"></param>
|
||||
/// <param name="bufferStream"></param>
|
||||
Task<CallResult> Read(Stream stream, bool bufferStream);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Byte message accessor
|
||||
/// </summary>
|
||||
public interface IByteMessageAccessor : IMessageAccessor
|
||||
{
|
||||
/// <summary>
|
||||
/// Load a data message
|
||||
/// </summary>
|
||||
/// <param name="data"></param>
|
||||
CallResult Read(ReadOnlyMemory<byte> data);
|
||||
}
|
||||
}
|
||||
@ -250,6 +250,40 @@
|
||||
DEX
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Type of platform
|
||||
/// </summary>
|
||||
public enum PlatformType
|
||||
{
|
||||
/// <summary>
|
||||
/// Platform to trade cryptocurrency
|
||||
/// </summary>
|
||||
CryptoCurrencyExchange,
|
||||
/// <summary>
|
||||
/// Platform for trading on predictions
|
||||
/// </summary>
|
||||
PredictionMarket,
|
||||
/// <summary>
|
||||
/// Other
|
||||
/// </summary>
|
||||
Other
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Centralization type
|
||||
/// </summary>
|
||||
public enum CentralizationType
|
||||
{
|
||||
/// <summary>
|
||||
/// Centralized, a person or company is in full control
|
||||
/// </summary>
|
||||
Centralized,
|
||||
/// <summary>
|
||||
/// Decentralized, governance is split over different entities with no single entity in full control
|
||||
/// </summary>
|
||||
Decentralized
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Timeout behavior for queries
|
||||
/// </summary>
|
||||
|
||||
@ -75,11 +75,6 @@ namespace CryptoExchange.Net.Objects.Options
|
||||
/// </remarks>
|
||||
public int? ReceiveBufferSize { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Whether or not to use the updated deserialization logic, default is true
|
||||
/// </summary>
|
||||
public bool UseUpdatedDeserialization { get; set; } = true;
|
||||
|
||||
/// <summary>
|
||||
/// Create a copy of this options
|
||||
/// </summary>
|
||||
@ -101,7 +96,6 @@ namespace CryptoExchange.Net.Objects.Options
|
||||
item.RateLimitingBehaviour = RateLimitingBehaviour;
|
||||
item.RateLimiterEnabled = RateLimiterEnabled;
|
||||
item.ReceiveBufferSize = ReceiveBufferSize;
|
||||
item.UseUpdatedDeserialization = UseUpdatedDeserialization;
|
||||
return item;
|
||||
}
|
||||
}
|
||||
|
||||
51
CryptoExchange.Net/Objects/PlatformInfo.cs
Normal file
51
CryptoExchange.Net/Objects/PlatformInfo.cs
Normal file
@ -0,0 +1,51 @@
|
||||
namespace CryptoExchange.Net.Objects
|
||||
{
|
||||
/// <summary>
|
||||
/// Information on the platform
|
||||
/// </summary>
|
||||
public record PlatformInfo
|
||||
{
|
||||
/// <summary>
|
||||
/// Platform id
|
||||
/// </summary>
|
||||
public string Id { get; }
|
||||
/// <summary>
|
||||
/// Display name
|
||||
/// </summary>
|
||||
public string DisplayName { get; }
|
||||
/// <summary>
|
||||
/// Logo
|
||||
/// </summary>
|
||||
public string Logo { get; }
|
||||
/// <summary>
|
||||
/// Url to main application
|
||||
/// </summary>
|
||||
public string Url { get; }
|
||||
/// <summary>
|
||||
/// Urls to the API documentation
|
||||
/// </summary>
|
||||
public string[] ApiDocsUrl { get; }
|
||||
/// <summary>
|
||||
/// Platform type
|
||||
/// </summary>
|
||||
public PlatformType PlatformType { get; }
|
||||
/// <summary>
|
||||
/// Centralization type
|
||||
/// </summary>
|
||||
public CentralizationType CentralizationType { get; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public PlatformInfo(string id, string displayName, string logo, string url, string[] apiDocsUrl, PlatformType platformType, CentralizationType centralizationType)
|
||||
{
|
||||
Id = id;
|
||||
DisplayName = displayName;
|
||||
Logo = logo;
|
||||
Url = url;
|
||||
ApiDocsUrl = apiDocsUrl;
|
||||
PlatformType = platformType;
|
||||
CentralizationType = centralizationType;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -74,11 +74,6 @@ namespace CryptoExchange.Net.Objects.Sockets
|
||||
/// </summary>
|
||||
public int? ReceiveBufferSize { get; set; } = null;
|
||||
|
||||
/// <summary>
|
||||
/// Whether or not to use the updated deserialization logic
|
||||
/// </summary>
|
||||
public bool UseUpdatedDeserialization { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
|
||||
@ -95,9 +95,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// <inheritdoc />
|
||||
public event Func<Task>? OnClose;
|
||||
|
||||
/// <inheritdoc />
|
||||
public event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task>? OnStreamMessage;
|
||||
|
||||
/// <inheritdoc />
|
||||
public event Func<int, Task>? OnRequestSent;
|
||||
|
||||
@ -139,10 +136,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
_sendEvent = new AsyncResetEvent();
|
||||
_sendBuffer = new ConcurrentQueue<SendItem>();
|
||||
_ctsSource = new CancellationTokenSource();
|
||||
if (websocketParameters.UseUpdatedDeserialization)
|
||||
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? 65536;
|
||||
else
|
||||
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize;
|
||||
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? 65536;
|
||||
|
||||
_closeSem = new SemaphoreSlim(1, 1);
|
||||
_socket = CreateSocket();
|
||||
@ -225,7 +219,9 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
catch (Exception e)
|
||||
{
|
||||
if (ct.IsCancellationRequested)
|
||||
{
|
||||
_logger.SocketConnectingCanceled(Id);
|
||||
}
|
||||
else if (!_ctsSource.IsCancellationRequested)
|
||||
{
|
||||
// if _ctsSource was canceled this was already logged
|
||||
@ -271,11 +267,10 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
var sendTask = SendLoopAsync();
|
||||
Task receiveTask;
|
||||
#if !NETSTANDARD2_0
|
||||
if (Parameters.UseUpdatedDeserialization)
|
||||
receiveTask = ReceiveLoopNewAsync();
|
||||
else
|
||||
receiveTask = ReceiveLoopNewAsync();
|
||||
#else
|
||||
receiveTask = ReceiveLoopAsync();
|
||||
#endif
|
||||
receiveTask = ReceiveLoopAsync();
|
||||
var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
|
||||
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
|
||||
_logger.SocketFinishedProcessing(Id);
|
||||
@ -578,6 +573,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
}
|
||||
}
|
||||
|
||||
#if NETSTANDARD2_0
|
||||
/// <summary>
|
||||
/// Loop for receiving and reassembling data
|
||||
/// </summary>
|
||||
@ -666,10 +662,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
if (_logger.IsEnabled(LogLevel.Trace))
|
||||
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
|
||||
|
||||
if (!Parameters.UseUpdatedDeserialization)
|
||||
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
|
||||
else
|
||||
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(buffer.Array!, buffer.Offset, receiveResult.Count));
|
||||
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(buffer.Array!, buffer.Offset, receiveResult.Count));
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -703,11 +696,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
_logger.SocketReassembledMessage(Id, multipartStream!.Length);
|
||||
|
||||
// Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
|
||||
|
||||
if (!Parameters.UseUpdatedDeserialization)
|
||||
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false);
|
||||
else
|
||||
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length));
|
||||
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length));
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -732,6 +721,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
_logger.SocketReceiveLoopFinished(Id);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
#if !NETSTANDARD2_0
|
||||
/// <summary>
|
||||
@ -895,18 +885,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
_connection.HandleStreamMessage2(type, data);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Process a stream message
|
||||
/// </summary>
|
||||
/// <param name="type"></param>
|
||||
/// <param name="data"></param>
|
||||
/// <returns></returns>
|
||||
protected async Task ProcessData(WebSocketMessageType type, ReadOnlyMemory<byte> data)
|
||||
{
|
||||
LastActionTime = DateTime.UtcNow;
|
||||
await (OnStreamMessage?.Invoke(type, data) ?? Task.CompletedTask).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks if there is no data received for a period longer than the specified timeout
|
||||
/// </summary>
|
||||
|
||||
@ -16,10 +16,6 @@ namespace CryptoExchange.Net.Sockets.Default.Interfaces
|
||||
/// </summary>
|
||||
event Func<Task> OnClose;
|
||||
/// <summary>
|
||||
/// Websocket message received event
|
||||
/// </summary>
|
||||
event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task> OnStreamMessage;
|
||||
/// <summary>
|
||||
/// Websocket sent event, RequestId as parameter
|
||||
/// </summary>
|
||||
event Func<int, Task> OnRequestSent;
|
||||
|
||||
@ -111,11 +111,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// </summary>
|
||||
public event Action? ActivityUnpaused;
|
||||
|
||||
/// <summary>
|
||||
/// Unhandled message event
|
||||
/// </summary>
|
||||
public event Action<IMessageAccessor>? UnhandledMessage;
|
||||
|
||||
/// <summary>
|
||||
/// Connection was rate limited and couldn't be established
|
||||
/// </summary>
|
||||
@ -269,8 +264,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
private SocketStatus _status;
|
||||
|
||||
private readonly IMessageSerializer _serializer;
|
||||
private IByteMessageAccessor? _stringMessageAccessor;
|
||||
private IByteMessageAccessor? _byteMessageAccessor;
|
||||
|
||||
private ISocketMessageHandler? _byteMessageConverter;
|
||||
private ISocketMessageHandler? _textMessageConverter;
|
||||
@ -292,11 +285,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// </summary>
|
||||
private readonly IWebsocket _socket;
|
||||
|
||||
/// <summary>
|
||||
/// Cache for deserialization, only caches for a single message
|
||||
/// </summary>
|
||||
private readonly Dictionary<Type, object> _deserializationCache = new Dictionary<Type, object>();
|
||||
|
||||
/// <summary>
|
||||
/// New socket connection
|
||||
/// </summary>
|
||||
@ -310,7 +298,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
_socket = socketFactory.CreateWebsocket(logger, this, parameters);
|
||||
_logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString());
|
||||
|
||||
_socket.OnStreamMessage += HandleStreamMessage;
|
||||
_socket.OnRequestSent += HandleRequestSentAsync;
|
||||
_socket.OnRequestRateLimited += HandleRequestRateLimitedAsync;
|
||||
_socket.OnConnectRateLimited += HandleConnectRateLimitedAsync;
|
||||
@ -671,144 +658,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handle a message
|
||||
/// </summary>
|
||||
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory<byte> data)
|
||||
{
|
||||
var sw = Stopwatch.StartNew();
|
||||
var receiveTime = DateTime.UtcNow;
|
||||
string? originalData = null;
|
||||
|
||||
// 1. Decrypt/Preprocess if necessary
|
||||
data = ApiClient.PreprocessStreamMessage(this, type, data);
|
||||
|
||||
// 2. Read data into accessor
|
||||
IByteMessageAccessor accessor;
|
||||
if (type == WebSocketMessageType.Binary)
|
||||
accessor = _stringMessageAccessor ??= ApiClient.CreateAccessor(type);
|
||||
else
|
||||
accessor = _byteMessageAccessor ??= ApiClient.CreateAccessor(type);
|
||||
|
||||
var result = accessor.Read(data);
|
||||
try
|
||||
{
|
||||
bool outputOriginalData = ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData;
|
||||
if (outputOriginalData)
|
||||
{
|
||||
originalData = accessor.GetOriginalString();
|
||||
_logger.ReceivedData(SocketId, originalData);
|
||||
}
|
||||
|
||||
if (!accessor.IsValid && !ApiClient.ProcessUnparsableMessages)
|
||||
{
|
||||
_logger.FailedToParse(SocketId, result.Error!.Message ?? result.Error!.ErrorDescription!);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. Determine the identifying properties of this message
|
||||
var listenId = ApiClient.GetListenerIdentifier(accessor);
|
||||
if (listenId == null)
|
||||
{
|
||||
originalData ??= "[OutputOriginalData is false]";
|
||||
if (!ApiClient.UnhandledMessageExpected)
|
||||
_logger.FailedToEvaluateMessage(SocketId, originalData);
|
||||
|
||||
UnhandledMessage?.Invoke(accessor);
|
||||
return;
|
||||
}
|
||||
|
||||
bool processed = false;
|
||||
var totalUserTime = 0;
|
||||
|
||||
List<IMessageProcessor> localListeners;
|
||||
lock (_listenersLock)
|
||||
localListeners = _listeners.ToList();
|
||||
|
||||
foreach (var processor in localListeners)
|
||||
{
|
||||
foreach (var listener in processor.MessageMatcher.GetHandlerLinks(listenId))
|
||||
{
|
||||
processed = true;
|
||||
_logger.ProcessorMatched(SocketId, listener.ToString(), listenId);
|
||||
|
||||
// 4. Determine the type to deserialize to for this processor
|
||||
var messageType = listener.DeserializationType;
|
||||
if (messageType == null)
|
||||
{
|
||||
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (processor is Subscription subscriptionProcessor && subscriptionProcessor.Status == SubscriptionStatus.Subscribing)
|
||||
{
|
||||
// If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed
|
||||
subscriptionProcessor.Status = SubscriptionStatus.Subscribed;
|
||||
if (subscriptionProcessor.SubscriptionQuery?.TimeoutBehavior == TimeoutBehavior.Succeed)
|
||||
// If this subscription has a query waiting for a timeout (success if there is no error response)
|
||||
// then time it out now as the data is being received, so we assume it's successful
|
||||
subscriptionProcessor.SubscriptionQuery.Timeout();
|
||||
}
|
||||
|
||||
// 5. Deserialize the message
|
||||
_deserializationCache.TryGetValue(messageType, out var deserialized);
|
||||
|
||||
if (deserialized == null)
|
||||
{
|
||||
var desResult = processor.Deserialize(accessor, messageType);
|
||||
if (!desResult)
|
||||
{
|
||||
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
|
||||
continue;
|
||||
}
|
||||
|
||||
deserialized = desResult.Data;
|
||||
_deserializationCache.Add(messageType, deserialized);
|
||||
}
|
||||
|
||||
// 6. Pass the message to the handler
|
||||
try
|
||||
{
|
||||
var innerSw = Stopwatch.StartNew();
|
||||
processor.Handle(this, receiveTime, originalData, deserialized, listener);
|
||||
if (processor is Query query && query.RequiredResponses != 1)
|
||||
_logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}");
|
||||
totalUserTime += (int)innerSw.ElapsedMilliseconds;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.UserMessageProcessingFailed(SocketId, ex.Message, ex);
|
||||
if (processor is Subscription subscription)
|
||||
subscription.InvokeExceptionHandler(ex);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if (!processed)
|
||||
{
|
||||
if (!ApiClient.UnhandledMessageExpected)
|
||||
{
|
||||
List<string> listenerIds;
|
||||
lock (_listenersLock)
|
||||
listenerIds = _listeners.Select(l => l.MessageMatcher.ToString()).ToList();
|
||||
|
||||
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
|
||||
UnhandledMessage?.Invoke(accessor);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.MessageProcessed(SocketId, sw.ElapsedMilliseconds, sw.ElapsedMilliseconds - totalUserTime);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_deserializationCache.Clear();
|
||||
accessor.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connect the websocket
|
||||
/// </summary>
|
||||
@ -886,16 +735,8 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||
|
||||
bool anyDuplicateSubscription;
|
||||
if (ApiClient.ClientOptions.UseUpdatedDeserialization)
|
||||
{
|
||||
lock (_listenersLock)
|
||||
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l)));
|
||||
}
|
||||
else
|
||||
{
|
||||
lock (_listenersLock)
|
||||
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageMatcher.HandlerLinks.All(l => subscription.MessageMatcher.ContainsCheck(l)));
|
||||
}
|
||||
lock (_listenersLock)
|
||||
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l)));
|
||||
|
||||
bool shouldCloseConnection;
|
||||
lock (_listenersLock)
|
||||
@ -947,12 +788,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
_socket.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Whether or not a new subscription can be added to this connection
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public bool CanAddSubscription() => Status == SocketStatus.None || Status == SocketStatus.Connected;
|
||||
|
||||
/// <summary>
|
||||
/// Add a subscription to this connection
|
||||
/// </summary>
|
||||
@ -1279,6 +1114,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
return;
|
||||
|
||||
await SendAndWaitQueryAsync(unsubscribeRequest).ConfigureAwait(false);
|
||||
subscription.HandleUnsubQueryResponse(this, unsubscribeRequest.Response);
|
||||
_logger.SubscriptionUnsubscribed(SocketId, subscription.Id);
|
||||
}
|
||||
|
||||
@ -1292,7 +1128,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
return CallResult.SuccessResult;
|
||||
|
||||
var result = await SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
|
||||
subscription.HandleSubQueryResponse(this, subQuery.Response!);
|
||||
subscription.HandleSubQueryResponse(this, subQuery.Response);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@ -70,11 +70,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// </summary>
|
||||
public bool Authenticated { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Matcher for this subscription
|
||||
/// </summary>
|
||||
public MessageMatcher MessageMatcher { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Router for this subscription
|
||||
/// </summary>
|
||||
@ -154,7 +149,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// <summary>
|
||||
/// Handle an unsubscription query response
|
||||
/// </summary>
|
||||
public virtual void HandleUnsubQueryResponse(SocketConnection connection, object message) { }
|
||||
public virtual void HandleUnsubQueryResponse(SocketConnection connection, object? message) { }
|
||||
|
||||
/// <summary>
|
||||
/// Create a new unsubscription query
|
||||
@ -172,19 +167,6 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// <returns></returns>
|
||||
protected abstract Query? GetUnsubQuery(SocketConnection connection);
|
||||
|
||||
/// <inheritdoc />
|
||||
public virtual CallResult<object> Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type);
|
||||
|
||||
/// <summary>
|
||||
/// Handle an update message
|
||||
/// </summary>
|
||||
public CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data, MessageHandlerLink matcher)
|
||||
{
|
||||
ConnectionInvocations++;
|
||||
TotalInvocations++;
|
||||
return matcher.Handle(connection, receiveTime, originalData, data);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handle an update message
|
||||
/// </summary>
|
||||
@ -224,12 +206,12 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// <param name="Id">The id of the subscription</param>
|
||||
/// <param name="Status">Subscription status</param>
|
||||
/// <param name="Invocations">Number of times this subscription got a message</param>
|
||||
/// <param name="ListenMatcher">Matcher for this subscription</param>
|
||||
/// <param name="MessageRouter">Router for this subscription</param>
|
||||
public record SubscriptionState(
|
||||
int Id,
|
||||
SubscriptionStatus Status,
|
||||
int Invocations,
|
||||
MessageMatcher ListenMatcher
|
||||
MessageRouter MessageRouter
|
||||
);
|
||||
|
||||
/// <summary>
|
||||
@ -238,7 +220,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
/// <returns></returns>
|
||||
public SubscriptionState GetState()
|
||||
{
|
||||
return new SubscriptionState(Id, Status, TotalInvocations, MessageMatcher);
|
||||
return new SubscriptionState(Id, Status, TotalInvocations, MessageRouter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -245,7 +245,9 @@ namespace CryptoExchange.Net.Sockets.HighPerf
|
||||
public virtual ValueTask<CallResult> SendAsync<T>(T obj)
|
||||
{
|
||||
if (_serializer is IByteMessageSerializer byteSerializer)
|
||||
{
|
||||
return SendBytesAsync(byteSerializer.Serialize(obj));
|
||||
}
|
||||
else if (_serializer is IStringMessageSerializer stringSerializer)
|
||||
{
|
||||
if (obj is string str)
|
||||
|
||||
@ -15,27 +15,12 @@ namespace CryptoExchange.Net.Sockets.Interfaces
|
||||
/// </summary>
|
||||
public int Id { get; }
|
||||
/// <summary>
|
||||
/// The matcher for this listener
|
||||
/// </summary>
|
||||
public MessageMatcher MessageMatcher { get; }
|
||||
/// <summary>
|
||||
/// The message router for this processor
|
||||
/// </summary>
|
||||
public MessageRouter MessageRouter { get; }
|
||||
/// <summary>
|
||||
/// Handle a message
|
||||
/// </summary>
|
||||
CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageHandlerLink matchedHandler);
|
||||
/// <summary>
|
||||
/// Handle a message
|
||||
/// </summary>
|
||||
CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route);
|
||||
/// <summary>
|
||||
/// Deserialize a message into object of type
|
||||
/// </summary>
|
||||
/// <param name="accessor"></param>
|
||||
/// <param name="type"></param>
|
||||
/// <returns></returns>
|
||||
CallResult<object> Deserialize(IMessageAccessor accessor, Type type);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,185 +0,0 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.Sockets.Default;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets
|
||||
{
|
||||
/// <summary>
|
||||
/// Message link type
|
||||
/// </summary>
|
||||
public enum MessageLinkType
|
||||
{
|
||||
/// <summary>
|
||||
/// Match when the listen id matches fully to the value
|
||||
/// </summary>
|
||||
Full,
|
||||
/// <summary>
|
||||
/// Match when the listen id starts with the value
|
||||
/// </summary>
|
||||
StartsWith
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Matches a message listen id to a specific listener
|
||||
/// </summary>
|
||||
public class MessageMatcher
|
||||
{
|
||||
/// <summary>
|
||||
/// Linkers in this matcher
|
||||
/// </summary>
|
||||
public MessageHandlerLink[] HandlerLinks { get; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
private MessageMatcher(params MessageHandlerLink[] links)
|
||||
{
|
||||
HandlerLinks = links;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create message matcher
|
||||
/// </summary>
|
||||
public static MessageMatcher Create(string value)
|
||||
{
|
||||
return new MessageMatcher(new MessageHandlerLink<string>(MessageLinkType.Full, value, (con, receiveTime, originalData, msg) => new CallResult<string>(default, null, null)));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create message matcher
|
||||
/// </summary>
|
||||
public static MessageMatcher Create<T>(string value)
|
||||
{
|
||||
return new MessageMatcher(new MessageHandlerLink<T>(MessageLinkType.Full, value, (con, receiveTime, originalData, msg) => new CallResult<T>(default, null, null)));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create message matcher
|
||||
/// </summary>
|
||||
public static MessageMatcher Create<T>(string value, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
|
||||
{
|
||||
return new MessageMatcher(new MessageHandlerLink<T>(MessageLinkType.Full, value, handler));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create message matcher
|
||||
/// </summary>
|
||||
public static MessageMatcher Create<T>(IEnumerable<string> values, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
|
||||
{
|
||||
return new MessageMatcher(values.Select(x => new MessageHandlerLink<T>(MessageLinkType.Full, x, handler)).ToArray());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create message matcher
|
||||
/// </summary>
|
||||
public static MessageMatcher Create<T>(MessageLinkType type, string value, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
|
||||
{
|
||||
return new MessageMatcher(new MessageHandlerLink<T>(type, value, handler));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create message matcher
|
||||
/// </summary>
|
||||
public static MessageMatcher Create(params MessageHandlerLink[] linkers)
|
||||
{
|
||||
return new MessageMatcher(linkers);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Whether this matcher contains a specific link
|
||||
/// </summary>
|
||||
public bool ContainsCheck(MessageHandlerLink link) => HandlerLinks.Any(x => x.Type == link.Type && x.Value == link.Value);
|
||||
|
||||
/// <summary>
|
||||
/// Get any handler links matching with the listen id
|
||||
/// </summary>
|
||||
public IEnumerable<MessageHandlerLink> GetHandlerLinks(string listenId) => HandlerLinks.Where(x => x.Check(listenId));
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString() => string.Join(",", HandlerLinks.Select(x => x.ToString()));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Message handler link
|
||||
/// </summary>
|
||||
public abstract class MessageHandlerLink
|
||||
{
|
||||
/// <summary>
|
||||
/// Type of check
|
||||
/// </summary>
|
||||
public MessageLinkType Type { get; }
|
||||
/// <summary>
|
||||
/// String value of the check
|
||||
/// </summary>
|
||||
public string Value { get; }
|
||||
/// <summary>
|
||||
/// Deserialization type
|
||||
/// </summary>
|
||||
public abstract Type DeserializationType { get; }
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public MessageHandlerLink(MessageLinkType type, string value)
|
||||
{
|
||||
Type = type;
|
||||
Value = value;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Whether this listen id matches this link
|
||||
/// </summary>
|
||||
public bool Check(string listenId)
|
||||
{
|
||||
if (Type == MessageLinkType.Full)
|
||||
return Value.Equals(listenId, StringComparison.Ordinal);
|
||||
|
||||
return listenId.StartsWith(Value, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Message handler
|
||||
/// </summary>
|
||||
public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data);
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString() => $"{Type} match for \"{Value}\"";
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Message handler link
|
||||
/// </summary>
|
||||
public class MessageHandlerLink<TServer>: MessageHandlerLink
|
||||
{
|
||||
private Func<SocketConnection, DateTime, string?, TServer, CallResult> _handler;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override Type DeserializationType => typeof(TServer);
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public MessageHandlerLink(string value, Func<SocketConnection, DateTime, string?, TServer, CallResult> handler)
|
||||
: this(MessageLinkType.Full, value, handler)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public MessageHandlerLink(MessageLinkType type, string value, Func<SocketConnection, DateTime, string?, TServer, CallResult> handler)
|
||||
: base(type, value)
|
||||
{
|
||||
_handler = handler;
|
||||
}
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data)
|
||||
{
|
||||
return _handler(connection, receiveTime, originalData, (TServer)data);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -59,11 +59,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// </summary>
|
||||
public object? Response { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Matcher for this query
|
||||
/// </summary>
|
||||
public MessageMatcher MessageMatcher { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Router for this query
|
||||
/// </summary>
|
||||
@ -146,9 +141,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <returns></returns>
|
||||
public async Task WaitAsync(TimeSpan timeout, CancellationToken ct) => await _event.WaitAsync(timeout, ct).ConfigureAwait(false);
|
||||
|
||||
/// <inheritdoc />
|
||||
public virtual CallResult<object> Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type);
|
||||
|
||||
/// <summary>
|
||||
/// Mark request as timeout
|
||||
/// </summary>
|
||||
@ -160,11 +152,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
/// <param name="error"></param>
|
||||
public abstract void Fail(Error error);
|
||||
|
||||
/// <summary>
|
||||
/// Handle a response message
|
||||
/// </summary>
|
||||
public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageHandlerLink check);
|
||||
|
||||
/// <summary>
|
||||
/// Handle a response message
|
||||
/// </summary>
|
||||
@ -223,35 +210,6 @@ namespace CryptoExchange.Net.Sockets
|
||||
return Result ?? CallResult.SuccessResult;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageHandlerLink check)
|
||||
{
|
||||
if (!PreCheckMessage(connection, message))
|
||||
return CallResult.SuccessResult;
|
||||
|
||||
CurrentResponses++;
|
||||
if (CurrentResponses == RequiredResponses)
|
||||
Response = message;
|
||||
|
||||
if (Result?.Success != false)
|
||||
// If an error result is already set don't override that
|
||||
Result = check.Handle(connection, receiveTime, originalData, message);
|
||||
|
||||
if (CurrentResponses == RequiredResponses)
|
||||
{
|
||||
Completed = true;
|
||||
_event.Set();
|
||||
OnComplete?.Invoke();
|
||||
}
|
||||
|
||||
return Result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Validate if a message is actually processable by this query
|
||||
/// </summary>
|
||||
public virtual bool PreCheckMessage(SocketConnection connection, object message) => true;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Timeout()
|
||||
{
|
||||
|
||||
@ -12,7 +12,8 @@ using CryptoExchange.Net.Sockets.Default.Interfaces;
|
||||
|
||||
namespace CryptoExchange.Net.Testing.Implementations
|
||||
{
|
||||
internal class TestSocket : IWebsocket
|
||||
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
|
||||
public class TestSocket : IWebsocket
|
||||
{
|
||||
public event Action<string>? OnMessageSend;
|
||||
|
||||
@ -28,7 +29,6 @@ namespace CryptoExchange.Net.Testing.Implementations
|
||||
public event Func<Exception, Task>? OnError;
|
||||
#pragma warning restore 0067
|
||||
public event Func<int, Task>? OnRequestSent;
|
||||
public event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task>? OnStreamMessage;
|
||||
public event Func<Task>? OnOpen;
|
||||
|
||||
public int Id { get; }
|
||||
@ -45,14 +45,10 @@ namespace CryptoExchange.Net.Testing.Implementations
|
||||
public static readonly object lastIdLock = new object();
|
||||
#endif
|
||||
|
||||
private bool _newDeserialization;
|
||||
|
||||
public SocketConnection? Connection { get; set; }
|
||||
|
||||
public TestSocket(bool newDeserialization, string address)
|
||||
public TestSocket(string address)
|
||||
{
|
||||
_newDeserialization = newDeserialization;
|
||||
|
||||
Uri = new Uri(address);
|
||||
lock (lastIdLock)
|
||||
{
|
||||
@ -107,20 +103,23 @@ namespace CryptoExchange.Net.Testing.Implementations
|
||||
|
||||
public void InvokeMessage(string data)
|
||||
{
|
||||
if (!_newDeserialization)
|
||||
{
|
||||
OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(data))).Wait();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (Connection == null)
|
||||
throw new ArgumentNullException(nameof(Connection));
|
||||
if (Connection == null)
|
||||
throw new ArgumentNullException(nameof(Connection));
|
||||
|
||||
Connection.HandleStreamMessage2(WebSocketMessageType.Text, Encoding.UTF8.GetBytes(data));
|
||||
}
|
||||
Connection.HandleStreamMessage2(WebSocketMessageType.Text, Encoding.UTF8.GetBytes(data));
|
||||
}
|
||||
|
||||
public async Task ReconnectAsync()
|
||||
{
|
||||
if (OnReconnecting != null)
|
||||
await OnReconnecting().ConfigureAwait(false);
|
||||
|
||||
await Task.Delay(10).ConfigureAwait(false);
|
||||
|
||||
if (OnReconnected != null)
|
||||
await OnReconnected().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public Task ReconnectAsync() => Task.CompletedTask;
|
||||
public void Dispose() { }
|
||||
|
||||
public void UpdateProxy(ApiProxy? proxy) => throw new NotImplementedException();
|
||||
|
||||
@ -18,7 +18,7 @@ namespace CryptoExchange.Net.Testing
|
||||
/// <summary>
|
||||
/// Get a client instance
|
||||
/// </summary>
|
||||
public abstract TClient GetClient(ILoggerFactory loggerFactory, bool newDeserialization);
|
||||
public abstract TClient GetClient(ILoggerFactory loggerFactory);
|
||||
|
||||
/// <summary>
|
||||
/// Whether the test should be run. By default integration tests aren't executed, can be set to true to force execution.
|
||||
@ -34,11 +34,11 @@ namespace CryptoExchange.Net.Testing
|
||||
/// Create a client
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
protected TClient CreateClient(bool useNewDeserialization)
|
||||
protected TClient CreateClient()
|
||||
{
|
||||
var fact = new LoggerFactory();
|
||||
fact.AddProvider(new TraceLoggerProvider());
|
||||
return GetClient(fact, useNewDeserialization);
|
||||
return GetClient(fact);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -58,16 +58,15 @@ namespace CryptoExchange.Net.Testing
|
||||
/// Execute a REST endpoint call and check for any errors or warnings.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">Type of the update</typeparam>
|
||||
/// <param name="useNewDeserialization">Whether to use the new deserialization method</param>
|
||||
/// <param name="expression">The call expression</param>
|
||||
/// <param name="expectUpdate">Whether an update is expected</param>
|
||||
/// <param name="authRequest">Whether this is an authenticated request</param>
|
||||
public async Task RunAndCheckUpdate<T>(bool useNewDeserialization, Expression<Func<TClient, Action<DataEvent<T>>, Task<CallResult<UpdateSubscription>>>> expression, bool expectUpdate, bool authRequest)
|
||||
public async Task RunAndCheckUpdate<T>(Expression<Func<TClient, Action<DataEvent<T>>, Task<CallResult<UpdateSubscription>>>> expression, bool expectUpdate, bool authRequest)
|
||||
{
|
||||
if (!ShouldRun())
|
||||
return;
|
||||
|
||||
var client = CreateClient(useNewDeserialization);
|
||||
var client = CreateClient();
|
||||
|
||||
var expressionBody = (MethodCallExpression)expression.Body;
|
||||
if (authRequest && !Authenticated)
|
||||
|
||||
@ -27,8 +27,11 @@ namespace CryptoExchange.Net.Testing
|
||||
/// </summary>
|
||||
public class TestHelpers
|
||||
{
|
||||
/// <summary>
|
||||
/// Deep compare the values of two objects
|
||||
/// </summary>
|
||||
[ExcludeFromCodeCoverage]
|
||||
internal static bool AreEqual<T>(T? self, T? to, params string[] ignore) where T : class
|
||||
public static bool AreEqual<T>(T? self, T? to, params string[] ignore) where T : class
|
||||
{
|
||||
if (self != null && to != null)
|
||||
{
|
||||
@ -61,9 +64,12 @@ namespace CryptoExchange.Net.Testing
|
||||
return self == to;
|
||||
}
|
||||
|
||||
internal static TestSocket ConfigureSocketClient<T>(T client, string address) where T : BaseSocketClient
|
||||
/// <summary>
|
||||
/// Configure a socket client
|
||||
/// </summary>
|
||||
public static TestSocket ConfigureSocketClient<T>(T client, string address) where T : BaseSocketClient
|
||||
{
|
||||
var socket = new TestSocket(client.ClientOptions.UseUpdatedDeserialization, address);
|
||||
var socket = new TestSocket(address);
|
||||
foreach (var apiClient in client.ApiClients.OfType<SocketApiClient>())
|
||||
{
|
||||
apiClient.SocketFactory = new TestWebsocketFactory(socket);
|
||||
|
||||
@ -66,6 +66,13 @@ Make a one time donation in a crypto currency of your choice. If you prefer to d
|
||||
Alternatively, sponsor me on Github using [Github Sponsors](https://github.com/sponsors/JKorf).
|
||||
|
||||
## Release notes
|
||||
* Version 10.3.0 - 22 Jan 2026
|
||||
* Added PlatformInfo class for specifying platform metadata
|
||||
* Added better handling for enabling AutoTimestamp in client options when not implemented in the API
|
||||
* Fixed state handling for subscriptions where queries do not get a response
|
||||
* Fixed HandleSubQueryResponse not getting called
|
||||
* Removed legacy websocket message handling and the corresponding UseUpdatedDeserialization client option
|
||||
|
||||
* Version 10.2.5 - 19 Jan 2026
|
||||
* Updated SymbolOrderBook.WaitUntilFirstUpdateBufferedAsync
|
||||
* Added GetRestOffsets and GetWebsocketOffsets to TimeOffsetManager
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user