mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2026-04-12 16:13:12 +00:00
Compare commits
3 Commits
61c66300af
...
5bf89897e8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5bf89897e8 | ||
|
|
6dbc84e6fc | ||
|
|
f66730ca8c |
@ -1,5 +1,4 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.UnitTests.TestImplementations;
|
||||
using NUnit.Framework;
|
||||
using System;
|
||||
using System.Net.Http;
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.Testing;
|
||||
using CryptoExchange.Net.UnitTests.Implementations;
|
||||
using CryptoExchange.Net.UnitTests.TestImplementations;
|
||||
using NUnit.Framework;
|
||||
using System;
|
||||
using System.Linq;
|
||||
|
||||
@ -4,7 +4,7 @@ using NUnit.Framework;
|
||||
using System;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace CryptoExchange.Net.UnitTests
|
||||
namespace CryptoExchange.Net.UnitTests.ConverterTests
|
||||
{
|
||||
[TestFixture()]
|
||||
public class SharedModelConversionTests
|
||||
|
||||
@ -23,7 +23,7 @@ namespace CryptoExchange.Net.UnitTests.Implementations
|
||||
{ }
|
||||
|
||||
/// <summary>
|
||||
/// Get the CryptoCom environment by name
|
||||
/// Get the environment by name
|
||||
/// </summary>
|
||||
public static TestEnvironment? GetEnvironmentByName(string? name)
|
||||
=> name switch
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
namespace CryptoExchange.Net.UnitTests.TestImplementations
|
||||
namespace CryptoExchange.Net.UnitTests.Implementations
|
||||
{
|
||||
public class TestObject
|
||||
{
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
using CryptoExchange.Net.UnitTests.ConverterTests;
|
||||
using CryptoExchange.Net.UnitTests.Implementations;
|
||||
using CryptoExchange.Net.UnitTests.TestImplementations;
|
||||
using System.Collections.Generic;
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
|
||||
@ -18,7 +18,7 @@ namespace CryptoExchange.Net.UnitTests
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void AddingBasicNullValue_ThrowExecption()
|
||||
public void AddingBasicNullValue_ThrowsException()
|
||||
{
|
||||
var parameters = new ParameterCollection();
|
||||
Assert.Throws<ArgumentNullException>(() => parameters.Add("test", null!));
|
||||
|
||||
@ -1,94 +0,0 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.Sockets.Default;
|
||||
using CryptoExchange.Net.Sockets.Default.Routing;
|
||||
using CryptoExchange.Net.Sockets.Interfaces;
|
||||
using NUnit.Framework;
|
||||
using System;
|
||||
|
||||
namespace CryptoExchange.Net.UnitTests
|
||||
{
|
||||
internal class RoutingTableTests
|
||||
{
|
||||
[Test]
|
||||
public void Constructor_CreatesEntriesAndDeduplicatesHandlersPerTypeIdentifier()
|
||||
{
|
||||
var processor = new TestMessageProcessor(1, MessageRouter.Create(
|
||||
MessageRoute<string>.CreateWithoutTopicFilter("ticker", EmptyHandler<string>()),
|
||||
MessageRoute<string>.CreateWithTopicFilter("ticker", "btcusdt", EmptyHandler<string>()),
|
||||
MessageRoute<int>.CreateWithoutTopicFilter("trade", EmptyHandler<int>())));
|
||||
|
||||
var routingTable = new RoutingTable(new[] { processor });
|
||||
var tickerEntry = routingTable.GetRouteTableEntry("ticker");
|
||||
var tradeEntry = routingTable.GetRouteTableEntry("trade");
|
||||
|
||||
Assert.Multiple(() =>
|
||||
{
|
||||
Assert.That(tickerEntry, Is.Not.Null);
|
||||
Assert.That(tickerEntry!.DeserializationType, Is.EqualTo(typeof(string)));
|
||||
Assert.That(tickerEntry.IsStringOutput, Is.True);
|
||||
Assert.That(tickerEntry.Handlers, Has.Count.EqualTo(1));
|
||||
Assert.That(tickerEntry.Handlers[0], Is.SameAs(processor));
|
||||
|
||||
Assert.That(tradeEntry, Is.Not.Null);
|
||||
Assert.That(tradeEntry!.DeserializationType, Is.EqualTo(typeof(int)));
|
||||
Assert.That(tradeEntry.IsStringOutput, Is.False);
|
||||
Assert.That(tradeEntry.Handlers, Has.Count.EqualTo(1));
|
||||
Assert.That(tradeEntry.Handlers[0], Is.SameAs(processor));
|
||||
});
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Constructor_AddsAllProcessorsForSameTypeIdentifier()
|
||||
{
|
||||
var processor1 = new TestMessageProcessor(1, MessageRouter.Create(
|
||||
MessageRoute<string>.CreateWithoutTopicFilter("ticker", EmptyHandler<string>())));
|
||||
var processor2 = new TestMessageProcessor(2, MessageRouter.Create(
|
||||
MessageRoute<string>.CreateWithTopicFilter("ticker", "ethusdt", EmptyHandler<string>())));
|
||||
|
||||
var routingTable = new RoutingTable(new IMessageProcessor[] { processor1, processor2 });
|
||||
var entry = routingTable.GetRouteTableEntry("ticker");
|
||||
|
||||
Assert.Multiple(() =>
|
||||
{
|
||||
Assert.That(entry, Is.Not.Null);
|
||||
Assert.That(entry!.Handlers, Has.Count.EqualTo(2));
|
||||
Assert.That(entry.Handlers, Does.Contain(processor1));
|
||||
Assert.That(entry.Handlers, Does.Contain(processor2));
|
||||
});
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void GetRouteTableEntry_ReturnsNullForUnknownTypeIdentifier()
|
||||
{
|
||||
var processor = new TestMessageProcessor(1, MessageRouter.Create(
|
||||
MessageRoute<string>.CreateWithoutTopicFilter("ticker", EmptyHandler<string>())));
|
||||
|
||||
var routingTable = new RoutingTable(new[] { processor });
|
||||
|
||||
Assert.That(routingTable.GetRouteTableEntry("unknown"), Is.Null);
|
||||
}
|
||||
|
||||
private static Func<SocketConnection, DateTime, string?, T, CallResult?> EmptyHandler<T>()
|
||||
{
|
||||
return (_, _, _, _) => null;
|
||||
}
|
||||
|
||||
private class TestMessageProcessor : IMessageProcessor
|
||||
{
|
||||
public int Id { get; }
|
||||
public MessageRouter MessageRouter { get; }
|
||||
public event Action? OnMessageRouterUpdated;
|
||||
|
||||
public TestMessageProcessor(int id, MessageRouter messageRouter)
|
||||
{
|
||||
Id = id;
|
||||
MessageRouter = messageRouter;
|
||||
}
|
||||
|
||||
public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection socketConnection, DateTime receiveTime, string? originalData, object result)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,235 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.Sockets.Default;
|
||||
using CryptoExchange.Net.Sockets.Default.Routing;
|
||||
using NUnit.Framework;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace CryptoExchange.Net.UnitTests.SocketRoutingTests
|
||||
{
|
||||
[TestFixture]
|
||||
public class QueryRouterTests
|
||||
{
|
||||
[Test]
|
||||
public void BuildFromRoutes_Should_GroupRoutesByTypeIdentifier_AndSetDeserializationType()
|
||||
{
|
||||
// arrange
|
||||
var routes = new MessageRoute[]
|
||||
{
|
||||
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null),
|
||||
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null),
|
||||
MessageRoute<int>.CreateWithTopicFilter("type2", "topic2", (_, _, _, _) => null)
|
||||
};
|
||||
|
||||
var router = new QueryRouter(routes);
|
||||
|
||||
// act
|
||||
var type1Routes = router.GetRoutes("type1");
|
||||
var type2Routes = router.GetRoutes("type2");
|
||||
var missingRoutes = router.GetRoutes("missing");
|
||||
|
||||
// assert
|
||||
Assert.That(type1Routes, Is.Not.Null);
|
||||
Assert.That(type2Routes, Is.Not.Null);
|
||||
Assert.That(missingRoutes, Is.Null);
|
||||
|
||||
Assert.That(type1Routes, Is.TypeOf<QueryRouteCollection>());
|
||||
Assert.That(type2Routes, Is.TypeOf<QueryRouteCollection>());
|
||||
Assert.That(type1Routes!.DeserializationType, Is.EqualTo(typeof(string)));
|
||||
Assert.That(type2Routes!.DeserializationType, Is.EqualTo(typeof(int)));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void AddRoute_Should_SetMultipleReaders_WhenAnyRouteAllowsMultipleReaders()
|
||||
{
|
||||
// arrange
|
||||
var collection = new QueryRouteCollection(typeof(string));
|
||||
|
||||
// act
|
||||
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) => null));
|
||||
var beforeMultipleReaders = collection.MultipleReaders;
|
||||
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) => null, true));
|
||||
var afterMultipleReaders = collection.MultipleReaders;
|
||||
|
||||
// assert
|
||||
Assert.That(beforeMultipleReaders, Is.False);
|
||||
Assert.That(afterMultipleReaders, Is.True);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_InvokeRoutesWithoutTopicFilter_WhenTopicFilterIsNull()
|
||||
{
|
||||
// arrange
|
||||
var calls = new List<string>();
|
||||
var collection = new QueryRouteCollection(typeof(string));
|
||||
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("no-topic");
|
||||
return null;
|
||||
}));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("topic");
|
||||
return null;
|
||||
}));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle(null, null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.True);
|
||||
Assert.That(result, Is.Null);
|
||||
Assert.That(calls, Is.EqualTo(new[] { "no-topic" }));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_ReturnFalse_WhenNoRoutesMatch()
|
||||
{
|
||||
// arrange
|
||||
var collection = new QueryRouteCollection(typeof(string));
|
||||
collection.AddRoute("other-topic", MessageRoute<string>.CreateWithTopicFilter("type", "other-topic", (_, _, _, _) => null));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.False);
|
||||
Assert.That(result, Is.Null);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_InvokeRoutesWithoutTopicFilter_AndMatchingTopicRoutes()
|
||||
{
|
||||
// arrange
|
||||
var calls = new List<string>();
|
||||
var collection = new QueryRouteCollection(typeof(string));
|
||||
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("no-topic");
|
||||
return null;
|
||||
}));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("topic");
|
||||
return null;
|
||||
}));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.True);
|
||||
Assert.That(result, Is.Null);
|
||||
Assert.That(calls, Is.EqualTo(new[] { "no-topic", "topic" }));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_StopAfterFirstNonNullMatchingResult_WhenMultipleReadersIsFalse()
|
||||
{
|
||||
// arrange
|
||||
var calls = new List<string>();
|
||||
var expectedResult = CallResult.SuccessResult;
|
||||
var collection = new QueryRouteCollection(typeof(string));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("first");
|
||||
return expectedResult;
|
||||
}));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("second");
|
||||
return new CallResult(null);
|
||||
}));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.True);
|
||||
Assert.That(result, Is.SameAs(expectedResult));
|
||||
Assert.That(calls, Is.EqualTo(new[] { "first" }));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_ContinueAfterNonNullMatchingResult_WhenMultipleReadersIsTrue()
|
||||
{
|
||||
// arrange
|
||||
var calls = new List<string>();
|
||||
var expectedResult = CallResult.SuccessResult;
|
||||
var collection = new QueryRouteCollection(typeof(string));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("first");
|
||||
return expectedResult;
|
||||
}, true));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("second");
|
||||
return new CallResult(null);
|
||||
}));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.True);
|
||||
Assert.That(result, Is.SameAs(expectedResult));
|
||||
Assert.That(calls, Is.EqualTo(new[] { "first", "second" }));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_ContinueUntilNonNullResult_WhenEarlierMatchingRoutesReturnNull()
|
||||
{
|
||||
// arrange
|
||||
var calls = new List<string>();
|
||||
var expectedResult = CallResult.SuccessResult;
|
||||
var collection = new QueryRouteCollection(typeof(string));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("first");
|
||||
return null;
|
||||
}));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("second");
|
||||
return expectedResult;
|
||||
}));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("third");
|
||||
return new CallResult(null);
|
||||
}));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.True);
|
||||
Assert.That(result, Is.SameAs(expectedResult));
|
||||
Assert.That(calls, Is.EqualTo(new[] { "first", "second" }));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_ReturnHandledTrue_WhenMatchingRoutesReturnNull()
|
||||
{
|
||||
// arrange
|
||||
var collection = new QueryRouteCollection(typeof(string));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) => null));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.True);
|
||||
Assert.That(result, Is.Null);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,167 @@
|
||||
using CryptoExchange.Net.Sockets.Default;
|
||||
using CryptoExchange.Net.Sockets.Default.Routing;
|
||||
using CryptoExchange.Net.Sockets.Interfaces;
|
||||
using NUnit.Framework;
|
||||
using System;
|
||||
using System.Linq;
|
||||
|
||||
namespace CryptoExchange.Net.UnitTests.SocketRoutingTests
|
||||
{
|
||||
[TestFixture]
|
||||
public class RoutingTableTests
|
||||
{
|
||||
[Test]
|
||||
public void Update_Should_CreateEntriesPerTypeIdentifier_WithCorrectDeserializationTypeAndHandlers()
|
||||
{
|
||||
// arrange
|
||||
var processor1 = new TestMessageProcessor(
|
||||
1,
|
||||
MessageRouter.Create(
|
||||
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null),
|
||||
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null)));
|
||||
|
||||
var processor2 = new TestMessageProcessor(
|
||||
2,
|
||||
MessageRouter.Create(
|
||||
MessageRoute<int>.CreateWithTopicFilter("type2", "topic2", (_, _, _, _) => null)));
|
||||
|
||||
var table = new RoutingTable();
|
||||
|
||||
// act
|
||||
table.Update(new IMessageProcessor[] { processor1, processor2 });
|
||||
|
||||
var type1Entry = table.GetRouteTableEntry("type1");
|
||||
var type2Entry = table.GetRouteTableEntry("type2");
|
||||
var missingEntry = table.GetRouteTableEntry("missing");
|
||||
|
||||
// assert
|
||||
Assert.That(type1Entry, Is.Not.Null);
|
||||
Assert.That(type2Entry, Is.Not.Null);
|
||||
Assert.That(missingEntry, Is.Null);
|
||||
|
||||
Assert.That(type1Entry!.DeserializationType, Is.EqualTo(typeof(string)));
|
||||
Assert.That(type1Entry.IsStringOutput, Is.True);
|
||||
Assert.That(type1Entry.Handlers, Has.Count.EqualTo(1));
|
||||
Assert.That(type1Entry.Handlers.Single(), Is.SameAs(processor1));
|
||||
|
||||
Assert.That(type2Entry!.DeserializationType, Is.EqualTo(typeof(int)));
|
||||
Assert.That(type2Entry.IsStringOutput, Is.False);
|
||||
Assert.That(type2Entry.Handlers, Has.Count.EqualTo(1));
|
||||
Assert.That(type2Entry.Handlers.Single(), Is.SameAs(processor2));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Update_Should_AddMultipleProcessors_ForSameTypeIdentifier()
|
||||
{
|
||||
// arrange
|
||||
var processor1 = new TestMessageProcessor(
|
||||
1,
|
||||
MessageRouter.Create(
|
||||
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null)));
|
||||
|
||||
var processor2 = new TestMessageProcessor(
|
||||
2,
|
||||
MessageRouter.Create(
|
||||
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null)));
|
||||
|
||||
var table = new RoutingTable();
|
||||
|
||||
// act
|
||||
table.Update(new IMessageProcessor[] { processor1, processor2 });
|
||||
var entry = table.GetRouteTableEntry("type1");
|
||||
|
||||
// assert
|
||||
Assert.That(entry, Is.Not.Null);
|
||||
Assert.That(entry!.DeserializationType, Is.EqualTo(typeof(string)));
|
||||
Assert.That(entry.Handlers, Has.Count.EqualTo(2));
|
||||
Assert.That(entry.Handlers, Does.Contain(processor1));
|
||||
Assert.That(entry.Handlers, Does.Contain(processor2));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Update_Should_ReplacePreviousEntries()
|
||||
{
|
||||
// arrange
|
||||
var initialProcessor = new TestMessageProcessor(
|
||||
1,
|
||||
MessageRouter.Create(
|
||||
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null)));
|
||||
|
||||
var replacementProcessor = new TestMessageProcessor(
|
||||
2,
|
||||
MessageRouter.Create(
|
||||
MessageRoute<int>.CreateWithoutTopicFilter("type2", (_, _, _, _) => null)));
|
||||
|
||||
var table = new RoutingTable();
|
||||
table.Update(new IMessageProcessor[] { initialProcessor });
|
||||
|
||||
// act
|
||||
table.Update(new IMessageProcessor[] { replacementProcessor });
|
||||
|
||||
var oldEntry = table.GetRouteTableEntry("type1");
|
||||
var newEntry = table.GetRouteTableEntry("type2");
|
||||
|
||||
// assert
|
||||
Assert.That(oldEntry, Is.Null);
|
||||
Assert.That(newEntry, Is.Not.Null);
|
||||
Assert.That(newEntry!.DeserializationType, Is.EqualTo(typeof(int)));
|
||||
Assert.That(newEntry.Handlers, Has.Count.EqualTo(1));
|
||||
Assert.That(newEntry.Handlers.Single(), Is.SameAs(replacementProcessor));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Update_WithEmptyProcessors_Should_ClearEntries()
|
||||
{
|
||||
// arrange
|
||||
var processor = new TestMessageProcessor(
|
||||
1,
|
||||
MessageRouter.Create(
|
||||
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null)));
|
||||
|
||||
var table = new RoutingTable();
|
||||
table.Update(new IMessageProcessor[] { processor });
|
||||
|
||||
// act
|
||||
table.Update(Array.Empty<IMessageProcessor>());
|
||||
|
||||
// assert
|
||||
Assert.That(table.GetRouteTableEntry("type1"), Is.Null);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void TypeRoutingCollection_Should_SetIsStringOutput_BasedOnDeserializationType()
|
||||
{
|
||||
// arrange & act
|
||||
var stringCollection = new TypeRoutingCollection(typeof(string));
|
||||
var intCollection = new TypeRoutingCollection(typeof(int));
|
||||
|
||||
// assert
|
||||
Assert.That(stringCollection.IsStringOutput, Is.True);
|
||||
Assert.That(stringCollection.DeserializationType, Is.EqualTo(typeof(string)));
|
||||
Assert.That(stringCollection.Handlers, Is.Empty);
|
||||
|
||||
Assert.That(intCollection.IsStringOutput, Is.False);
|
||||
Assert.That(intCollection.DeserializationType, Is.EqualTo(typeof(int)));
|
||||
Assert.That(intCollection.Handlers, Is.Empty);
|
||||
}
|
||||
|
||||
private sealed class TestMessageProcessor : IMessageProcessor
|
||||
{
|
||||
public int Id { get; }
|
||||
public MessageRouter MessageRouter { get; }
|
||||
|
||||
public TestMessageProcessor(int id, MessageRouter messageRouter)
|
||||
{
|
||||
Id = id;
|
||||
MessageRouter = messageRouter;
|
||||
}
|
||||
|
||||
public event Action? OnMessageRouterUpdated;
|
||||
|
||||
public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection socketConnection, DateTime receiveTime, string? originalData, object result)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,160 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using CryptoExchange.Net.Sockets.Default.Routing;
|
||||
using NUnit.Framework;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace CryptoExchange.Net.UnitTests.SocketRoutingTests
|
||||
{
|
||||
[TestFixture]
|
||||
public class SubscriptionRouterTests
|
||||
{
|
||||
[Test]
|
||||
public void BuildFromRoutes_Should_GroupRoutesByTypeIdentifier_AndSetDeserializationType()
|
||||
{
|
||||
// arrange
|
||||
var routes = new MessageRoute[]
|
||||
{
|
||||
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null),
|
||||
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null),
|
||||
MessageRoute<int>.CreateWithTopicFilter("type2", "topic2", (_, _, _, _) => null)
|
||||
};
|
||||
|
||||
var router = new SubscriptionRouter(routes);
|
||||
|
||||
// act
|
||||
var type1Routes = router.GetRoutes("type1");
|
||||
var type2Routes = router.GetRoutes("type2");
|
||||
var missingRoutes = router.GetRoutes("missing");
|
||||
|
||||
// assert
|
||||
Assert.That(type1Routes, Is.Not.Null);
|
||||
Assert.That(type2Routes, Is.Not.Null);
|
||||
Assert.That(missingRoutes, Is.Null);
|
||||
|
||||
Assert.That(type1Routes, Is.TypeOf<SubscriptionRouteCollection>());
|
||||
Assert.That(type2Routes, Is.TypeOf<SubscriptionRouteCollection>());
|
||||
Assert.That(type1Routes!.DeserializationType, Is.EqualTo(typeof(string)));
|
||||
Assert.That(type2Routes!.DeserializationType, Is.EqualTo(typeof(int)));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_InvokeRoutesWithoutTopicFilter_WhenTopicFilterIsNull()
|
||||
{
|
||||
// arrange
|
||||
var calls = new List<string>();
|
||||
var collection = new SubscriptionRouteCollection(typeof(string));
|
||||
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("no-topic");
|
||||
return null;
|
||||
}));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("topic");
|
||||
return null;
|
||||
}));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle(null, null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.True);
|
||||
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
|
||||
Assert.That(calls, Is.EqualTo(new[] { "no-topic" }));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_ReturnFalse_WhenNoRoutesMatch()
|
||||
{
|
||||
// arrange
|
||||
var collection = new SubscriptionRouteCollection(typeof(string));
|
||||
collection.AddRoute("other-topic", MessageRoute<string>.CreateWithTopicFilter("type", "other-topic", (_, _, _, _) => null));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.False);
|
||||
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_InvokeRoutesWithoutTopicFilter_AndMatchingTopicRoutes()
|
||||
{
|
||||
// arrange
|
||||
var calls = new List<string>();
|
||||
var collection = new SubscriptionRouteCollection(typeof(string));
|
||||
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("no-topic");
|
||||
return null;
|
||||
}));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("topic");
|
||||
return null;
|
||||
}));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.True);
|
||||
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
|
||||
Assert.That(calls, Is.EqualTo(new[] { "no-topic", "topic" }));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_InvokeAllMatchingTopicRoutes()
|
||||
{
|
||||
// arrange
|
||||
var calls = new List<string>();
|
||||
var collection = new SubscriptionRouteCollection(typeof(string));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("first");
|
||||
return CallResult.SuccessResult;
|
||||
}));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("second");
|
||||
return null;
|
||||
}));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.True);
|
||||
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
|
||||
Assert.That(calls, Is.EqualTo(new[] { "first", "second" }));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Handle_Should_NotInvokeTopicRoutes_WhenTopicFilterIsNull()
|
||||
{
|
||||
// arrange
|
||||
var calls = new List<string>();
|
||||
var collection = new SubscriptionRouteCollection(typeof(string));
|
||||
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||
{
|
||||
calls.Add("topic");
|
||||
return null;
|
||||
}));
|
||||
collection.Build();
|
||||
|
||||
// act
|
||||
var handled = collection.Handle(null, null!, DateTime.UtcNow, "original", "data", out var result);
|
||||
|
||||
// assert
|
||||
Assert.That(handled, Is.False);
|
||||
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
|
||||
Assert.That(calls, Is.Empty);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -230,7 +230,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
||||
|
||||
/// <summary>
|
||||
/// Try to get the enum value based on the string value using the Utf8JsonReader's ValueTextEquals method.
|
||||
/// This is an optimization to avoid string allocations when possible, but can only match case insensitively
|
||||
/// This is an optimization to avoid string allocations when possible, but can only match case sensitively
|
||||
/// </summary>
|
||||
private static T? GetValueOptimistic(ref Utf8JsonReader reader)
|
||||
{
|
||||
|
||||
@ -10,7 +10,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
/// </summary>
|
||||
public class MessageRouter
|
||||
{
|
||||
private RoutingSubTable? _routingTable;
|
||||
private ProcessorRouter? _routingTable;
|
||||
|
||||
/// <summary>
|
||||
/// The routes registered for this router
|
||||
@ -28,17 +28,29 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
/// <summary>
|
||||
/// Build the route mapping
|
||||
/// </summary>
|
||||
public void BuildRouteMap()
|
||||
public void BuildQueryRouter()
|
||||
{
|
||||
_routingTable = new RoutingSubTable(Routes);
|
||||
_routingTable = new QueryRouter(Routes);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get routes matching the type identifier
|
||||
/// Build the route mapping
|
||||
/// </summary>
|
||||
internal RoutingSubTableEntry? this[string identifier]
|
||||
public void BuildSubscriptionRouter()
|
||||
{
|
||||
get => (_routingTable ?? throw new InvalidOperationException("Route map not initialized before use"))[identifier];
|
||||
_routingTable = new SubscriptionRouter(Routes);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handle message
|
||||
/// </summary>
|
||||
public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
|
||||
{
|
||||
var routeCollection = (_routingTable ?? throw new NullReferenceException("Routing table not build before handling")).GetRoutes(typeIdentifier);
|
||||
if (routeCollection == null)
|
||||
throw new InvalidOperationException($"No routes for {typeIdentifier} message type");
|
||||
|
||||
return routeCollection.Handle(topicFilter, connection, receiveTime, originalData, data, out result);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@ -0,0 +1,36 @@
|
||||
using System.Collections.Generic;
|
||||
#if NET8_0_OR_GREATER
|
||||
using System.Collections.Frozen;
|
||||
#endif
|
||||
|
||||
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
{
|
||||
internal abstract class ProcessorRouter
|
||||
{
|
||||
public abstract RouteCollection? GetRoutes(string identifier);
|
||||
}
|
||||
|
||||
internal abstract class ProcessorRouter<T> : ProcessorRouter
|
||||
where T : RouteCollection
|
||||
{
|
||||
#if NET8_0_OR_GREATER
|
||||
private FrozenDictionary<string, T> _routeMap;
|
||||
#else
|
||||
private Dictionary<string, T> _routeMap;
|
||||
#endif
|
||||
|
||||
public ProcessorRouter(IEnumerable<MessageRoute> routes)
|
||||
{
|
||||
var map = BuildFromRoutes(routes);
|
||||
#if NET8_0_OR_GREATER
|
||||
_routeMap = map.ToFrozenDictionary();
|
||||
#else
|
||||
_routeMap = map;
|
||||
#endif
|
||||
}
|
||||
|
||||
public abstract Dictionary<string, T> BuildFromRoutes(IEnumerable<MessageRoute> routes);
|
||||
|
||||
public override RouteCollection? GetRoutes(string identifier) => _routeMap.TryGetValue(identifier, out var routes) ? routes : null;
|
||||
}
|
||||
}
|
||||
90
CryptoExchange.Net/Sockets/Default/Routing/QueryRouter.cs
Normal file
90
CryptoExchange.Net/Sockets/Default/Routing/QueryRouter.cs
Normal file
@ -0,0 +1,90 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
{
|
||||
internal class QueryRouter : ProcessorRouter<QueryRouteCollection>
|
||||
{
|
||||
public QueryRouter(IEnumerable<MessageRoute> routes) : base(routes)
|
||||
{
|
||||
}
|
||||
|
||||
public override Dictionary<string, QueryRouteCollection> BuildFromRoutes(IEnumerable<MessageRoute> routes)
|
||||
{
|
||||
var newMap = new Dictionary<string, QueryRouteCollection>();
|
||||
foreach (var route in routes)
|
||||
{
|
||||
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
|
||||
{
|
||||
typeMap = new QueryRouteCollection(route.DeserializationType);
|
||||
newMap.Add(route.TypeIdentifier, typeMap);
|
||||
}
|
||||
|
||||
typeMap.AddRoute(route.TopicFilter, route);
|
||||
}
|
||||
|
||||
foreach (var subEntry in newMap.Values)
|
||||
subEntry.Build();
|
||||
|
||||
return newMap;
|
||||
}
|
||||
}
|
||||
|
||||
internal class QueryRouteCollection : RouteCollection
|
||||
{
|
||||
public bool MultipleReaders { get; private set; }
|
||||
|
||||
public QueryRouteCollection(Type routeType) : base(routeType)
|
||||
{
|
||||
}
|
||||
|
||||
public override void AddRoute(string? topicFilter, MessageRoute route)
|
||||
{
|
||||
base.AddRoute(topicFilter, route);
|
||||
|
||||
if (route.MultipleReaders)
|
||||
MultipleReaders = true;
|
||||
}
|
||||
|
||||
public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
|
||||
{
|
||||
result = null;
|
||||
|
||||
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
|
||||
var handled = false;
|
||||
foreach (var route in _routesWithoutTopicFilter)
|
||||
{
|
||||
var thisResult = route.Handle(connection, receiveTime, originalData, data);
|
||||
if (thisResult != null)
|
||||
result ??= thisResult;
|
||||
|
||||
handled = true;
|
||||
}
|
||||
|
||||
// Forward to routes with matching topic filter, if any
|
||||
if (topicFilter == null)
|
||||
return handled;
|
||||
|
||||
var matchingTopicRoutes = GetRoutesWithMatchingTopicFilter(topicFilter);
|
||||
if (matchingTopicRoutes == null)
|
||||
return handled;
|
||||
|
||||
foreach (var route in matchingTopicRoutes)
|
||||
{
|
||||
var thisResult = route.Handle(connection, receiveTime, originalData, data);
|
||||
handled = true;
|
||||
|
||||
if (thisResult != null)
|
||||
{
|
||||
result ??= thisResult;
|
||||
|
||||
if (!MultipleReaders)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return handled;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,65 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using System;
|
||||
#if NET8_0_OR_GREATER
|
||||
using System.Collections.Frozen;
|
||||
#endif
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
{
|
||||
internal abstract class RouteCollection
|
||||
{
|
||||
protected List<MessageRoute> _routesWithoutTopicFilter;
|
||||
protected Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
|
||||
#if NET8_0_OR_GREATER
|
||||
protected FrozenDictionary<string, List<MessageRoute>>? _routesWithTopicFilterFrozen;
|
||||
#endif
|
||||
|
||||
public Type DeserializationType { get; }
|
||||
|
||||
public RouteCollection(Type routeType)
|
||||
{
|
||||
_routesWithoutTopicFilter = new List<MessageRoute>();
|
||||
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
|
||||
|
||||
DeserializationType = routeType;
|
||||
}
|
||||
|
||||
public virtual void AddRoute(string? topicFilter, MessageRoute route)
|
||||
{
|
||||
if (string.IsNullOrEmpty(topicFilter))
|
||||
{
|
||||
_routesWithoutTopicFilter.Add(route);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
|
||||
{
|
||||
list = new List<MessageRoute>();
|
||||
_routesWithTopicFilter.Add(topicFilter!, list);
|
||||
}
|
||||
|
||||
list.Add(route);
|
||||
}
|
||||
}
|
||||
|
||||
public void Build()
|
||||
{
|
||||
#if NET8_0_OR_GREATER
|
||||
_routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary();
|
||||
#endif
|
||||
}
|
||||
|
||||
protected List<MessageRoute>? GetRoutesWithMatchingTopicFilter(string topicFilter)
|
||||
{
|
||||
#if NET8_0_OR_GREATER
|
||||
_routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
|
||||
#else
|
||||
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
|
||||
#endif
|
||||
return matchingTopicRoutes;
|
||||
}
|
||||
|
||||
public abstract bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result);
|
||||
}
|
||||
}
|
||||
@ -1,142 +0,0 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using System;
|
||||
#if NET8_0_OR_GREATER
|
||||
using System.Collections.Frozen;
|
||||
#endif
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
{
|
||||
internal class RoutingSubTable
|
||||
{
|
||||
#if NET8_0_OR_GREATER
|
||||
// Used for mapping a type identifier to the routes matching it
|
||||
private FrozenDictionary<string, RoutingSubTableEntry> _routeMap;
|
||||
#else
|
||||
private Dictionary<string, RoutingSubTableEntry> _routeMap;
|
||||
#endif
|
||||
|
||||
public RoutingSubTable(IEnumerable<MessageRoute> routes)
|
||||
{
|
||||
var newMap = new Dictionary<string, RoutingSubTableEntry>();
|
||||
foreach (var route in routes)
|
||||
{
|
||||
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
|
||||
{
|
||||
typeMap = new RoutingSubTableEntry(route.DeserializationType);
|
||||
newMap.Add(route.TypeIdentifier, typeMap);
|
||||
}
|
||||
|
||||
typeMap.AddRoute(route.TopicFilter, route);
|
||||
}
|
||||
|
||||
foreach(var subEntry in newMap.Values)
|
||||
subEntry.Build();
|
||||
|
||||
#if NET8_0_OR_GREATER
|
||||
_routeMap = newMap.ToFrozenDictionary();
|
||||
#else
|
||||
_routeMap = newMap;
|
||||
#endif
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get routes matching the type identifier
|
||||
/// </summary>
|
||||
public RoutingSubTableEntry? this[string identifier]
|
||||
{
|
||||
get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null;
|
||||
}
|
||||
}
|
||||
|
||||
internal record RoutingSubTableEntry
|
||||
{
|
||||
public Type DeserializationType { get; }
|
||||
public bool MultipleReaders { get; private set; }
|
||||
|
||||
private List<MessageRoute> _routesWithoutTopicFilter;
|
||||
private Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
|
||||
#if NET8_0_OR_GREATER
|
||||
private FrozenDictionary<string, List<MessageRoute>>? _routesWithTopicFilterFrozen;
|
||||
#endif
|
||||
|
||||
public RoutingSubTableEntry(Type routeType)
|
||||
{
|
||||
_routesWithoutTopicFilter = new List<MessageRoute>();
|
||||
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
|
||||
|
||||
DeserializationType = routeType;
|
||||
}
|
||||
|
||||
public void AddRoute(string? topicFilter, MessageRoute route)
|
||||
{
|
||||
if (string.IsNullOrEmpty(topicFilter))
|
||||
{
|
||||
_routesWithoutTopicFilter.Add(route);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
|
||||
{
|
||||
list = new List<MessageRoute>();
|
||||
_routesWithTopicFilter.Add(topicFilter!, list);
|
||||
}
|
||||
|
||||
list.Add(route);
|
||||
}
|
||||
|
||||
if (route.MultipleReaders)
|
||||
MultipleReaders = true;
|
||||
}
|
||||
|
||||
public void Build()
|
||||
{
|
||||
#if NET8_0_OR_GREATER
|
||||
_routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary();
|
||||
#endif
|
||||
}
|
||||
|
||||
internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
|
||||
{
|
||||
result = null;
|
||||
|
||||
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
|
||||
var handled = false;
|
||||
foreach (var route in _routesWithoutTopicFilter)
|
||||
{
|
||||
var thisResult = route.Handle(connection, receiveTime, originalData, data);
|
||||
if (thisResult != null)
|
||||
result ??= thisResult;
|
||||
|
||||
handled = true;
|
||||
}
|
||||
|
||||
// Forward to routes with matching topic filter, if any
|
||||
if (topicFilter != null)
|
||||
{
|
||||
#if NET8_0_OR_GREATER
|
||||
_routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
|
||||
#else
|
||||
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
|
||||
#endif
|
||||
foreach (var route in matchingTopicRoutes ?? [])
|
||||
{
|
||||
var thisResult = route.Handle(connection, receiveTime, originalData, data);
|
||||
handled = true;
|
||||
|
||||
if (thisResult != null)
|
||||
{
|
||||
result ??= thisResult;
|
||||
|
||||
#warning MultipleReaders is only for queries, subscriptions should always have multiple readers = true. Maybe create different RoutingSubTable implementations for Queries and Subscriptions?
|
||||
if (!MultipleReaders)
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return handled;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2,6 +2,9 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
#if NET8_0_OR_GREATER
|
||||
using System.Collections.Frozen;
|
||||
#endif
|
||||
|
||||
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
{
|
||||
@ -10,40 +13,51 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
/// </summary>
|
||||
public class RoutingTable
|
||||
{
|
||||
private Dictionary<string, RoutingTableEntry> _routeTableEntries;
|
||||
#if NET8_0_OR_GREATER
|
||||
private FrozenDictionary<string, TypeRoutingCollection> _typeRoutingCollections = new Dictionary<string, TypeRoutingCollection>().ToFrozenDictionary();
|
||||
#else
|
||||
private Dictionary<string, TypeRoutingCollection> _typeRoutingCollections = new();
|
||||
#endif
|
||||
|
||||
/// <summary>
|
||||
/// Create routing table for provided processors
|
||||
/// Update the routing table
|
||||
/// </summary>
|
||||
public RoutingTable(IEnumerable<IMessageProcessor> processors)
|
||||
/// <param name="processors"></param>
|
||||
public void Update(IEnumerable<IMessageProcessor> processors)
|
||||
{
|
||||
_routeTableEntries = new Dictionary<string, RoutingTableEntry>();
|
||||
var newTypeMap = new Dictionary<string, TypeRoutingCollection>();
|
||||
foreach (var entry in processors)
|
||||
{
|
||||
foreach (var route in entry.MessageRouter.Routes)
|
||||
{
|
||||
if (!_routeTableEntries.ContainsKey(route.TypeIdentifier))
|
||||
_routeTableEntries.Add(route.TypeIdentifier, new RoutingTableEntry(route.DeserializationType));
|
||||
if (!newTypeMap.ContainsKey(route.TypeIdentifier))
|
||||
newTypeMap.Add(route.TypeIdentifier, new TypeRoutingCollection(route.DeserializationType));
|
||||
|
||||
if (!_routeTableEntries[route.TypeIdentifier].Handlers.Contains(entry))
|
||||
_routeTableEntries[route.TypeIdentifier].Handlers.Add(entry);
|
||||
if (!newTypeMap[route.TypeIdentifier].Handlers.Contains(entry))
|
||||
newTypeMap[route.TypeIdentifier].Handlers.Add(entry);
|
||||
}
|
||||
}
|
||||
|
||||
#if NET8_0_OR_GREATER
|
||||
_typeRoutingCollections = newTypeMap.ToFrozenDictionary();
|
||||
#else
|
||||
_typeRoutingCollections = newTypeMap;
|
||||
#endif
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get route table entry for a type identifier
|
||||
/// </summary>
|
||||
public RoutingTableEntry? GetRouteTableEntry(string typeIdentifier)
|
||||
public TypeRoutingCollection? GetRouteTableEntry(string typeIdentifier)
|
||||
{
|
||||
return _routeTableEntries.TryGetValue(typeIdentifier, out var entry) ? entry : null;
|
||||
return _typeRoutingCollections.TryGetValue(typeIdentifier, out var entry) ? entry : null;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
var sb = new StringBuilder();
|
||||
foreach (var entry in _routeTableEntries)
|
||||
foreach (var entry in _typeRoutingCollections)
|
||||
{
|
||||
sb.AppendLine($"{entry.Key}, {entry.Value.DeserializationType.Name}");
|
||||
foreach(var item in entry.Value.Handlers)
|
||||
@ -69,7 +83,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
/// <summary>
|
||||
/// Routing table entry
|
||||
/// </summary>
|
||||
public record RoutingTableEntry
|
||||
public record TypeRoutingCollection
|
||||
{
|
||||
/// <summary>
|
||||
/// Whether the deserialization type is string
|
||||
@ -87,7 +101,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
/// <summary>
|
||||
/// ctor
|
||||
/// </summary>
|
||||
public RoutingTableEntry(Type deserializationType)
|
||||
public TypeRoutingCollection(Type deserializationType)
|
||||
{
|
||||
IsStringOutput = deserializationType == typeof(string);
|
||||
DeserializationType = deserializationType;
|
||||
|
||||
@ -0,0 +1,69 @@
|
||||
using CryptoExchange.Net.Objects;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||
{
|
||||
internal class SubscriptionRouter : ProcessorRouter<SubscriptionRouteCollection>
|
||||
{
|
||||
public SubscriptionRouter(IEnumerable<MessageRoute> routes) : base(routes)
|
||||
{
|
||||
}
|
||||
|
||||
public override Dictionary<string, SubscriptionRouteCollection> BuildFromRoutes(IEnumerable<MessageRoute> routes)
|
||||
{
|
||||
var newMap = new Dictionary<string, SubscriptionRouteCollection>();
|
||||
foreach (var route in routes)
|
||||
{
|
||||
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
|
||||
{
|
||||
typeMap = new SubscriptionRouteCollection(route.DeserializationType);
|
||||
newMap.Add(route.TypeIdentifier, typeMap);
|
||||
}
|
||||
|
||||
typeMap.AddRoute(route.TopicFilter, route);
|
||||
}
|
||||
|
||||
foreach (var subEntry in newMap.Values)
|
||||
subEntry.Build();
|
||||
|
||||
return newMap;
|
||||
}
|
||||
}
|
||||
|
||||
internal class SubscriptionRouteCollection : RouteCollection
|
||||
{
|
||||
public SubscriptionRouteCollection(Type routeType) : base(routeType)
|
||||
{
|
||||
}
|
||||
|
||||
public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
|
||||
{
|
||||
result = CallResult.SuccessResult;
|
||||
|
||||
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
|
||||
var handled = false;
|
||||
foreach (var route in _routesWithoutTopicFilter)
|
||||
{
|
||||
route.Handle(connection, receiveTime, originalData, data);
|
||||
handled = true;
|
||||
}
|
||||
|
||||
// Forward to routes with matching topic filter, if any
|
||||
if (topicFilter == null)
|
||||
return handled;
|
||||
|
||||
var matchingTopicRoutes = GetRoutesWithMatchingTopicFilter(topicFilter);
|
||||
if (matchingTopicRoutes == null)
|
||||
return handled;
|
||||
|
||||
foreach (var route in matchingTopicRoutes)
|
||||
{
|
||||
route.Handle(connection, receiveTime, originalData, data);
|
||||
handled = true;
|
||||
}
|
||||
|
||||
return handled;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -262,7 +262,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
private readonly object _listenersLock = new object();
|
||||
#endif
|
||||
|
||||
private RoutingTable _routeTable = new RoutingTable([]);
|
||||
private RoutingTable _routingTable = new RoutingTable();
|
||||
|
||||
private ReadOnlyCollection<IMessageProcessor> _listeners;
|
||||
private readonly ILogger _logger;
|
||||
@ -531,7 +531,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
return;
|
||||
}
|
||||
|
||||
var routingEntry = _routeTable.GetRouteTableEntry(typeIdentifier);
|
||||
var routingEntry = _routingTable.GetRouteTableEntry(typeIdentifier);
|
||||
if (routingEntry == null)
|
||||
{
|
||||
if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data))
|
||||
@ -1139,9 +1139,9 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
});
|
||||
}
|
||||
|
||||
private void BuildRoutingTable()
|
||||
private void UpdateRoutingTable()
|
||||
{
|
||||
_routeTable = new RoutingTable(_listeners);
|
||||
_routingTable.Update(_listeners);
|
||||
}
|
||||
|
||||
private void AddMessageProcessor(IMessageProcessor processor)
|
||||
@ -1150,13 +1150,13 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
{
|
||||
var updatedList = new List<IMessageProcessor>(_listeners);
|
||||
updatedList.Add(processor);
|
||||
processor.OnMessageRouterUpdated += BuildRoutingTable;
|
||||
processor.OnMessageRouterUpdated += UpdateRoutingTable;
|
||||
_listeners = updatedList.AsReadOnly();
|
||||
if (processor.MessageRouter.Routes.Length > 0)
|
||||
{
|
||||
BuildRoutingTable();
|
||||
UpdateRoutingTable();
|
||||
#if DEBUG
|
||||
_logger.LogTrace("Processor added, new routing table:\r\n" + _routeTable.ToString());
|
||||
_logger.LogTrace("Processor added, new routing table:\r\n" + _routingTable.ToString());
|
||||
#endif
|
||||
}
|
||||
}
|
||||
@ -1167,14 +1167,14 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
lock (_listenersLock)
|
||||
{
|
||||
var updatedList = new List<IMessageProcessor>(_listeners);
|
||||
processor.OnMessageRouterUpdated -= UpdateRoutingTable;
|
||||
if (!updatedList.Remove(processor))
|
||||
return; // If nothing removed nothing has changed
|
||||
|
||||
processor.OnMessageRouterUpdated -= BuildRoutingTable;
|
||||
_listeners = updatedList.AsReadOnly();
|
||||
BuildRoutingTable();
|
||||
UpdateRoutingTable();
|
||||
#if DEBUG
|
||||
_logger.LogTrace("Processor removed, new routing table:\r\n" + _routeTable.ToString());
|
||||
_logger.LogTrace("Processor removed, new routing table:\r\n" + _routingTable.ToString());
|
||||
#endif
|
||||
}
|
||||
}
|
||||
@ -1187,7 +1187,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
var anyRemoved = false;
|
||||
foreach (var processor in processors)
|
||||
{
|
||||
processor.OnMessageRouterUpdated -= BuildRoutingTable;
|
||||
processor.OnMessageRouterUpdated -= UpdateRoutingTable;
|
||||
if (updatedList.Remove(processor))
|
||||
anyRemoved = true;
|
||||
}
|
||||
@ -1196,9 +1196,9 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
return; // If nothing removed nothing has changed
|
||||
|
||||
_listeners = updatedList.AsReadOnly();
|
||||
BuildRoutingTable();
|
||||
UpdateRoutingTable();
|
||||
#if DEBUG
|
||||
_logger.LogTrace("Processors removed, new routing table:\r\n" + _routeTable.ToString());
|
||||
_logger.LogTrace("Processors removed, new routing table:\r\n" + _routingTable.ToString());
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
@ -82,7 +82,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
set
|
||||
{
|
||||
_router = value;
|
||||
_router.BuildRouteMap();
|
||||
_router.BuildSubscriptionRouter();
|
||||
OnMessageRouterUpdated?.Invoke();
|
||||
}
|
||||
}
|
||||
@ -198,7 +198,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
||||
SubscriptionQuery.Timeout();
|
||||
}
|
||||
|
||||
return MessageRouter[typeIdentifier]?.Handle(topicFilter, connection, receiveTime, originalData, data, out _) ?? false;
|
||||
return MessageRouter.Handle(typeIdentifier, topicFilter, connection, receiveTime, originalData, data, out _);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@ -62,7 +62,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
|
||||
private MessageRouter _router;
|
||||
/// <summary>
|
||||
/// Router for this subscription
|
||||
/// Router for this query
|
||||
/// </summary>
|
||||
public MessageRouter MessageRouter
|
||||
{
|
||||
@ -70,7 +70,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
set
|
||||
{
|
||||
_router = value;
|
||||
_router.BuildRouteMap();
|
||||
_router.BuildQueryRouter();
|
||||
OnMessageRouterUpdated?.Invoke();
|
||||
}
|
||||
}
|
||||
@ -208,12 +208,14 @@ namespace CryptoExchange.Net.Sockets
|
||||
if (CurrentResponses == RequiredResponses)
|
||||
Response = message;
|
||||
|
||||
var handled = false;
|
||||
if (Result?.Success != false)
|
||||
{
|
||||
// If an error result is already set don't override that
|
||||
MessageRouter[typeIdentifier]!.Handle(topicFilter, connection, receiveTime, originalData, message, out var result);
|
||||
MessageRouter.Handle(typeIdentifier, topicFilter, connection, receiveTime, originalData, message, out var result);
|
||||
Result = result;
|
||||
if (Result == null)
|
||||
handled = Result != null;
|
||||
if (!handled)
|
||||
// Null from Handle means it wasn't actually for this query
|
||||
CurrentResponses -= 1;
|
||||
}
|
||||
@ -225,7 +227,7 @@ namespace CryptoExchange.Net.Sockets
|
||||
OnComplete?.Invoke();
|
||||
}
|
||||
|
||||
return true;
|
||||
return handled;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user