1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2026-04-12 16:13:12 +00:00

Compare commits

...

3 Commits

Author SHA1 Message Date
Jkorf
5bf89897e8 wip 2026-04-08 11:43:53 +02:00
Jkorf
6dbc84e6fc wip 2026-04-08 11:15:53 +02:00
Jkorf
f66730ca8c wip 2026-04-08 09:33:45 +02:00
22 changed files with 894 additions and 283 deletions

View File

@ -1,5 +1,4 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.UnitTests.TestImplementations;
using NUnit.Framework;
using System;
using System.Net.Http;

View File

@ -1,7 +1,6 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Testing;
using CryptoExchange.Net.UnitTests.Implementations;
using CryptoExchange.Net.UnitTests.TestImplementations;
using NUnit.Framework;
using System;
using System.Linq;

View File

@ -4,7 +4,7 @@ using NUnit.Framework;
using System;
using System.Text.Json;
namespace CryptoExchange.Net.UnitTests
namespace CryptoExchange.Net.UnitTests.ConverterTests
{
[TestFixture()]
public class SharedModelConversionTests

View File

@ -23,7 +23,7 @@ namespace CryptoExchange.Net.UnitTests.Implementations
{ }
/// <summary>
/// Get the CryptoCom environment by name
/// Get the environment by name
/// </summary>
public static TestEnvironment? GetEnvironmentByName(string? name)
=> name switch

View File

@ -1,6 +1,6 @@
using System.Text.Json.Serialization;
namespace CryptoExchange.Net.UnitTests.TestImplementations
namespace CryptoExchange.Net.UnitTests.Implementations
{
public class TestObject
{

View File

@ -1,6 +1,5 @@
using CryptoExchange.Net.UnitTests.ConverterTests;
using CryptoExchange.Net.UnitTests.Implementations;
using CryptoExchange.Net.UnitTests.TestImplementations;
using System.Collections.Generic;
using System.Text.Json.Serialization;

View File

@ -18,7 +18,7 @@ namespace CryptoExchange.Net.UnitTests
}
[Test]
public void AddingBasicNullValue_ThrowExecption()
public void AddingBasicNullValue_ThrowsException()
{
var parameters = new ParameterCollection();
Assert.Throws<ArgumentNullException>(() => parameters.Add("test", null!));

View File

@ -1,94 +0,0 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default;
using CryptoExchange.Net.Sockets.Default.Routing;
using CryptoExchange.Net.Sockets.Interfaces;
using NUnit.Framework;
using System;
namespace CryptoExchange.Net.UnitTests
{
internal class RoutingTableTests
{
[Test]
public void Constructor_CreatesEntriesAndDeduplicatesHandlersPerTypeIdentifier()
{
var processor = new TestMessageProcessor(1, MessageRouter.Create(
MessageRoute<string>.CreateWithoutTopicFilter("ticker", EmptyHandler<string>()),
MessageRoute<string>.CreateWithTopicFilter("ticker", "btcusdt", EmptyHandler<string>()),
MessageRoute<int>.CreateWithoutTopicFilter("trade", EmptyHandler<int>())));
var routingTable = new RoutingTable(new[] { processor });
var tickerEntry = routingTable.GetRouteTableEntry("ticker");
var tradeEntry = routingTable.GetRouteTableEntry("trade");
Assert.Multiple(() =>
{
Assert.That(tickerEntry, Is.Not.Null);
Assert.That(tickerEntry!.DeserializationType, Is.EqualTo(typeof(string)));
Assert.That(tickerEntry.IsStringOutput, Is.True);
Assert.That(tickerEntry.Handlers, Has.Count.EqualTo(1));
Assert.That(tickerEntry.Handlers[0], Is.SameAs(processor));
Assert.That(tradeEntry, Is.Not.Null);
Assert.That(tradeEntry!.DeserializationType, Is.EqualTo(typeof(int)));
Assert.That(tradeEntry.IsStringOutput, Is.False);
Assert.That(tradeEntry.Handlers, Has.Count.EqualTo(1));
Assert.That(tradeEntry.Handlers[0], Is.SameAs(processor));
});
}
[Test]
public void Constructor_AddsAllProcessorsForSameTypeIdentifier()
{
var processor1 = new TestMessageProcessor(1, MessageRouter.Create(
MessageRoute<string>.CreateWithoutTopicFilter("ticker", EmptyHandler<string>())));
var processor2 = new TestMessageProcessor(2, MessageRouter.Create(
MessageRoute<string>.CreateWithTopicFilter("ticker", "ethusdt", EmptyHandler<string>())));
var routingTable = new RoutingTable(new IMessageProcessor[] { processor1, processor2 });
var entry = routingTable.GetRouteTableEntry("ticker");
Assert.Multiple(() =>
{
Assert.That(entry, Is.Not.Null);
Assert.That(entry!.Handlers, Has.Count.EqualTo(2));
Assert.That(entry.Handlers, Does.Contain(processor1));
Assert.That(entry.Handlers, Does.Contain(processor2));
});
}
[Test]
public void GetRouteTableEntry_ReturnsNullForUnknownTypeIdentifier()
{
var processor = new TestMessageProcessor(1, MessageRouter.Create(
MessageRoute<string>.CreateWithoutTopicFilter("ticker", EmptyHandler<string>())));
var routingTable = new RoutingTable(new[] { processor });
Assert.That(routingTable.GetRouteTableEntry("unknown"), Is.Null);
}
private static Func<SocketConnection, DateTime, string?, T, CallResult?> EmptyHandler<T>()
{
return (_, _, _, _) => null;
}
private class TestMessageProcessor : IMessageProcessor
{
public int Id { get; }
public MessageRouter MessageRouter { get; }
public event Action? OnMessageRouterUpdated;
public TestMessageProcessor(int id, MessageRouter messageRouter)
{
Id = id;
MessageRouter = messageRouter;
}
public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection socketConnection, DateTime receiveTime, string? originalData, object result)
{
return false;
}
}
}
}

View File

@ -0,0 +1,235 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default;
using CryptoExchange.Net.Sockets.Default.Routing;
using NUnit.Framework;
using System;
using System.Collections.Generic;
namespace CryptoExchange.Net.UnitTests.SocketRoutingTests
{
[TestFixture]
public class QueryRouterTests
{
[Test]
public void BuildFromRoutes_Should_GroupRoutesByTypeIdentifier_AndSetDeserializationType()
{
// arrange
var routes = new MessageRoute[]
{
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null),
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null),
MessageRoute<int>.CreateWithTopicFilter("type2", "topic2", (_, _, _, _) => null)
};
var router = new QueryRouter(routes);
// act
var type1Routes = router.GetRoutes("type1");
var type2Routes = router.GetRoutes("type2");
var missingRoutes = router.GetRoutes("missing");
// assert
Assert.That(type1Routes, Is.Not.Null);
Assert.That(type2Routes, Is.Not.Null);
Assert.That(missingRoutes, Is.Null);
Assert.That(type1Routes, Is.TypeOf<QueryRouteCollection>());
Assert.That(type2Routes, Is.TypeOf<QueryRouteCollection>());
Assert.That(type1Routes!.DeserializationType, Is.EqualTo(typeof(string)));
Assert.That(type2Routes!.DeserializationType, Is.EqualTo(typeof(int)));
}
[Test]
public void AddRoute_Should_SetMultipleReaders_WhenAnyRouteAllowsMultipleReaders()
{
// arrange
var collection = new QueryRouteCollection(typeof(string));
// act
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) => null));
var beforeMultipleReaders = collection.MultipleReaders;
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) => null, true));
var afterMultipleReaders = collection.MultipleReaders;
// assert
Assert.That(beforeMultipleReaders, Is.False);
Assert.That(afterMultipleReaders, Is.True);
}
[Test]
public void Handle_Should_InvokeRoutesWithoutTopicFilter_WhenTopicFilterIsNull()
{
// arrange
var calls = new List<string>();
var collection = new QueryRouteCollection(typeof(string));
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
{
calls.Add("no-topic");
return null;
}));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("topic");
return null;
}));
collection.Build();
// act
var handled = collection.Handle(null, null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.True);
Assert.That(result, Is.Null);
Assert.That(calls, Is.EqualTo(new[] { "no-topic" }));
}
[Test]
public void Handle_Should_ReturnFalse_WhenNoRoutesMatch()
{
// arrange
var collection = new QueryRouteCollection(typeof(string));
collection.AddRoute("other-topic", MessageRoute<string>.CreateWithTopicFilter("type", "other-topic", (_, _, _, _) => null));
collection.Build();
// act
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.False);
Assert.That(result, Is.Null);
}
[Test]
public void Handle_Should_InvokeRoutesWithoutTopicFilter_AndMatchingTopicRoutes()
{
// arrange
var calls = new List<string>();
var collection = new QueryRouteCollection(typeof(string));
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
{
calls.Add("no-topic");
return null;
}));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("topic");
return null;
}));
collection.Build();
// act
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.True);
Assert.That(result, Is.Null);
Assert.That(calls, Is.EqualTo(new[] { "no-topic", "topic" }));
}
[Test]
public void Handle_Should_StopAfterFirstNonNullMatchingResult_WhenMultipleReadersIsFalse()
{
// arrange
var calls = new List<string>();
var expectedResult = CallResult.SuccessResult;
var collection = new QueryRouteCollection(typeof(string));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("first");
return expectedResult;
}));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("second");
return new CallResult(null);
}));
collection.Build();
// act
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.True);
Assert.That(result, Is.SameAs(expectedResult));
Assert.That(calls, Is.EqualTo(new[] { "first" }));
}
[Test]
public void Handle_Should_ContinueAfterNonNullMatchingResult_WhenMultipleReadersIsTrue()
{
// arrange
var calls = new List<string>();
var expectedResult = CallResult.SuccessResult;
var collection = new QueryRouteCollection(typeof(string));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("first");
return expectedResult;
}, true));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("second");
return new CallResult(null);
}));
collection.Build();
// act
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.True);
Assert.That(result, Is.SameAs(expectedResult));
Assert.That(calls, Is.EqualTo(new[] { "first", "second" }));
}
[Test]
public void Handle_Should_ContinueUntilNonNullResult_WhenEarlierMatchingRoutesReturnNull()
{
// arrange
var calls = new List<string>();
var expectedResult = CallResult.SuccessResult;
var collection = new QueryRouteCollection(typeof(string));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("first");
return null;
}));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("second");
return expectedResult;
}));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("third");
return new CallResult(null);
}));
collection.Build();
// act
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.True);
Assert.That(result, Is.SameAs(expectedResult));
Assert.That(calls, Is.EqualTo(new[] { "first", "second" }));
}
[Test]
public void Handle_Should_ReturnHandledTrue_WhenMatchingRoutesReturnNull()
{
// arrange
var collection = new QueryRouteCollection(typeof(string));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) => null));
collection.Build();
// act
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.True);
Assert.That(result, Is.Null);
}
}
}

View File

@ -0,0 +1,167 @@
using CryptoExchange.Net.Sockets.Default;
using CryptoExchange.Net.Sockets.Default.Routing;
using CryptoExchange.Net.Sockets.Interfaces;
using NUnit.Framework;
using System;
using System.Linq;
namespace CryptoExchange.Net.UnitTests.SocketRoutingTests
{
[TestFixture]
public class RoutingTableTests
{
[Test]
public void Update_Should_CreateEntriesPerTypeIdentifier_WithCorrectDeserializationTypeAndHandlers()
{
// arrange
var processor1 = new TestMessageProcessor(
1,
MessageRouter.Create(
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null),
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null)));
var processor2 = new TestMessageProcessor(
2,
MessageRouter.Create(
MessageRoute<int>.CreateWithTopicFilter("type2", "topic2", (_, _, _, _) => null)));
var table = new RoutingTable();
// act
table.Update(new IMessageProcessor[] { processor1, processor2 });
var type1Entry = table.GetRouteTableEntry("type1");
var type2Entry = table.GetRouteTableEntry("type2");
var missingEntry = table.GetRouteTableEntry("missing");
// assert
Assert.That(type1Entry, Is.Not.Null);
Assert.That(type2Entry, Is.Not.Null);
Assert.That(missingEntry, Is.Null);
Assert.That(type1Entry!.DeserializationType, Is.EqualTo(typeof(string)));
Assert.That(type1Entry.IsStringOutput, Is.True);
Assert.That(type1Entry.Handlers, Has.Count.EqualTo(1));
Assert.That(type1Entry.Handlers.Single(), Is.SameAs(processor1));
Assert.That(type2Entry!.DeserializationType, Is.EqualTo(typeof(int)));
Assert.That(type2Entry.IsStringOutput, Is.False);
Assert.That(type2Entry.Handlers, Has.Count.EqualTo(1));
Assert.That(type2Entry.Handlers.Single(), Is.SameAs(processor2));
}
[Test]
public void Update_Should_AddMultipleProcessors_ForSameTypeIdentifier()
{
// arrange
var processor1 = new TestMessageProcessor(
1,
MessageRouter.Create(
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null)));
var processor2 = new TestMessageProcessor(
2,
MessageRouter.Create(
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null)));
var table = new RoutingTable();
// act
table.Update(new IMessageProcessor[] { processor1, processor2 });
var entry = table.GetRouteTableEntry("type1");
// assert
Assert.That(entry, Is.Not.Null);
Assert.That(entry!.DeserializationType, Is.EqualTo(typeof(string)));
Assert.That(entry.Handlers, Has.Count.EqualTo(2));
Assert.That(entry.Handlers, Does.Contain(processor1));
Assert.That(entry.Handlers, Does.Contain(processor2));
}
[Test]
public void Update_Should_ReplacePreviousEntries()
{
// arrange
var initialProcessor = new TestMessageProcessor(
1,
MessageRouter.Create(
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null)));
var replacementProcessor = new TestMessageProcessor(
2,
MessageRouter.Create(
MessageRoute<int>.CreateWithoutTopicFilter("type2", (_, _, _, _) => null)));
var table = new RoutingTable();
table.Update(new IMessageProcessor[] { initialProcessor });
// act
table.Update(new IMessageProcessor[] { replacementProcessor });
var oldEntry = table.GetRouteTableEntry("type1");
var newEntry = table.GetRouteTableEntry("type2");
// assert
Assert.That(oldEntry, Is.Null);
Assert.That(newEntry, Is.Not.Null);
Assert.That(newEntry!.DeserializationType, Is.EqualTo(typeof(int)));
Assert.That(newEntry.Handlers, Has.Count.EqualTo(1));
Assert.That(newEntry.Handlers.Single(), Is.SameAs(replacementProcessor));
}
[Test]
public void Update_WithEmptyProcessors_Should_ClearEntries()
{
// arrange
var processor = new TestMessageProcessor(
1,
MessageRouter.Create(
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null)));
var table = new RoutingTable();
table.Update(new IMessageProcessor[] { processor });
// act
table.Update(Array.Empty<IMessageProcessor>());
// assert
Assert.That(table.GetRouteTableEntry("type1"), Is.Null);
}
[Test]
public void TypeRoutingCollection_Should_SetIsStringOutput_BasedOnDeserializationType()
{
// arrange & act
var stringCollection = new TypeRoutingCollection(typeof(string));
var intCollection = new TypeRoutingCollection(typeof(int));
// assert
Assert.That(stringCollection.IsStringOutput, Is.True);
Assert.That(stringCollection.DeserializationType, Is.EqualTo(typeof(string)));
Assert.That(stringCollection.Handlers, Is.Empty);
Assert.That(intCollection.IsStringOutput, Is.False);
Assert.That(intCollection.DeserializationType, Is.EqualTo(typeof(int)));
Assert.That(intCollection.Handlers, Is.Empty);
}
private sealed class TestMessageProcessor : IMessageProcessor
{
public int Id { get; }
public MessageRouter MessageRouter { get; }
public TestMessageProcessor(int id, MessageRouter messageRouter)
{
Id = id;
MessageRouter = messageRouter;
}
public event Action? OnMessageRouterUpdated;
public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection socketConnection, DateTime receiveTime, string? originalData, object result)
{
return true;
}
}
}
}

View File

@ -0,0 +1,160 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets.Default.Routing;
using NUnit.Framework;
using System;
using System.Collections.Generic;
namespace CryptoExchange.Net.UnitTests.SocketRoutingTests
{
[TestFixture]
public class SubscriptionRouterTests
{
[Test]
public void BuildFromRoutes_Should_GroupRoutesByTypeIdentifier_AndSetDeserializationType()
{
// arrange
var routes = new MessageRoute[]
{
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null),
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null),
MessageRoute<int>.CreateWithTopicFilter("type2", "topic2", (_, _, _, _) => null)
};
var router = new SubscriptionRouter(routes);
// act
var type1Routes = router.GetRoutes("type1");
var type2Routes = router.GetRoutes("type2");
var missingRoutes = router.GetRoutes("missing");
// assert
Assert.That(type1Routes, Is.Not.Null);
Assert.That(type2Routes, Is.Not.Null);
Assert.That(missingRoutes, Is.Null);
Assert.That(type1Routes, Is.TypeOf<SubscriptionRouteCollection>());
Assert.That(type2Routes, Is.TypeOf<SubscriptionRouteCollection>());
Assert.That(type1Routes!.DeserializationType, Is.EqualTo(typeof(string)));
Assert.That(type2Routes!.DeserializationType, Is.EqualTo(typeof(int)));
}
[Test]
public void Handle_Should_InvokeRoutesWithoutTopicFilter_WhenTopicFilterIsNull()
{
// arrange
var calls = new List<string>();
var collection = new SubscriptionRouteCollection(typeof(string));
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
{
calls.Add("no-topic");
return null;
}));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("topic");
return null;
}));
collection.Build();
// act
var handled = collection.Handle(null, null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.True);
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
Assert.That(calls, Is.EqualTo(new[] { "no-topic" }));
}
[Test]
public void Handle_Should_ReturnFalse_WhenNoRoutesMatch()
{
// arrange
var collection = new SubscriptionRouteCollection(typeof(string));
collection.AddRoute("other-topic", MessageRoute<string>.CreateWithTopicFilter("type", "other-topic", (_, _, _, _) => null));
collection.Build();
// act
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.False);
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
}
[Test]
public void Handle_Should_InvokeRoutesWithoutTopicFilter_AndMatchingTopicRoutes()
{
// arrange
var calls = new List<string>();
var collection = new SubscriptionRouteCollection(typeof(string));
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
{
calls.Add("no-topic");
return null;
}));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("topic");
return null;
}));
collection.Build();
// act
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.True);
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
Assert.That(calls, Is.EqualTo(new[] { "no-topic", "topic" }));
}
[Test]
public void Handle_Should_InvokeAllMatchingTopicRoutes()
{
// arrange
var calls = new List<string>();
var collection = new SubscriptionRouteCollection(typeof(string));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("first");
return CallResult.SuccessResult;
}));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("second");
return null;
}));
collection.Build();
// act
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.True);
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
Assert.That(calls, Is.EqualTo(new[] { "first", "second" }));
}
[Test]
public void Handle_Should_NotInvokeTopicRoutes_WhenTopicFilterIsNull()
{
// arrange
var calls = new List<string>();
var collection = new SubscriptionRouteCollection(typeof(string));
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
{
calls.Add("topic");
return null;
}));
collection.Build();
// act
var handled = collection.Handle(null, null!, DateTime.UtcNow, "original", "data", out var result);
// assert
Assert.That(handled, Is.False);
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
Assert.That(calls, Is.Empty);
}
}
}

View File

@ -230,7 +230,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
/// <summary>
/// Try to get the enum value based on the string value using the Utf8JsonReader's ValueTextEquals method.
/// This is an optimization to avoid string allocations when possible, but can only match case insensitively
/// This is an optimization to avoid string allocations when possible, but can only match case sensitively
/// </summary>
private static T? GetValueOptimistic(ref Utf8JsonReader reader)
{

View File

@ -10,7 +10,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
/// </summary>
public class MessageRouter
{
private RoutingSubTable? _routingTable;
private ProcessorRouter? _routingTable;
/// <summary>
/// The routes registered for this router
@ -28,17 +28,29 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
/// <summary>
/// Build the route mapping
/// </summary>
public void BuildRouteMap()
public void BuildQueryRouter()
{
_routingTable = new RoutingSubTable(Routes);
_routingTable = new QueryRouter(Routes);
}
/// <summary>
/// Get routes matching the type identifier
/// Build the route mapping
/// </summary>
internal RoutingSubTableEntry? this[string identifier]
public void BuildSubscriptionRouter()
{
get => (_routingTable ?? throw new InvalidOperationException("Route map not initialized before use"))[identifier];
_routingTable = new SubscriptionRouter(Routes);
}
/// <summary>
/// Handle message
/// </summary>
public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{
var routeCollection = (_routingTable ?? throw new NullReferenceException("Routing table not build before handling")).GetRoutes(typeIdentifier);
if (routeCollection == null)
throw new InvalidOperationException($"No routes for {typeIdentifier} message type");
return routeCollection.Handle(topicFilter, connection, receiveTime, originalData, data, out result);
}
/// <summary>

View File

@ -0,0 +1,36 @@
using System.Collections.Generic;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
namespace CryptoExchange.Net.Sockets.Default.Routing
{
internal abstract class ProcessorRouter
{
public abstract RouteCollection? GetRoutes(string identifier);
}
internal abstract class ProcessorRouter<T> : ProcessorRouter
where T : RouteCollection
{
#if NET8_0_OR_GREATER
private FrozenDictionary<string, T> _routeMap;
#else
private Dictionary<string, T> _routeMap;
#endif
public ProcessorRouter(IEnumerable<MessageRoute> routes)
{
var map = BuildFromRoutes(routes);
#if NET8_0_OR_GREATER
_routeMap = map.ToFrozenDictionary();
#else
_routeMap = map;
#endif
}
public abstract Dictionary<string, T> BuildFromRoutes(IEnumerable<MessageRoute> routes);
public override RouteCollection? GetRoutes(string identifier) => _routeMap.TryGetValue(identifier, out var routes) ? routes : null;
}
}

View File

@ -0,0 +1,90 @@
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
namespace CryptoExchange.Net.Sockets.Default.Routing
{
internal class QueryRouter : ProcessorRouter<QueryRouteCollection>
{
public QueryRouter(IEnumerable<MessageRoute> routes) : base(routes)
{
}
public override Dictionary<string, QueryRouteCollection> BuildFromRoutes(IEnumerable<MessageRoute> routes)
{
var newMap = new Dictionary<string, QueryRouteCollection>();
foreach (var route in routes)
{
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
{
typeMap = new QueryRouteCollection(route.DeserializationType);
newMap.Add(route.TypeIdentifier, typeMap);
}
typeMap.AddRoute(route.TopicFilter, route);
}
foreach (var subEntry in newMap.Values)
subEntry.Build();
return newMap;
}
}
internal class QueryRouteCollection : RouteCollection
{
public bool MultipleReaders { get; private set; }
public QueryRouteCollection(Type routeType) : base(routeType)
{
}
public override void AddRoute(string? topicFilter, MessageRoute route)
{
base.AddRoute(topicFilter, route);
if (route.MultipleReaders)
MultipleReaders = true;
}
public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{
result = null;
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
var handled = false;
foreach (var route in _routesWithoutTopicFilter)
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
if (thisResult != null)
result ??= thisResult;
handled = true;
}
// Forward to routes with matching topic filter, if any
if (topicFilter == null)
return handled;
var matchingTopicRoutes = GetRoutesWithMatchingTopicFilter(topicFilter);
if (matchingTopicRoutes == null)
return handled;
foreach (var route in matchingTopicRoutes)
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
handled = true;
if (thisResult != null)
{
result ??= thisResult;
if (!MultipleReaders)
break;
}
}
return handled;
}
}
}

View File

@ -0,0 +1,65 @@
using CryptoExchange.Net.Objects;
using System;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
using System.Collections.Generic;
namespace CryptoExchange.Net.Sockets.Default.Routing
{
internal abstract class RouteCollection
{
protected List<MessageRoute> _routesWithoutTopicFilter;
protected Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
#if NET8_0_OR_GREATER
protected FrozenDictionary<string, List<MessageRoute>>? _routesWithTopicFilterFrozen;
#endif
public Type DeserializationType { get; }
public RouteCollection(Type routeType)
{
_routesWithoutTopicFilter = new List<MessageRoute>();
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
DeserializationType = routeType;
}
public virtual void AddRoute(string? topicFilter, MessageRoute route)
{
if (string.IsNullOrEmpty(topicFilter))
{
_routesWithoutTopicFilter.Add(route);
}
else
{
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
{
list = new List<MessageRoute>();
_routesWithTopicFilter.Add(topicFilter!, list);
}
list.Add(route);
}
}
public void Build()
{
#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary();
#endif
}
protected List<MessageRoute>? GetRoutesWithMatchingTopicFilter(string topicFilter)
{
#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
#else
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
#endif
return matchingTopicRoutes;
}
public abstract bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result);
}
}

View File

@ -1,142 +0,0 @@
using CryptoExchange.Net.Objects;
using System;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
using System.Collections.Generic;
namespace CryptoExchange.Net.Sockets.Default.Routing
{
internal class RoutingSubTable
{
#if NET8_0_OR_GREATER
// Used for mapping a type identifier to the routes matching it
private FrozenDictionary<string, RoutingSubTableEntry> _routeMap;
#else
private Dictionary<string, RoutingSubTableEntry> _routeMap;
#endif
public RoutingSubTable(IEnumerable<MessageRoute> routes)
{
var newMap = new Dictionary<string, RoutingSubTableEntry>();
foreach (var route in routes)
{
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
{
typeMap = new RoutingSubTableEntry(route.DeserializationType);
newMap.Add(route.TypeIdentifier, typeMap);
}
typeMap.AddRoute(route.TopicFilter, route);
}
foreach(var subEntry in newMap.Values)
subEntry.Build();
#if NET8_0_OR_GREATER
_routeMap = newMap.ToFrozenDictionary();
#else
_routeMap = newMap;
#endif
}
/// <summary>
/// Get routes matching the type identifier
/// </summary>
public RoutingSubTableEntry? this[string identifier]
{
get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null;
}
}
internal record RoutingSubTableEntry
{
public Type DeserializationType { get; }
public bool MultipleReaders { get; private set; }
private List<MessageRoute> _routesWithoutTopicFilter;
private Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
#if NET8_0_OR_GREATER
private FrozenDictionary<string, List<MessageRoute>>? _routesWithTopicFilterFrozen;
#endif
public RoutingSubTableEntry(Type routeType)
{
_routesWithoutTopicFilter = new List<MessageRoute>();
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
DeserializationType = routeType;
}
public void AddRoute(string? topicFilter, MessageRoute route)
{
if (string.IsNullOrEmpty(topicFilter))
{
_routesWithoutTopicFilter.Add(route);
}
else
{
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
{
list = new List<MessageRoute>();
_routesWithTopicFilter.Add(topicFilter!, list);
}
list.Add(route);
}
if (route.MultipleReaders)
MultipleReaders = true;
}
public void Build()
{
#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary();
#endif
}
internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{
result = null;
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
var handled = false;
foreach (var route in _routesWithoutTopicFilter)
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
if (thisResult != null)
result ??= thisResult;
handled = true;
}
// Forward to routes with matching topic filter, if any
if (topicFilter != null)
{
#if NET8_0_OR_GREATER
_routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
#else
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
#endif
foreach (var route in matchingTopicRoutes ?? [])
{
var thisResult = route.Handle(connection, receiveTime, originalData, data);
handled = true;
if (thisResult != null)
{
result ??= thisResult;
#warning MultipleReaders is only for queries, subscriptions should always have multiple readers = true. Maybe create different RoutingSubTable implementations for Queries and Subscriptions?
if (!MultipleReaders)
break;
}
}
}
return handled;
}
}
}

View File

@ -2,6 +2,9 @@
using System;
using System.Collections.Generic;
using System.Text;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
namespace CryptoExchange.Net.Sockets.Default.Routing
{
@ -10,40 +13,51 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
/// </summary>
public class RoutingTable
{
private Dictionary<string, RoutingTableEntry> _routeTableEntries;
#if NET8_0_OR_GREATER
private FrozenDictionary<string, TypeRoutingCollection> _typeRoutingCollections = new Dictionary<string, TypeRoutingCollection>().ToFrozenDictionary();
#else
private Dictionary<string, TypeRoutingCollection> _typeRoutingCollections = new();
#endif
/// <summary>
/// Create routing table for provided processors
/// Update the routing table
/// </summary>
public RoutingTable(IEnumerable<IMessageProcessor> processors)
/// <param name="processors"></param>
public void Update(IEnumerable<IMessageProcessor> processors)
{
_routeTableEntries = new Dictionary<string, RoutingTableEntry>();
var newTypeMap = new Dictionary<string, TypeRoutingCollection>();
foreach (var entry in processors)
{
foreach (var route in entry.MessageRouter.Routes)
{
if (!_routeTableEntries.ContainsKey(route.TypeIdentifier))
_routeTableEntries.Add(route.TypeIdentifier, new RoutingTableEntry(route.DeserializationType));
if (!newTypeMap.ContainsKey(route.TypeIdentifier))
newTypeMap.Add(route.TypeIdentifier, new TypeRoutingCollection(route.DeserializationType));
if (!_routeTableEntries[route.TypeIdentifier].Handlers.Contains(entry))
_routeTableEntries[route.TypeIdentifier].Handlers.Add(entry);
if (!newTypeMap[route.TypeIdentifier].Handlers.Contains(entry))
newTypeMap[route.TypeIdentifier].Handlers.Add(entry);
}
}
#if NET8_0_OR_GREATER
_typeRoutingCollections = newTypeMap.ToFrozenDictionary();
#else
_typeRoutingCollections = newTypeMap;
#endif
}
/// <summary>
/// Get route table entry for a type identifier
/// </summary>
public RoutingTableEntry? GetRouteTableEntry(string typeIdentifier)
public TypeRoutingCollection? GetRouteTableEntry(string typeIdentifier)
{
return _routeTableEntries.TryGetValue(typeIdentifier, out var entry) ? entry : null;
return _typeRoutingCollections.TryGetValue(typeIdentifier, out var entry) ? entry : null;
}
/// <inheritdoc />
public override string ToString()
{
var sb = new StringBuilder();
foreach (var entry in _routeTableEntries)
foreach (var entry in _typeRoutingCollections)
{
sb.AppendLine($"{entry.Key}, {entry.Value.DeserializationType.Name}");
foreach(var item in entry.Value.Handlers)
@ -69,7 +83,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
/// <summary>
/// Routing table entry
/// </summary>
public record RoutingTableEntry
public record TypeRoutingCollection
{
/// <summary>
/// Whether the deserialization type is string
@ -87,7 +101,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
/// <summary>
/// ctor
/// </summary>
public RoutingTableEntry(Type deserializationType)
public TypeRoutingCollection(Type deserializationType)
{
IsStringOutput = deserializationType == typeof(string);
DeserializationType = deserializationType;

View File

@ -0,0 +1,69 @@
using CryptoExchange.Net.Objects;
using System;
using System.Collections.Generic;
namespace CryptoExchange.Net.Sockets.Default.Routing
{
internal class SubscriptionRouter : ProcessorRouter<SubscriptionRouteCollection>
{
public SubscriptionRouter(IEnumerable<MessageRoute> routes) : base(routes)
{
}
public override Dictionary<string, SubscriptionRouteCollection> BuildFromRoutes(IEnumerable<MessageRoute> routes)
{
var newMap = new Dictionary<string, SubscriptionRouteCollection>();
foreach (var route in routes)
{
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
{
typeMap = new SubscriptionRouteCollection(route.DeserializationType);
newMap.Add(route.TypeIdentifier, typeMap);
}
typeMap.AddRoute(route.TopicFilter, route);
}
foreach (var subEntry in newMap.Values)
subEntry.Build();
return newMap;
}
}
internal class SubscriptionRouteCollection : RouteCollection
{
public SubscriptionRouteCollection(Type routeType) : base(routeType)
{
}
public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
{
result = CallResult.SuccessResult;
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
var handled = false;
foreach (var route in _routesWithoutTopicFilter)
{
route.Handle(connection, receiveTime, originalData, data);
handled = true;
}
// Forward to routes with matching topic filter, if any
if (topicFilter == null)
return handled;
var matchingTopicRoutes = GetRoutesWithMatchingTopicFilter(topicFilter);
if (matchingTopicRoutes == null)
return handled;
foreach (var route in matchingTopicRoutes)
{
route.Handle(connection, receiveTime, originalData, data);
handled = true;
}
return handled;
}
}
}

View File

@ -262,7 +262,7 @@ namespace CryptoExchange.Net.Sockets.Default
private readonly object _listenersLock = new object();
#endif
private RoutingTable _routeTable = new RoutingTable([]);
private RoutingTable _routingTable = new RoutingTable();
private ReadOnlyCollection<IMessageProcessor> _listeners;
private readonly ILogger _logger;
@ -531,7 +531,7 @@ namespace CryptoExchange.Net.Sockets.Default
return;
}
var routingEntry = _routeTable.GetRouteTableEntry(typeIdentifier);
var routingEntry = _routingTable.GetRouteTableEntry(typeIdentifier);
if (routingEntry == null)
{
if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data))
@ -1139,9 +1139,9 @@ namespace CryptoExchange.Net.Sockets.Default
});
}
private void BuildRoutingTable()
private void UpdateRoutingTable()
{
_routeTable = new RoutingTable(_listeners);
_routingTable.Update(_listeners);
}
private void AddMessageProcessor(IMessageProcessor processor)
@ -1150,13 +1150,13 @@ namespace CryptoExchange.Net.Sockets.Default
{
var updatedList = new List<IMessageProcessor>(_listeners);
updatedList.Add(processor);
processor.OnMessageRouterUpdated += BuildRoutingTable;
processor.OnMessageRouterUpdated += UpdateRoutingTable;
_listeners = updatedList.AsReadOnly();
if (processor.MessageRouter.Routes.Length > 0)
{
BuildRoutingTable();
UpdateRoutingTable();
#if DEBUG
_logger.LogTrace("Processor added, new routing table:\r\n" + _routeTable.ToString());
_logger.LogTrace("Processor added, new routing table:\r\n" + _routingTable.ToString());
#endif
}
}
@ -1167,14 +1167,14 @@ namespace CryptoExchange.Net.Sockets.Default
lock (_listenersLock)
{
var updatedList = new List<IMessageProcessor>(_listeners);
processor.OnMessageRouterUpdated -= UpdateRoutingTable;
if (!updatedList.Remove(processor))
return; // If nothing removed nothing has changed
processor.OnMessageRouterUpdated -= BuildRoutingTable;
_listeners = updatedList.AsReadOnly();
BuildRoutingTable();
UpdateRoutingTable();
#if DEBUG
_logger.LogTrace("Processor removed, new routing table:\r\n" + _routeTable.ToString());
_logger.LogTrace("Processor removed, new routing table:\r\n" + _routingTable.ToString());
#endif
}
}
@ -1187,7 +1187,7 @@ namespace CryptoExchange.Net.Sockets.Default
var anyRemoved = false;
foreach (var processor in processors)
{
processor.OnMessageRouterUpdated -= BuildRoutingTable;
processor.OnMessageRouterUpdated -= UpdateRoutingTable;
if (updatedList.Remove(processor))
anyRemoved = true;
}
@ -1196,9 +1196,9 @@ namespace CryptoExchange.Net.Sockets.Default
return; // If nothing removed nothing has changed
_listeners = updatedList.AsReadOnly();
BuildRoutingTable();
UpdateRoutingTable();
#if DEBUG
_logger.LogTrace("Processors removed, new routing table:\r\n" + _routeTable.ToString());
_logger.LogTrace("Processors removed, new routing table:\r\n" + _routingTable.ToString());
#endif
}
}

View File

@ -82,7 +82,7 @@ namespace CryptoExchange.Net.Sockets.Default
set
{
_router = value;
_router.BuildRouteMap();
_router.BuildSubscriptionRouter();
OnMessageRouterUpdated?.Invoke();
}
}
@ -198,7 +198,7 @@ namespace CryptoExchange.Net.Sockets.Default
SubscriptionQuery.Timeout();
}
return MessageRouter[typeIdentifier]?.Handle(topicFilter, connection, receiveTime, originalData, data, out _) ?? false;
return MessageRouter.Handle(typeIdentifier, topicFilter, connection, receiveTime, originalData, data, out _);
}
/// <summary>

View File

@ -62,7 +62,7 @@ namespace CryptoExchange.Net.Sockets
private MessageRouter _router;
/// <summary>
/// Router for this subscription
/// Router for this query
/// </summary>
public MessageRouter MessageRouter
{
@ -70,7 +70,7 @@ namespace CryptoExchange.Net.Sockets
set
{
_router = value;
_router.BuildRouteMap();
_router.BuildQueryRouter();
OnMessageRouterUpdated?.Invoke();
}
}
@ -208,12 +208,14 @@ namespace CryptoExchange.Net.Sockets
if (CurrentResponses == RequiredResponses)
Response = message;
var handled = false;
if (Result?.Success != false)
{
// If an error result is already set don't override that
MessageRouter[typeIdentifier]!.Handle(topicFilter, connection, receiveTime, originalData, message, out var result);
MessageRouter.Handle(typeIdentifier, topicFilter, connection, receiveTime, originalData, message, out var result);
Result = result;
if (Result == null)
handled = Result != null;
if (!handled)
// Null from Handle means it wasn't actually for this query
CurrentResponses -= 1;
}
@ -225,7 +227,7 @@ namespace CryptoExchange.Net.Sockets
OnComplete?.Invoke();
}
return true;
return handled;
}
/// <inheritdoc />