1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-04-07 02:01:12 +00:00
This commit is contained in:
JKorf 2026-04-06 21:22:10 +02:00
parent 08d361b259
commit aed98140aa
6 changed files with 121 additions and 67 deletions

View File

@ -6,9 +6,9 @@
<PackageId>CryptoExchange.Net</PackageId>
<Authors>JKorf</Authors>
<Description>CryptoExchange.Net is a base library which is used to implement different cryptocurrency (exchange) API's. It provides a standardized way of implementing different API's, which results in a very similar experience for users of the API implementations.</Description>
<PackageVersion>11.0.3</PackageVersion>
<AssemblyVersion>11.0.3</AssemblyVersion>
<FileVersion>11.0.3</FileVersion>
<PackageVersion>11.0.4</PackageVersion>
<AssemblyVersion>11.0.4</AssemblyVersion>
<FileVersion>11.0.4</FileVersion>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<PackageTags>OKX;OKX.Net;Mexc;Mexc.Net;Kucoin;Kucoin.Net;Kraken;Kraken.Net;Huobi;Huobi.Net;CoinEx;CoinEx.Net;Bybit;Bybit.Net;Bitget;Bitget.Net;Bitfinex;Bitfinex.Net;Binance;Binance.Net;CryptoCurrency;CryptoCurrency Exchange;CryptoExchange.Net</PackageTags>
<RepositoryType>git</RepositoryType>

View File

@ -11,6 +11,7 @@ using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Net.WebSockets;
@ -489,6 +490,14 @@ namespace CryptoExchange.Net.Sockets.Default
/// </summary>
protected internal virtual void HandleStreamMessage2(WebSocketMessageType type, ReadOnlySpan<byte> data)
{
// Forward message rules:
// | Message Topic | Route Topic Filter | Topics Match | Forward | Description
// | N | N | - | Y | No topic filter applied
// | N | Y | - | N | Route only listens to specific topic
// | Y | N | - | Y | Route listens to all message regardless of topic
// | Y | Y | Y | Y | Route listens to specific message topic
// | Y | Y | N | N | Route listens to different topic
var receiveTime = DateTime.UtcNow;
// 1. Decrypt/Preprocess if necessary
@ -525,10 +534,9 @@ namespace CryptoExchange.Net.Sockets.Default
foreach (var listener in _listeners)
{
var routes = listener.MessageRouter[typeIdentifier];
if (routes.Count > 0) {
deserializationType = routes[0].DeserializationType;
break;
}
deserializationType = routes?.RouteType;
if (deserializationType != null)
break;
}
if (deserializationType == null)
@ -577,55 +585,18 @@ namespace CryptoExchange.Net.Sockets.Default
bool processed = false;
foreach (var listener in _listeners)
{
var routes = listener.MessageRouter[typeIdentifier];
bool isQuery = false;
Query? query = null;
if (listener is Query cquery)
var routeMap = listener.MessageRouter[typeIdentifier];
if (routeMap != null)
{
isQuery = true;
query = cquery;
}
// Could be null if listeners was updated while handling message
var handled = listener.Handle(topicFilter, this, receiveTime, originalData, result, routeMap);
if (!processed)
processed = handled;
var complete = false;
foreach (var route in routes)
{
// Forward message rules:
// | Message Topic | Route Topic Filter | Topics Match | Forward | Description
// | N | N | - | Y | No topic filter applied
// | N | Y | - | N | Route only listens to specific topic
// | Y | N | - | Y | Route listens to all message regardless of topic
// | Y | Y | Y | Y | Route listens to specific message topic
// | Y | Y | N | N | Route listens to different topic
if (topicFilter == null)
{
if (route.TopicFilter != null)
// No topic on message, but route is filtering on topic
continue;
}
else
{
if (route.TopicFilter != null && !route.TopicFilter.Equals(topicFilter, StringComparison.Ordinal))
// Message has a topic, and the route has a filter for another topic
continue;
}
processed = true;
if (isQuery && query!.Completed)
continue;
listener.Handle(this, receiveTime, originalData, result, route);
if (isQuery && !route.MultipleReaders)
{
complete = true;
if (!routeMap.MultipleReaders)
// If this was a response to a query and there are no other routes expecting this message we can break here
break;
}
}
if (complete)
break;
}
if (!processed)

View File

@ -180,10 +180,11 @@ namespace CryptoExchange.Net.Sockets.Default
/// <summary>
/// Handle an update message
/// </summary>
public CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object data, MessageRoute route)
public bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, RouteMapEntry routeMap)
{
ConnectionInvocations++;
TotalInvocations++;
if (SubscriptionQuery != null && !SubscriptionQuery.Completed && SubscriptionQuery.TimeoutBehavior == TimeoutBehavior.Succeed)
{
// The subscription query is one where it is successful if there is no error returned
@ -192,7 +193,7 @@ namespace CryptoExchange.Net.Sockets.Default
SubscriptionQuery.Timeout();
}
return route.Handle(connection, receiveTime, originalData, data);
return routeMap.Handle(topicFilter, connection, receiveTime, originalData, data, out _);
}
/// <summary>

View File

@ -21,6 +21,6 @@ namespace CryptoExchange.Net.Sockets.Interfaces
/// <summary>
/// Handle a message
/// </summary>
CallResult? Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object result, MessageRoute route);
bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object result, RouteMapEntry route);
}
}

View File

@ -9,6 +9,82 @@ using System.Linq;
namespace CryptoExchange.Net.Sockets
{
public record RouteMapEntry
{
public Type RouteType { get; }
public bool MultipleReaders { get; private set; }
private List<MessageRoute> _routesWithoutTopicFilter;
private Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
public RouteMapEntry(Type routeType)
{
_routesWithoutTopicFilter = new List<MessageRoute>();
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
RouteType = routeType;
}
public void AddRoute(string? topicFilter, MessageRoute route)
{
if (string.IsNullOrEmpty(topicFilter))
{
_routesWithoutTopicFilter.Add(route);
}
else
{
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
{
list = new List<MessageRoute>();
_routesWithTopicFilter.Add(topicFilter!, list);
}
list.Add(route);
}
if (route.MultipleReaders)
MultipleReaders = true;
}
internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{
result = null;
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
var handled = false;
foreach (var route in _routesWithoutTopicFilter)
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
if (thisResult != null)
result ??= thisResult;
handled = true;
}
// Forward to routes with matching topic filter, if any
if (topicFilter != null)
{
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
foreach (var route in matchingTopicRoutes ?? [])
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
handled = true;
if (thisResult != null)
{
result ??= thisResult;
if (!MultipleReaders)
break;
}
}
}
return handled;
}
}
/// <summary>
/// Message router
/// </summary>
@ -20,9 +96,10 @@ namespace CryptoExchange.Net.Sockets
public MessageRoute[] Routes { get; }
#if NET8_0_OR_GREATER
private FrozenDictionary<string, List<MessageRoute>>? _routeMap;
// Used for mapping a type identifier to the routes matching it
private FrozenDictionary<string, RouteMapEntry>? _routeMap;
#else
private Dictionary<string, List<MessageRoute>>? _routeMap;
private Dictionary<string, RouteMapEntry>? _routeMap;
#endif
/// <summary>
@ -38,12 +115,16 @@ namespace CryptoExchange.Net.Sockets
/// </summary>
public void BuildRouteMap()
{
var newMap = new Dictionary<string, List<MessageRoute>>();
var newMap = new Dictionary<string, RouteMapEntry>();
foreach (var route in Routes)
{
if (!newMap.ContainsKey(route.TypeIdentifier))
newMap.Add(route.TypeIdentifier, new List<MessageRoute>());
newMap[route.TypeIdentifier].Add(route);
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
{
typeMap = new RouteMapEntry(route.DeserializationType);
newMap.Add(route.TypeIdentifier, typeMap);
}
typeMap.AddRoute(route.TopicFilter, route);
}
#if NET8_0_OR_GREATER
@ -56,9 +137,9 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Get routes matching the type identifier
/// </summary>
public List<MessageRoute> this[string identifier]
public RouteMapEntry? this[string identifier]
{
get => (_routeMap ?? throw new InvalidOperationException("Route map not initialized before use")).TryGetValue(identifier, out var routes) ? routes: [];
get => (_routeMap ?? throw new InvalidOperationException("Route map not initialized before use")).TryGetValue(identifier, out var routes) ? routes: null;
}
/// <summary>

View File

@ -164,7 +164,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary>
/// Handle a response message
/// </summary>
public abstract CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageRoute route);
public abstract bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object message, RouteMapEntry routeMap);
}
@ -194,7 +194,7 @@ namespace CryptoExchange.Net.Sockets
}
/// <inheritdoc />
public override CallResult Handle(SocketConnection connection, DateTime receiveTime, string? originalData, object message, MessageRoute route)
public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object message, RouteMapEntry routeMap)
{
CurrentResponses++;
if (CurrentResponses == RequiredResponses)
@ -203,7 +203,8 @@ namespace CryptoExchange.Net.Sockets
if (Result?.Success != false)
{
// If an error result is already set don't override that
Result = route.Handle(connection, receiveTime, originalData, message);
routeMap.Handle(topicFilter, connection, receiveTime, originalData, message, out var result);
Result = result;
if (Result == null)
// Null from Handle means it wasn't actually for this query
CurrentResponses -= 1;
@ -216,7 +217,7 @@ namespace CryptoExchange.Net.Sockets
OnComplete?.Invoke();
}
return Result ?? CallResult.SuccessResult;
return true;
}
/// <inheritdoc />