1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-02-16 22:23:54 +00:00

Remove legacy websocket message handling

This commit is contained in:
Jkorf 2026-01-21 11:14:06 +01:00
parent 218e0260ce
commit 76772e91ba
22 changed files with 28 additions and 1330 deletions

View File

@ -114,12 +114,6 @@ namespace CryptoExchange.Net.Clients
RequestFactory.Configure(options, httpClient); RequestFactory.Configure(options, httpClient);
} }
/// <summary>
/// Create a message accessor instance
/// </summary>
/// <returns></returns>
protected abstract IStreamMessageAccessor CreateAccessor();
/// <summary> /// <summary>
/// Create a serializer instance /// Create a serializer instance
/// </summary> /// </summary>

View File

@ -99,11 +99,6 @@ namespace CryptoExchange.Net.Clients
/// </summary> /// </summary>
protected bool AllowTopicsOnTheSameConnection { get; set; } = true; protected bool AllowTopicsOnTheSameConnection { get; set; } = true;
/// <summary>
/// Whether to continue processing and forward unparsable messages to handlers
/// </summary>
protected internal bool ProcessUnparsableMessages { get; set; } = false;
/// <inheritdoc /> /// <inheritdoc />
public double IncomingKbps public double IncomingKbps
{ {
@ -165,12 +160,6 @@ namespace CryptoExchange.Net.Clients
{ {
} }
/// <summary>
/// Create a message accessor instance
/// </summary>
/// <returns></returns>
protected internal abstract IByteMessageAccessor CreateAccessor(WebSocketMessageType messageType);
/// <summary> /// <summary>
/// Create a serializer instance /// Create a serializer instance
/// </summary> /// </summary>
@ -754,7 +743,6 @@ namespace CryptoExchange.Net.Clients
// Create new socket connection // Create new socket connection
var socketConnection = new SocketConnection(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, address); var socketConnection = new SocketConnection(_logger, SocketFactory, GetWebSocketParameters(connectionAddress.Data!), this, address);
socketConnection.UnhandledMessage += HandleUnhandledMessage;
socketConnection.ConnectRateLimitedAsync += HandleConnectRateLimitedAsync; socketConnection.ConnectRateLimitedAsync += HandleConnectRateLimitedAsync;
if (dedicatedRequestConnection) if (dedicatedRequestConnection)
{ {
@ -805,14 +793,6 @@ namespace CryptoExchange.Net.Clients
return new CallResult<HighPerfSocketConnection<TUpdateType>>(socketConnection); return new CallResult<HighPerfSocketConnection<TUpdateType>>(socketConnection);
} }
/// <summary>
/// Process an unhandled message
/// </summary>
/// <param name="message">The message that wasn't processed</param>
protected virtual void HandleUnhandledMessage(IMessageAccessor message)
{
}
/// <summary> /// <summary>
/// Process an unhandled message /// Process an unhandled message
/// </summary> /// </summary>
@ -873,7 +853,6 @@ namespace CryptoExchange.Net.Clients
Proxy = ClientOptions.Proxy, Proxy = ClientOptions.Proxy,
Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout, Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout,
ReceiveBufferSize = ClientOptions.ReceiveBufferSize, ReceiveBufferSize = ClientOptions.ReceiveBufferSize,
UseUpdatedDeserialization = ClientOptions.UseUpdatedDeserialization
}; };
/// <summary> /// <summary>
@ -1066,7 +1045,6 @@ namespace CryptoExchange.Net.Clients
sb.AppendLine($"\t\t\tId: {subState.Id}"); sb.AppendLine($"\t\t\tId: {subState.Id}");
sb.AppendLine($"\t\t\tStatus: {subState.Status}"); sb.AppendLine($"\t\t\tStatus: {subState.Status}");
sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}"); sb.AppendLine($"\t\t\tInvocations: {subState.Invocations}");
sb.AppendLine($"\t\t\tIdentifiers: [{subState.ListenMatcher.ToString()}]");
}); });
} }
}); });
@ -1097,21 +1075,10 @@ namespace CryptoExchange.Net.Clients
base.Dispose(); base.Dispose();
} }
/// <summary>
/// Get the listener identifier for the message
/// </summary>
/// <param name="messageAccessor"></param>
/// <returns></returns>
public abstract string? GetListenerIdentifier(IMessageAccessor messageAccessor);
/// <summary> /// <summary>
/// Preprocess a stream message /// Preprocess a stream message
/// </summary> /// </summary>
public virtual ReadOnlySpan<byte> PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlySpan<byte> data) => data; public virtual ReadOnlySpan<byte> PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlySpan<byte> data) => data;
/// <summary>
/// Preprocess a stream message
/// </summary>
public virtual ReadOnlyMemory<byte> PreprocessStreamMessage(SocketConnection connection, WebSocketMessageType type, ReadOnlyMemory<byte> data) => data;
/// <summary> /// <summary>
/// Create a new message converter instance /// Create a new message converter instance

View File

@ -1,49 +0,0 @@
namespace CryptoExchange.Net.Converters.MessageParsing
{
/// <summary>
/// Node accessor
/// </summary>
public readonly struct NodeAccessor
{
/// <summary>
/// Index
/// </summary>
public int? Index { get; }
/// <summary>
/// Property name
/// </summary>
public string? Property { get; }
/// <summary>
/// Type (0 = int, 1 = string, 2 = prop name)
/// </summary>
public int Type { get; }
private NodeAccessor(int? index, string? property, int type)
{
Index = index;
Property = property;
Type = type;
}
/// <summary>
/// Create an int node accessor
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
public static NodeAccessor Int(int value) { return new NodeAccessor(value, null, 0); }
/// <summary>
/// Create a string node accessor
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
public static NodeAccessor String(string value) { return new NodeAccessor(null, value, 1); }
/// <summary>
/// Create a property name node accessor
/// </summary>
/// <returns></returns>
public static NodeAccessor PropertyName() { return new NodeAccessor(null, null, 2); }
}
}

View File

@ -1,50 +0,0 @@
using System.Collections;
using System.Collections.Generic;
namespace CryptoExchange.Net.Converters.MessageParsing
{
/// <summary>
/// Message access definition
/// </summary>
public readonly struct MessagePath : IEnumerable<NodeAccessor>
{
private readonly List<NodeAccessor> _path;
internal void Add(NodeAccessor node)
{
_path.Add(node);
}
/// <summary>
/// ctor
/// </summary>
public MessagePath()
{
_path = new List<NodeAccessor>();
}
/// <summary>
/// Create a new message path
/// </summary>
/// <returns></returns>
public static MessagePath Get()
{
return new MessagePath();
}
/// <summary>
/// IEnumerable implementation
/// </summary>
/// <returns></returns>
public IEnumerator<NodeAccessor> GetEnumerator()
{
for (var i = 0; i < _path.Count; i++)
yield return _path[i];
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
}

View File

@ -1,43 +0,0 @@
namespace CryptoExchange.Net.Converters.MessageParsing
{
/// <summary>
/// Message path extension methods
/// </summary>
public static class MessagePathExtension
{
/// <summary>
/// Add a string node accessor
/// </summary>
/// <param name="path"></param>
/// <param name="propName"></param>
/// <returns></returns>
public static MessagePath Property(this MessagePath path, string propName)
{
path.Add(NodeAccessor.String(propName));
return path;
}
/// <summary>
/// Add a property name node accessor
/// </summary>
/// <param name="path"></param>
/// <returns></returns>
public static MessagePath PropertyName(this MessagePath path)
{
path.Add(NodeAccessor.PropertyName());
return path;
}
/// <summary>
/// Add a int node accessor
/// </summary>
/// <param name="path"></param>
/// <param name="index"></param>
/// <returns></returns>
public static MessagePath Index(this MessagePath path, int index)
{
path.Add(NodeAccessor.Int(index));
return path;
}
}
}

View File

@ -1,21 +0,0 @@
namespace CryptoExchange.Net.Converters.MessageParsing
{
/// <summary>
/// Message node type
/// </summary>
public enum NodeType
{
/// <summary>
/// Array node
/// </summary>
Array,
/// <summary>
/// Object node
/// </summary>
Object,
/// <summary>
/// Value node
/// </summary>
Value
}
}

View File

@ -1,373 +0,0 @@
using CryptoExchange.Net.Converters.MessageParsing;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using System;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Converters.SystemTextJson
{
/// <summary>
/// System.Text.Json message accessor
/// </summary>
public abstract class SystemTextJsonMessageAccessor : IMessageAccessor
{
/// <summary>
/// The JsonDocument loaded
/// </summary>
protected JsonDocument? _document;
private readonly JsonSerializerOptions? _customSerializerOptions;
/// <inheritdoc />
public bool IsValid { get; set; }
/// <inheritdoc />
public abstract bool OriginalDataAvailable { get; }
/// <inheritdoc />
public object? Underlying => throw new NotImplementedException();
/// <summary>
/// ctor
/// </summary>
public SystemTextJsonMessageAccessor(JsonSerializerOptions options)
{
_customSerializerOptions = options;
}
/// <inheritdoc />
#if NET5_0_OR_GREATER
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2026:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
#endif
public CallResult<object> Deserialize(Type type, MessagePath? path = null)
{
if (!IsValid)
return new CallResult<object>(GetOriginalString());
if (_document == null)
throw new InvalidOperationException("No json document loaded");
try
{
var result = _document.Deserialize(type, _customSerializerOptions);
return new CallResult<object>(result!);
}
catch (JsonException ex)
{
var info = $"Json deserialization failed: {ex.Message}, Path: {ex.Path}, LineNumber: {ex.LineNumber}, LinePosition: {ex.BytePositionInLine}";
return new CallResult<object>(new DeserializeError(info, ex));
}
catch (Exception ex)
{
return new CallResult<object>(new DeserializeError($"Json deserialization failed: {ex.Message}", ex));
}
}
/// <inheritdoc />
#if NET5_0_OR_GREATER
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2026:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
#endif
public CallResult<T> Deserialize<T>(MessagePath? path = null)
{
if (_document == null)
throw new InvalidOperationException("No json document loaded");
try
{
var result = _document.Deserialize<T>(_customSerializerOptions);
return new CallResult<T>(result!);
}
catch (JsonException ex)
{
var info = $"Json deserialization failed: {ex.Message}, Path: {ex.Path}, LineNumber: {ex.LineNumber}, LinePosition: {ex.BytePositionInLine}";
return new CallResult<T>(new DeserializeError(info, ex));
}
catch (Exception ex)
{
return new CallResult<T>(new DeserializeError($"Json deserialization failed: {ex.Message}", ex));
}
}
/// <inheritdoc />
public NodeType? GetNodeType()
{
if (!IsValid)
throw new InvalidOperationException("Can't access json data on non-json message");
if (_document == null)
throw new InvalidOperationException("No json document loaded");
return _document.RootElement.ValueKind switch
{
JsonValueKind.Object => NodeType.Object,
JsonValueKind.Array => NodeType.Array,
_ => NodeType.Value
};
}
/// <inheritdoc />
public NodeType? GetNodeType(MessagePath path)
{
if (!IsValid)
throw new InvalidOperationException("Can't access json data on non-json message");
var node = GetPathNode(path);
if (!node.HasValue)
return null;
return node.Value.ValueKind switch
{
JsonValueKind.Object => NodeType.Object,
JsonValueKind.Array => NodeType.Array,
_ => NodeType.Value
};
}
/// <inheritdoc />
#if NET5_0_OR_GREATER
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2026:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
#endif
public T? GetValue<T>(MessagePath path)
{
if (!IsValid)
throw new InvalidOperationException("Can't access json data on non-json message");
var value = GetPathNode(path);
if (value == null)
return default;
if (value.Value.ValueKind == JsonValueKind.Object || value.Value.ValueKind == JsonValueKind.Array)
{
try
{
return value.Value.Deserialize<T>(_customSerializerOptions);
}
catch { }
return default;
}
if (typeof(T) == typeof(string))
{
if (value.Value.ValueKind == JsonValueKind.Number)
return (T)(object)value.Value.GetInt64().ToString();
}
return value.Value.Deserialize<T>(_customSerializerOptions);
}
/// <inheritdoc />
#if NET5_0_OR_GREATER
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2026:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL3050:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
#endif
public T?[]? GetValues<T>(MessagePath path)
{
if (!IsValid)
throw new InvalidOperationException("Can't access json data on non-json message");
var value = GetPathNode(path);
if (value == null)
return default;
if (value.Value.ValueKind != JsonValueKind.Array)
return default;
return value.Value.Deserialize<T[]>(_customSerializerOptions)!;
}
private JsonElement? GetPathNode(MessagePath path)
{
if (!IsValid)
throw new InvalidOperationException("Can't access json data on non-json message");
if (_document == null)
throw new InvalidOperationException("No json document loaded");
JsonElement? currentToken = _document.RootElement;
foreach (var node in path)
{
if (node.Type == 0)
{
// Int value
var val = node.Index!.Value;
if (currentToken!.Value.ValueKind != JsonValueKind.Array || currentToken.Value.GetArrayLength() <= val)
return null;
currentToken = currentToken.Value[val];
}
else if (node.Type == 1)
{
// String value
if (currentToken!.Value.ValueKind != JsonValueKind.Object)
return null;
if (!currentToken.Value.TryGetProperty(node.Property!, out var token))
return null;
currentToken = token;
}
else
{
// Property name
if (currentToken!.Value.ValueKind != JsonValueKind.Object)
return null;
throw new NotImplementedException();
}
if (currentToken == null)
return null;
}
return currentToken;
}
/// <inheritdoc />
public abstract string GetOriginalString();
/// <inheritdoc />
public abstract void Clear();
}
/// <summary>
/// System.Text.Json stream message accessor
/// </summary>
public class SystemTextJsonStreamMessageAccessor : SystemTextJsonMessageAccessor, IStreamMessageAccessor
{
private Stream? _stream;
/// <inheritdoc />
public override bool OriginalDataAvailable => _stream?.CanSeek == true;
/// <summary>
/// ctor
/// </summary>
public SystemTextJsonStreamMessageAccessor(JsonSerializerOptions options): base(options)
{
}
/// <inheritdoc />
public async Task<CallResult> Read(Stream stream, bool bufferStream)
{
if (bufferStream && stream is not MemoryStream)
{
// We need to be buffer the stream, and it's not currently a seekable stream, so copy it to a new memory stream
_stream = new MemoryStream();
stream.CopyTo(_stream);
_stream.Position = 0;
}
else if (bufferStream)
{
// We need to buffer the stream, and the current stream is seekable, store as is
_stream = stream;
}
else
{
// We don't need to buffer the stream, so don't bother keeping the reference
}
try
{
_document = await JsonDocument.ParseAsync(_stream ?? stream).ConfigureAwait(false);
IsValid = true;
return CallResult.SuccessResult;
}
catch (Exception ex)
{
// Not a json message
IsValid = false;
return new CallResult(new DeserializeError($"Json deserialization failed: {ex.Message}", ex));
}
}
/// <inheritdoc />
public override string GetOriginalString()
{
if (_stream is null)
throw new NullReferenceException("Stream not initialized");
_stream.Position = 0;
using var textReader = new StreamReader(_stream, Encoding.UTF8, false, 1024, true);
return textReader.ReadToEnd();
}
/// <inheritdoc />
public override void Clear()
{
_stream?.Dispose();
_stream = null;
_document?.Dispose();
_document = null;
}
}
/// <summary>
/// System.Text.Json byte message accessor
/// </summary>
public class SystemTextJsonByteMessageAccessor : SystemTextJsonMessageAccessor, IByteMessageAccessor
{
private ReadOnlyMemory<byte> _bytes;
/// <summary>
/// ctor
/// </summary>
public SystemTextJsonByteMessageAccessor(JsonSerializerOptions options) : base(options)
{
}
/// <inheritdoc />
public CallResult Read(ReadOnlyMemory<byte> data)
{
_bytes = data;
try
{
var firstByte = data.Span[0];
if (firstByte != 0x7b && firstByte != 0x5b)
{
// Value doesn't start with `{` or `[`, prevent deserialization attempt as it's slow
IsValid = false;
return new CallResult(new DeserializeError("Not a json value"));
}
_document = JsonDocument.Parse(data);
IsValid = true;
return CallResult.SuccessResult;
}
catch (Exception ex)
{
// Not a json message
IsValid = false;
return new CallResult(new DeserializeError($"Json deserialization failed: {ex.Message}", ex));
}
}
/// <inheritdoc />
public override string GetOriginalString() =>
// NetStandard 2.0 doesn't support GetString from a ReadonlySpan<byte>, so use ToArray there instead
#if NETSTANDARD2_0
Encoding.UTF8.GetString(_bytes.ToArray());
#else
Encoding.UTF8.GetString(_bytes.Span);
#endif
/// <inheritdoc />
public override bool OriginalDataAvailable => true;
/// <inheritdoc />
public override void Clear()
{
_bytes = null;
_document?.Dispose();
_document = null;
}
}
}

View File

@ -292,122 +292,6 @@ namespace CryptoExchange.Net
return sb.ToString(); return sb.ToString();
} }
/// <summary>
/// Create a new uri with the provided parameters as query
/// </summary>
/// <param name="parameters"></param>
/// <param name="baseUri"></param>
/// <param name="arraySerialization"></param>
/// <returns></returns>
public static Uri SetParameters(this Uri baseUri, IDictionary<string, object> parameters, ArrayParametersSerialization arraySerialization)
{
var uriBuilder = new UriBuilder();
uriBuilder.Scheme = baseUri.Scheme;
uriBuilder.Host = baseUri.Host;
uriBuilder.Port = baseUri.Port;
uriBuilder.Path = baseUri.AbsolutePath;
var httpValueCollection = HttpUtility.ParseQueryString(string.Empty);
foreach (var parameter in parameters)
{
if (parameter.Value.GetType().IsArray)
{
if (arraySerialization == ArrayParametersSerialization.JsonArray)
{
httpValueCollection.Add(parameter.Key, $"[{string.Join(",", (object[])parameter.Value)}]");
}
else
{
foreach (var item in (object[])parameter.Value)
{
if (arraySerialization == ArrayParametersSerialization.Array)
{
httpValueCollection.Add(parameter.Key + "[]", item.ToString());
}
else
{
httpValueCollection.Add(parameter.Key, item.ToString());
}
}
}
}
else
{
httpValueCollection.Add(parameter.Key, parameter.Value.ToString());
}
}
uriBuilder.Query = httpValueCollection.ToString();
return uriBuilder.Uri;
}
/// <summary>
/// Create a new uri with the provided parameters as query
/// </summary>
/// <param name="parameters"></param>
/// <param name="baseUri"></param>
/// <param name="arraySerialization"></param>
/// <returns></returns>
public static Uri SetParameters(this Uri baseUri, IOrderedEnumerable<KeyValuePair<string, object>> parameters, ArrayParametersSerialization arraySerialization)
{
var uriBuilder = new UriBuilder();
uriBuilder.Scheme = baseUri.Scheme;
uriBuilder.Host = baseUri.Host;
uriBuilder.Port = baseUri.Port;
uriBuilder.Path = baseUri.AbsolutePath;
var httpValueCollection = HttpUtility.ParseQueryString(string.Empty);
foreach (var parameter in parameters)
{
if (parameter.Value.GetType().IsArray)
{
if (arraySerialization == ArrayParametersSerialization.JsonArray)
{
httpValueCollection.Add(parameter.Key, $"[{string.Join(",", (object[])parameter.Value)}]");
}
else
{
foreach (var item in (object[])parameter.Value)
{
if (arraySerialization == ArrayParametersSerialization.Array)
{
httpValueCollection.Add(parameter.Key + "[]", item.ToString());
}
else
{
httpValueCollection.Add(parameter.Key, item.ToString());
}
}
}
}
else
{
httpValueCollection.Add(parameter.Key, parameter.Value.ToString());
}
}
uriBuilder.Query = httpValueCollection.ToString();
return uriBuilder.Uri;
}
/// <summary>
/// Add parameter to URI
/// </summary>
/// <param name="uri"></param>
/// <param name="name"></param>
/// <param name="value"></param>
/// <returns></returns>
public static Uri AddQueryParameter(this Uri uri, string name, string value)
{
var httpValueCollection = HttpUtility.ParseQueryString(uri.Query);
httpValueCollection.Remove(name);
httpValueCollection.Add(name, value);
var ub = new UriBuilder(uri);
ub.Query = httpValueCollection.ToString();
return ub.Uri;
}
/// <summary> /// <summary>
/// Decompress using GzipStream /// Decompress using GzipStream
/// </summary> /// </summary>
@ -419,20 +303,6 @@ namespace CryptoExchange.Net
return new ReadOnlySpan<byte>(decompressedStream.GetBuffer(), 0, (int)decompressedStream.Length); return new ReadOnlySpan<byte>(decompressedStream.GetBuffer(), 0, (int)decompressedStream.Length);
} }
/// <summary>
/// Decompress using GzipStream
/// </summary>
public static ReadOnlyMemory<byte> DecompressGzip(this ReadOnlyMemory<byte> data)
{
using var decompressedStream = new MemoryStream();
using var dataStream = MemoryMarshal.TryGetArray(data, out var arraySegment)
? new MemoryStream(arraySegment.Array!, arraySegment.Offset, arraySegment.Count)
: new MemoryStream(data.ToArray());
using var deflateStream = new GZipStream(dataStream, CompressionMode.Decompress);
deflateStream.CopyTo(decompressedStream);
return new ReadOnlyMemory<byte>(decompressedStream.GetBuffer(), 0, (int)decompressedStream.Length);
}
/// <summary> /// <summary>
/// Decompress using GzipStream /// Decompress using GzipStream
/// </summary> /// </summary>
@ -445,22 +315,6 @@ namespace CryptoExchange.Net
return new ReadOnlySpan<byte>(output.GetBuffer(), 0, (int)output.Length); return new ReadOnlySpan<byte>(output.GetBuffer(), 0, (int)output.Length);
} }
/// <summary>
/// Decompress using DeflateStream
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
public static ReadOnlyMemory<byte> Decompress(this ReadOnlyMemory<byte> input)
{
var output = new MemoryStream();
using var compressStream = new MemoryStream(input.ToArray());
using var decompressor = new DeflateStream(compressStream, CompressionMode.Decompress);
decompressor.CopyTo(output);
output.Position = 0;
return new ReadOnlyMemory<byte>(output.GetBuffer(), 0, (int)output.Length);
}
/// <summary> /// <summary>
/// Whether the trading mode is linear /// Whether the trading mode is linear
/// </summary> /// </summary>

View File

@ -1,109 +0,0 @@
using CryptoExchange.Net.Converters.MessageParsing;
using CryptoExchange.Net.Objects;
using System;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces
{
/// <summary>
/// Message accessor
/// </summary>
public interface IMessageAccessor
{
/// <summary>
/// Is this a valid message
/// </summary>
bool IsValid { get; }
/// <summary>
/// Is the original data available for retrieval
/// </summary>
bool OriginalDataAvailable { get; }
/// <summary>
/// The underlying data object
/// </summary>
object? Underlying { get; }
/// <summary>
/// Clear internal data structure
/// </summary>
void Clear();
/// <summary>
/// Get the type of node
/// </summary>
/// <returns></returns>
NodeType? GetNodeType();
/// <summary>
/// Get the type of node
/// </summary>
/// <param name="path">Access path</param>
/// <returns></returns>
NodeType? GetNodeType(MessagePath path);
/// <summary>
/// Get the value of a path
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="path"></param>
/// <returns></returns>
T? GetValue<T>(MessagePath path);
/// <summary>
/// Get the values of an array
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="path"></param>
/// <returns></returns>
T?[]? GetValues<T>(MessagePath path);
/// <summary>
/// Deserialize the message into this type
/// </summary>
/// <param name="type"></param>
/// <param name="path"></param>
/// <returns></returns>
#if NET5_0_OR_GREATER
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2092:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2095:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
#endif
CallResult<object> Deserialize(Type type, MessagePath? path = null);
/// <summary>
/// Deserialize the message into this type
/// </summary>
/// <param name="path"></param>
/// <returns></returns>
#if NET5_0_OR_GREATER
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2092:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
[UnconditionalSuppressMessage("AssemblyLoadTrimming", "IL2095:RequiresUnreferencedCode", Justification = "JsonSerializerOptions provided here has TypeInfoResolver set")]
#endif
CallResult<T> Deserialize<T>(MessagePath? path = null);
/// <summary>
/// Get the original string value
/// </summary>
/// <returns></returns>
string GetOriginalString();
}
/// <summary>
/// Stream message accessor
/// </summary>
public interface IStreamMessageAccessor : IMessageAccessor
{
/// <summary>
/// Load a stream message
/// </summary>
/// <param name="stream"></param>
/// <param name="bufferStream"></param>
Task<CallResult> Read(Stream stream, bool bufferStream);
}
/// <summary>
/// Byte message accessor
/// </summary>
public interface IByteMessageAccessor : IMessageAccessor
{
/// <summary>
/// Load a data message
/// </summary>
/// <param name="data"></param>
CallResult Read(ReadOnlyMemory<byte> data);
}
}

View File

@ -75,11 +75,6 @@ namespace CryptoExchange.Net.Objects.Options
/// </remarks> /// </remarks>
public int? ReceiveBufferSize { get; set; } public int? ReceiveBufferSize { get; set; }
/// <summary>
/// Whether or not to use the updated deserialization logic, default is true
/// </summary>
public bool UseUpdatedDeserialization { get; set; } = true;
/// <summary> /// <summary>
/// Create a copy of this options /// Create a copy of this options
/// </summary> /// </summary>
@ -101,7 +96,6 @@ namespace CryptoExchange.Net.Objects.Options
item.RateLimitingBehaviour = RateLimitingBehaviour; item.RateLimitingBehaviour = RateLimitingBehaviour;
item.RateLimiterEnabled = RateLimiterEnabled; item.RateLimiterEnabled = RateLimiterEnabled;
item.ReceiveBufferSize = ReceiveBufferSize; item.ReceiveBufferSize = ReceiveBufferSize;
item.UseUpdatedDeserialization = UseUpdatedDeserialization;
return item; return item;
} }
} }

View File

@ -74,11 +74,6 @@ namespace CryptoExchange.Net.Objects.Sockets
/// </summary> /// </summary>
public int? ReceiveBufferSize { get; set; } = null; public int? ReceiveBufferSize { get; set; } = null;
/// <summary>
/// Whether or not to use the updated deserialization logic
/// </summary>
public bool UseUpdatedDeserialization { get; set; }
/// <summary> /// <summary>
/// ctor /// ctor
/// </summary> /// </summary>

View File

@ -95,9 +95,6 @@ namespace CryptoExchange.Net.Sockets.Default
/// <inheritdoc /> /// <inheritdoc />
public event Func<Task>? OnClose; public event Func<Task>? OnClose;
/// <inheritdoc />
public event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task>? OnStreamMessage;
/// <inheritdoc /> /// <inheritdoc />
public event Func<int, Task>? OnRequestSent; public event Func<int, Task>? OnRequestSent;
@ -139,10 +136,7 @@ namespace CryptoExchange.Net.Sockets.Default
_sendEvent = new AsyncResetEvent(); _sendEvent = new AsyncResetEvent();
_sendBuffer = new ConcurrentQueue<SendItem>(); _sendBuffer = new ConcurrentQueue<SendItem>();
_ctsSource = new CancellationTokenSource(); _ctsSource = new CancellationTokenSource();
if (websocketParameters.UseUpdatedDeserialization)
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? 65536; _receiveBufferSize = websocketParameters.ReceiveBufferSize ?? 65536;
else
_receiveBufferSize = websocketParameters.ReceiveBufferSize ?? _defaultReceiveBufferSize;
_closeSem = new SemaphoreSlim(1, 1); _closeSem = new SemaphoreSlim(1, 1);
_socket = CreateSocket(); _socket = CreateSocket();
@ -225,7 +219,9 @@ namespace CryptoExchange.Net.Sockets.Default
catch (Exception e) catch (Exception e)
{ {
if (ct.IsCancellationRequested) if (ct.IsCancellationRequested)
{
_logger.SocketConnectingCanceled(Id); _logger.SocketConnectingCanceled(Id);
}
else if (!_ctsSource.IsCancellationRequested) else if (!_ctsSource.IsCancellationRequested)
{ {
// if _ctsSource was canceled this was already logged // if _ctsSource was canceled this was already logged
@ -271,11 +267,10 @@ namespace CryptoExchange.Net.Sockets.Default
var sendTask = SendLoopAsync(); var sendTask = SendLoopAsync();
Task receiveTask; Task receiveTask;
#if !NETSTANDARD2_0 #if !NETSTANDARD2_0
if (Parameters.UseUpdatedDeserialization)
receiveTask = ReceiveLoopNewAsync(); receiveTask = ReceiveLoopNewAsync();
else #else
#endif
receiveTask = ReceiveLoopAsync(); receiveTask = ReceiveLoopAsync();
#endif
var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask; var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false); await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
_logger.SocketFinishedProcessing(Id); _logger.SocketFinishedProcessing(Id);
@ -578,6 +573,7 @@ namespace CryptoExchange.Net.Sockets.Default
} }
} }
#if NETSTANDARD2_0
/// <summary> /// <summary>
/// Loop for receiving and reassembling data /// Loop for receiving and reassembling data
/// </summary> /// </summary>
@ -666,9 +662,6 @@ namespace CryptoExchange.Net.Sockets.Default
if (_logger.IsEnabled(LogLevel.Trace)) if (_logger.IsEnabled(LogLevel.Trace))
_logger.SocketReceivedSingleMessage(Id, receiveResult.Count); _logger.SocketReceivedSingleMessage(Id, receiveResult.Count);
if (!Parameters.UseUpdatedDeserialization)
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
else
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(buffer.Array!, buffer.Offset, receiveResult.Count)); ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(buffer.Array!, buffer.Offset, receiveResult.Count));
} }
else else
@ -703,10 +696,6 @@ namespace CryptoExchange.Net.Sockets.Default
_logger.SocketReassembledMessage(Id, multipartStream!.Length); _logger.SocketReassembledMessage(Id, multipartStream!.Length);
// Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part) // Get the underlying buffer of the memory stream holding the written data and delimit it (GetBuffer return the full array, not only the written part)
if (!Parameters.UseUpdatedDeserialization)
await ProcessData(receiveResult.MessageType, new ReadOnlyMemory<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length)).ConfigureAwait(false);
else
ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length)); ProcessDataNew(receiveResult.MessageType, new ReadOnlySpan<byte>(multipartStream!.GetBuffer(), 0, (int)multipartStream.Length));
} }
else else
@ -732,6 +721,7 @@ namespace CryptoExchange.Net.Sockets.Default
_logger.SocketReceiveLoopFinished(Id); _logger.SocketReceiveLoopFinished(Id);
} }
} }
#endif
#if !NETSTANDARD2_0 #if !NETSTANDARD2_0
/// <summary> /// <summary>
@ -895,18 +885,6 @@ namespace CryptoExchange.Net.Sockets.Default
_connection.HandleStreamMessage2(type, data); _connection.HandleStreamMessage2(type, data);
} }
/// <summary>
/// Process a stream message
/// </summary>
/// <param name="type"></param>
/// <param name="data"></param>
/// <returns></returns>
protected async Task ProcessData(WebSocketMessageType type, ReadOnlyMemory<byte> data)
{
LastActionTime = DateTime.UtcNow;
await (OnStreamMessage?.Invoke(type, data) ?? Task.CompletedTask).ConfigureAwait(false);
}
/// <summary> /// <summary>
/// Checks if there is no data received for a period longer than the specified timeout /// Checks if there is no data received for a period longer than the specified timeout
/// </summary> /// </summary>

View File

@ -16,10 +16,6 @@ namespace CryptoExchange.Net.Sockets.Default.Interfaces
/// </summary> /// </summary>
event Func<Task> OnClose; event Func<Task> OnClose;
/// <summary> /// <summary>
/// Websocket message received event
/// </summary>
event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task> OnStreamMessage;
/// <summary>
/// Websocket sent event, RequestId as parameter /// Websocket sent event, RequestId as parameter
/// </summary> /// </summary>
event Func<int, Task> OnRequestSent; event Func<int, Task> OnRequestSent;

View File

@ -111,11 +111,6 @@ namespace CryptoExchange.Net.Sockets.Default
/// </summary> /// </summary>
public event Action? ActivityUnpaused; public event Action? ActivityUnpaused;
/// <summary>
/// Unhandled message event
/// </summary>
public event Action<IMessageAccessor>? UnhandledMessage;
/// <summary> /// <summary>
/// Connection was rate limited and couldn't be established /// Connection was rate limited and couldn't be established
/// </summary> /// </summary>
@ -269,8 +264,6 @@ namespace CryptoExchange.Net.Sockets.Default
private SocketStatus _status; private SocketStatus _status;
private readonly IMessageSerializer _serializer; private readonly IMessageSerializer _serializer;
private IByteMessageAccessor? _stringMessageAccessor;
private IByteMessageAccessor? _byteMessageAccessor;
private ISocketMessageHandler? _byteMessageConverter; private ISocketMessageHandler? _byteMessageConverter;
private ISocketMessageHandler? _textMessageConverter; private ISocketMessageHandler? _textMessageConverter;
@ -292,11 +285,6 @@ namespace CryptoExchange.Net.Sockets.Default
/// </summary> /// </summary>
private readonly IWebsocket _socket; private readonly IWebsocket _socket;
/// <summary>
/// Cache for deserialization, only caches for a single message
/// </summary>
private readonly Dictionary<Type, object> _deserializationCache = new Dictionary<Type, object>();
/// <summary> /// <summary>
/// New socket connection /// New socket connection
/// </summary> /// </summary>
@ -310,7 +298,6 @@ namespace CryptoExchange.Net.Sockets.Default
_socket = socketFactory.CreateWebsocket(logger, this, parameters); _socket = socketFactory.CreateWebsocket(logger, this, parameters);
_logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString()); _logger.SocketCreatedForAddress(_socket.Id, parameters.Uri.ToString());
_socket.OnStreamMessage += HandleStreamMessage;
_socket.OnRequestSent += HandleRequestSentAsync; _socket.OnRequestSent += HandleRequestSentAsync;
_socket.OnRequestRateLimited += HandleRequestRateLimitedAsync; _socket.OnRequestRateLimited += HandleRequestRateLimitedAsync;
_socket.OnConnectRateLimited += HandleConnectRateLimitedAsync; _socket.OnConnectRateLimited += HandleConnectRateLimitedAsync;
@ -671,144 +658,6 @@ namespace CryptoExchange.Net.Sockets.Default
} }
} }
/// <summary>
/// Handle a message
/// </summary>
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, ReadOnlyMemory<byte> data)
{
var sw = Stopwatch.StartNew();
var receiveTime = DateTime.UtcNow;
string? originalData = null;
// 1. Decrypt/Preprocess if necessary
data = ApiClient.PreprocessStreamMessage(this, type, data);
// 2. Read data into accessor
IByteMessageAccessor accessor;
if (type == WebSocketMessageType.Binary)
accessor = _stringMessageAccessor ??= ApiClient.CreateAccessor(type);
else
accessor = _byteMessageAccessor ??= ApiClient.CreateAccessor(type);
var result = accessor.Read(data);
try
{
bool outputOriginalData = ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData;
if (outputOriginalData)
{
originalData = accessor.GetOriginalString();
_logger.ReceivedData(SocketId, originalData);
}
if (!accessor.IsValid && !ApiClient.ProcessUnparsableMessages)
{
_logger.FailedToParse(SocketId, result.Error!.Message ?? result.Error!.ErrorDescription!);
return;
}
// 3. Determine the identifying properties of this message
var listenId = ApiClient.GetListenerIdentifier(accessor);
if (listenId == null)
{
originalData ??= "[OutputOriginalData is false]";
if (!ApiClient.UnhandledMessageExpected)
_logger.FailedToEvaluateMessage(SocketId, originalData);
UnhandledMessage?.Invoke(accessor);
return;
}
bool processed = false;
var totalUserTime = 0;
List<IMessageProcessor> localListeners;
lock (_listenersLock)
localListeners = _listeners.ToList();
foreach (var processor in localListeners)
{
foreach (var listener in processor.MessageMatcher.GetHandlerLinks(listenId))
{
processed = true;
_logger.ProcessorMatched(SocketId, listener.ToString(), listenId);
// 4. Determine the type to deserialize to for this processor
var messageType = listener.DeserializationType;
if (messageType == null)
{
_logger.ReceivedMessageNotRecognized(SocketId, processor.Id);
continue;
}
if (processor is Subscription subscriptionProcessor && subscriptionProcessor.Status == SubscriptionStatus.Subscribing)
{
// If this message is for this listener then it is automatically confirmed, even if the subscription is not (yet) confirmed
subscriptionProcessor.Status = SubscriptionStatus.Subscribed;
if (subscriptionProcessor.SubscriptionQuery?.TimeoutBehavior == TimeoutBehavior.Succeed)
// If this subscription has a query waiting for a timeout (success if there is no error response)
// then time it out now as the data is being received, so we assume it's successful
subscriptionProcessor.SubscriptionQuery.Timeout();
}
// 5. Deserialize the message
_deserializationCache.TryGetValue(messageType, out var deserialized);
if (deserialized == null)
{
var desResult = processor.Deserialize(accessor, messageType);
if (!desResult)
{
_logger.FailedToDeserializeMessage(SocketId, desResult.Error?.ToString(), desResult.Error?.Exception);
continue;
}
deserialized = desResult.Data;
_deserializationCache.Add(messageType, deserialized);
}
// 6. Pass the message to the handler
try
{
var innerSw = Stopwatch.StartNew();
processor.Handle(this, receiveTime, originalData, deserialized, listener);
if (processor is Query query && query.RequiredResponses != 1)
_logger.LogDebug($"[Sckt {SocketId}] [Req {query.Id}] responses: {query.CurrentResponses}/{query.RequiredResponses}");
totalUserTime += (int)innerSw.ElapsedMilliseconds;
}
catch (Exception ex)
{
_logger.UserMessageProcessingFailed(SocketId, ex.Message, ex);
if (processor is Subscription subscription)
subscription.InvokeExceptionHandler(ex);
}
}
}
if (!processed)
{
if (!ApiClient.UnhandledMessageExpected)
{
List<string> listenerIds;
lock (_listenersLock)
listenerIds = _listeners.Select(l => l.MessageMatcher.ToString()).ToList();
_logger.ReceivedMessageNotMatchedToAnyListener(SocketId, listenId, string.Join(",", listenerIds));
UnhandledMessage?.Invoke(accessor);
}
return;
}
_logger.MessageProcessed(SocketId, sw.ElapsedMilliseconds, sw.ElapsedMilliseconds - totalUserTime);
}
finally
{
_deserializationCache.Clear();
accessor.Clear();
}
}
/// <summary> /// <summary>
/// Connect the websocket /// Connect the websocket
/// </summary> /// </summary>
@ -886,16 +735,8 @@ namespace CryptoExchange.Net.Sockets.Default
subscription.CancellationTokenRegistration.Value.Dispose(); subscription.CancellationTokenRegistration.Value.Dispose();
bool anyDuplicateSubscription; bool anyDuplicateSubscription;
if (ApiClient.ClientOptions.UseUpdatedDeserialization)
{
lock (_listenersLock) lock (_listenersLock)
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l))); anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageRouter.Routes.All(l => subscription.MessageRouter.ContainsCheck(l)));
}
else
{
lock (_listenersLock)
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.MessageMatcher.HandlerLinks.All(l => subscription.MessageMatcher.ContainsCheck(l)));
}
bool shouldCloseConnection; bool shouldCloseConnection;
lock (_listenersLock) lock (_listenersLock)
@ -947,12 +788,6 @@ namespace CryptoExchange.Net.Sockets.Default
_socket.Dispose(); _socket.Dispose();
} }
/// <summary>
/// Whether or not a new subscription can be added to this connection
/// </summary>
/// <returns></returns>
public bool CanAddSubscription() => Status == SocketStatus.None || Status == SocketStatus.Connected;
/// <summary> /// <summary>
/// Add a subscription to this connection /// Add a subscription to this connection
/// </summary> /// </summary>

View File

@ -70,11 +70,6 @@ namespace CryptoExchange.Net.Sockets.Default
/// </summary> /// </summary>
public bool Authenticated { get; } public bool Authenticated { get; }
/// <summary>
/// Matcher for this subscription
/// </summary>
public MessageMatcher MessageMatcher { get; set; }
/// <summary> /// <summary>
/// Router for this subscription /// Router for this subscription
/// </summary> /// </summary>
@ -154,6 +149,7 @@ namespace CryptoExchange.Net.Sockets.Default
/// <summary> /// <summary>
/// Handle an unsubscription query response /// Handle an unsubscription query response
/// </summary> /// </summary>
#warning ?
public virtual void HandleUnsubQueryResponse(SocketConnection connection, object message) { } public virtual void HandleUnsubQueryResponse(SocketConnection connection, object message) { }
/// <summary> /// <summary>
@ -172,19 +168,6 @@ namespace CryptoExchange.Net.Sockets.Default
/// <returns></returns> /// <returns></returns>
protected abstract Query? GetUnsubQuery(SocketConnection connection); protected abstract Query? GetUnsubQuery(SocketConnection connection);
/// <inheritdoc />
public virtual CallResult<object> Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type);
/// <summary>
/// Handle an update message
/// </summary>
public CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data, MessageHandlerLink matcher)
{
ConnectionInvocations++;
TotalInvocations++;
return matcher.Handle(connection, receiveTime, originalData, data);
}
/// <summary> /// <summary>
/// Handle an update message /// Handle an update message
/// </summary> /// </summary>
@ -224,12 +207,12 @@ namespace CryptoExchange.Net.Sockets.Default
/// <param name="Id">The id of the subscription</param> /// <param name="Id">The id of the subscription</param>
/// <param name="Status">Subscription status</param> /// <param name="Status">Subscription status</param>
/// <param name="Invocations">Number of times this subscription got a message</param> /// <param name="Invocations">Number of times this subscription got a message</param>
/// <param name="ListenMatcher">Matcher for this subscription</param> /// <param name="MessageRouter">Router for this subscription</param>
public record SubscriptionState( public record SubscriptionState(
int Id, int Id,
SubscriptionStatus Status, SubscriptionStatus Status,
int Invocations, int Invocations,
MessageMatcher ListenMatcher MessageRouter MessageRouter
); );
/// <summary> /// <summary>
@ -238,7 +221,7 @@ namespace CryptoExchange.Net.Sockets.Default
/// <returns></returns> /// <returns></returns>
public SubscriptionState GetState() public SubscriptionState GetState()
{ {
return new SubscriptionState(Id, Status, TotalInvocations, MessageMatcher); return new SubscriptionState(Id, Status, TotalInvocations, MessageRouter);
} }
} }
} }

View File

@ -245,7 +245,9 @@ namespace CryptoExchange.Net.Sockets.HighPerf
public virtual ValueTask<CallResult> SendAsync<T>(T obj) public virtual ValueTask<CallResult> SendAsync<T>(T obj)
{ {
if (_serializer is IByteMessageSerializer byteSerializer) if (_serializer is IByteMessageSerializer byteSerializer)
{
return SendBytesAsync(byteSerializer.Serialize(obj)); return SendBytesAsync(byteSerializer.Serialize(obj));
}
else if (_serializer is IStringMessageSerializer stringSerializer) else if (_serializer is IStringMessageSerializer stringSerializer)
{ {
if (obj is string str) if (obj is string str)

View File

@ -15,27 +15,12 @@ namespace CryptoExchange.Net.Sockets.Interfaces
/// </summary> /// </summary>
public int Id { get; } public int Id { get; }
/// <summary> /// <summary>
/// The matcher for this listener
/// </summary>
public MessageMatcher MessageMatcher { get; }
/// <summary>
/// The message router for this processor /// The message router for this processor
/// </summary> /// </summary>
public MessageRouter MessageRouter { get; } public MessageRouter MessageRouter { get; }
/// <summary> /// <summary>
/// Handle a message /// Handle a message
/// </summary> /// </summary>
CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageHandlerLink matchedHandler);
/// <summary>
/// Handle a message
/// </summary>
CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route); CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route);
/// <summary>
/// Deserialize a message into object of type
/// </summary>
/// <param name="accessor"></param>
/// <param name="type"></param>
/// <returns></returns>
CallResult<object> Deserialize(IMessageAccessor accessor, Type type);
} }
} }

View File

@ -1,185 +0,0 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default;
using System;
using System.Collections.Generic;
using System.Linq;
namespace CryptoExchange.Net.Sockets
{
/// <summary>
/// Message link type
/// </summary>
public enum MessageLinkType
{
/// <summary>
/// Match when the listen id matches fully to the value
/// </summary>
Full,
/// <summary>
/// Match when the listen id starts with the value
/// </summary>
StartsWith
}
/// <summary>
/// Matches a message listen id to a specific listener
/// </summary>
public class MessageMatcher
{
/// <summary>
/// Linkers in this matcher
/// </summary>
public MessageHandlerLink[] HandlerLinks { get; }
/// <summary>
/// ctor
/// </summary>
private MessageMatcher(params MessageHandlerLink[] links)
{
HandlerLinks = links;
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create(string value)
{
return new MessageMatcher(new MessageHandlerLink<string>(MessageLinkType.Full, value, (con, receiveTime, originalData, msg) => new CallResult<string>(default, null, null)));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(string value)
{
return new MessageMatcher(new MessageHandlerLink<T>(MessageLinkType.Full, value, (con, receiveTime, originalData, msg) => new CallResult<T>(default, null, null)));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(string value, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
{
return new MessageMatcher(new MessageHandlerLink<T>(MessageLinkType.Full, value, handler));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(IEnumerable<string> values, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
{
return new MessageMatcher(values.Select(x => new MessageHandlerLink<T>(MessageLinkType.Full, x, handler)).ToArray());
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create<T>(MessageLinkType type, string value, Func<SocketConnection, DateTime, string?, T, CallResult> handler)
{
return new MessageMatcher(new MessageHandlerLink<T>(type, value, handler));
}
/// <summary>
/// Create message matcher
/// </summary>
public static MessageMatcher Create(params MessageHandlerLink[] linkers)
{
return new MessageMatcher(linkers);
}
/// <summary>
/// Whether this matcher contains a specific link
/// </summary>
public bool ContainsCheck(MessageHandlerLink link) => HandlerLinks.Any(x => x.Type == link.Type && x.Value == link.Value);
/// <summary>
/// Get any handler links matching with the listen id
/// </summary>
public IEnumerable<MessageHandlerLink> GetHandlerLinks(string listenId) => HandlerLinks.Where(x => x.Check(listenId));
/// <inheritdoc />
public override string ToString() => string.Join(",", HandlerLinks.Select(x => x.ToString()));
}
/// <summary>
/// Message handler link
/// </summary>
public abstract class MessageHandlerLink
{
/// <summary>
/// Type of check
/// </summary>
public MessageLinkType Type { get; }
/// <summary>
/// String value of the check
/// </summary>
public string Value { get; }
/// <summary>
/// Deserialization type
/// </summary>
public abstract Type DeserializationType { get; }
/// <summary>
/// ctor
/// </summary>
public MessageHandlerLink(MessageLinkType type, string value)
{
Type = type;
Value = value;
}
/// <summary>
/// Whether this listen id matches this link
/// </summary>
public bool Check(string listenId)
{
if (Type == MessageLinkType.Full)
return Value.Equals(listenId, StringComparison.Ordinal);
return listenId.StartsWith(Value, StringComparison.Ordinal);
}
/// <summary>
/// Message handler
/// </summary>
public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data);
/// <inheritdoc />
public override string ToString() => $"{Type} match for \"{Value}\"";
}
/// <summary>
/// Message handler link
/// </summary>
public class MessageHandlerLink<TServer>: MessageHandlerLink
{
private Func<SocketConnection, DateTime, string?, TServer, CallResult> _handler;
/// <inheritdoc />
public override Type DeserializationType => typeof(TServer);
/// <summary>
/// ctor
/// </summary>
public MessageHandlerLink(string value, Func<SocketConnection, DateTime, string?, TServer, CallResult> handler)
: this(MessageLinkType.Full, value, handler)
{
}
/// <summary>
/// ctor
/// </summary>
public MessageHandlerLink(MessageLinkType type, string value, Func<SocketConnection, DateTime, string?, TServer, CallResult> handler)
: base(type, value)
{
_handler = handler;
}
/// <inheritdoc />
public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data)
{
return _handler(connection, receiveTime, originalData, (TServer)data);
}
}
}

View File

@ -59,11 +59,6 @@ namespace CryptoExchange.Net.Sockets
/// </summary> /// </summary>
public object? Response { get; set; } public object? Response { get; set; }
/// <summary>
/// Matcher for this query
/// </summary>
public MessageMatcher MessageMatcher { get; set; }
/// <summary> /// <summary>
/// Router for this query /// Router for this query
/// </summary> /// </summary>
@ -146,9 +141,6 @@ namespace CryptoExchange.Net.Sockets
/// <returns></returns> /// <returns></returns>
public async Task WaitAsync(TimeSpan timeout, CancellationToken ct) => await _event.WaitAsync(timeout, ct).ConfigureAwait(false); public async Task WaitAsync(TimeSpan timeout, CancellationToken ct) => await _event.WaitAsync(timeout, ct).ConfigureAwait(false);
/// <inheritdoc />
public virtual CallResult<object> Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type);
/// <summary> /// <summary>
/// Mark request as timeout /// Mark request as timeout
/// </summary> /// </summary>
@ -160,11 +152,6 @@ namespace CryptoExchange.Net.Sockets
/// <param name="error"></param> /// <param name="error"></param>
public abstract void Fail(Error error); public abstract void Fail(Error error);
/// <summary>
/// Handle a response message
/// </summary>
public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageHandlerLink check);
/// <summary> /// <summary>
/// Handle a response message /// Handle a response message
/// </summary> /// </summary>
@ -223,35 +210,6 @@ namespace CryptoExchange.Net.Sockets
return Result ?? CallResult.SuccessResult; return Result ?? CallResult.SuccessResult;
} }
/// <inheritdoc />
public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageHandlerLink check)
{
if (!PreCheckMessage(connection, message))
return CallResult.SuccessResult;
CurrentResponses++;
if (CurrentResponses == RequiredResponses)
Response = message;
if (Result?.Success != false)
// If an error result is already set don't override that
Result = check.Handle(connection, receiveTime, originalData, message);
if (CurrentResponses == RequiredResponses)
{
Completed = true;
_event.Set();
OnComplete?.Invoke();
}
return Result;
}
/// <summary>
/// Validate if a message is actually processable by this query
/// </summary>
public virtual bool PreCheckMessage(SocketConnection connection, object message) => true;
/// <inheritdoc /> /// <inheritdoc />
public override void Timeout() public override void Timeout()
{ {

View File

@ -28,7 +28,6 @@ namespace CryptoExchange.Net.Testing.Implementations
public event Func<Exception, Task>? OnError; public event Func<Exception, Task>? OnError;
#pragma warning restore 0067 #pragma warning restore 0067
public event Func<int, Task>? OnRequestSent; public event Func<int, Task>? OnRequestSent;
public event Func<WebSocketMessageType, ReadOnlyMemory<byte>, Task>? OnStreamMessage;
public event Func<Task>? OnOpen; public event Func<Task>? OnOpen;
public int Id { get; } public int Id { get; }
@ -45,14 +44,10 @@ namespace CryptoExchange.Net.Testing.Implementations
public static readonly object lastIdLock = new object(); public static readonly object lastIdLock = new object();
#endif #endif
private bool _newDeserialization;
public SocketConnection? Connection { get; set; } public SocketConnection? Connection { get; set; }
public TestSocket(bool newDeserialization, string address) public TestSocket(string address)
{ {
_newDeserialization = newDeserialization;
Uri = new Uri(address); Uri = new Uri(address);
lock (lastIdLock) lock (lastIdLock)
{ {
@ -106,19 +101,12 @@ namespace CryptoExchange.Net.Testing.Implementations
} }
public void InvokeMessage(string data) public void InvokeMessage(string data)
{
if (!_newDeserialization)
{
OnStreamMessage?.Invoke(WebSocketMessageType.Text, new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(data))).Wait();
}
else
{ {
if (Connection == null) if (Connection == null)
throw new ArgumentNullException(nameof(Connection)); throw new ArgumentNullException(nameof(Connection));
Connection.HandleStreamMessage2(WebSocketMessageType.Text, Encoding.UTF8.GetBytes(data)); Connection.HandleStreamMessage2(WebSocketMessageType.Text, Encoding.UTF8.GetBytes(data));
} }
}
public Task ReconnectAsync() => Task.CompletedTask; public Task ReconnectAsync() => Task.CompletedTask;
public void Dispose() { } public void Dispose() { }

View File

@ -18,7 +18,7 @@ namespace CryptoExchange.Net.Testing
/// <summary> /// <summary>
/// Get a client instance /// Get a client instance
/// </summary> /// </summary>
public abstract TClient GetClient(ILoggerFactory loggerFactory, bool newDeserialization); public abstract TClient GetClient(ILoggerFactory loggerFactory);
/// <summary> /// <summary>
/// Whether the test should be run. By default integration tests aren't executed, can be set to true to force execution. /// Whether the test should be run. By default integration tests aren't executed, can be set to true to force execution.
@ -34,11 +34,11 @@ namespace CryptoExchange.Net.Testing
/// Create a client /// Create a client
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
protected TClient CreateClient(bool useNewDeserialization) protected TClient CreateClient()
{ {
var fact = new LoggerFactory(); var fact = new LoggerFactory();
fact.AddProvider(new TraceLoggerProvider()); fact.AddProvider(new TraceLoggerProvider());
return GetClient(fact, useNewDeserialization); return GetClient(fact);
} }
/// <summary> /// <summary>
@ -58,16 +58,15 @@ namespace CryptoExchange.Net.Testing
/// Execute a REST endpoint call and check for any errors or warnings. /// Execute a REST endpoint call and check for any errors or warnings.
/// </summary> /// </summary>
/// <typeparam name="T">Type of the update</typeparam> /// <typeparam name="T">Type of the update</typeparam>
/// <param name="useNewDeserialization">Whether to use the new deserialization method</param>
/// <param name="expression">The call expression</param> /// <param name="expression">The call expression</param>
/// <param name="expectUpdate">Whether an update is expected</param> /// <param name="expectUpdate">Whether an update is expected</param>
/// <param name="authRequest">Whether this is an authenticated request</param> /// <param name="authRequest">Whether this is an authenticated request</param>
public async Task RunAndCheckUpdate<T>(bool useNewDeserialization, Expression<Func<TClient, Action<DataEvent<T>>, Task<CallResult<UpdateSubscription>>>> expression, bool expectUpdate, bool authRequest) public async Task RunAndCheckUpdate<T>(Expression<Func<TClient, Action<DataEvent<T>>, Task<CallResult<UpdateSubscription>>>> expression, bool expectUpdate, bool authRequest)
{ {
if (!ShouldRun()) if (!ShouldRun())
return; return;
var client = CreateClient(useNewDeserialization); var client = CreateClient();
var expressionBody = (MethodCallExpression)expression.Body; var expressionBody = (MethodCallExpression)expression.Body;
if (authRequest && !Authenticated) if (authRequest && !Authenticated)

View File

@ -63,7 +63,7 @@ namespace CryptoExchange.Net.Testing
internal static TestSocket ConfigureSocketClient<T>(T client, string address) where T : BaseSocketClient internal static TestSocket ConfigureSocketClient<T>(T client, string address) where T : BaseSocketClient
{ {
var socket = new TestSocket(client.ClientOptions.UseUpdatedDeserialization, address); var socket = new TestSocket(address);
foreach (var apiClient in client.ApiClients.OfType<SocketApiClient>()) foreach (var apiClient in client.ApiClients.OfType<SocketApiClient>())
{ {
apiClient.SocketFactory = new TestWebsocketFactory(socket); apiClient.SocketFactory = new TestWebsocketFactory(socket);