mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2026-04-13 00:22:22 +00:00
Compare commits
3 Commits
61c66300af
...
5bf89897e8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5bf89897e8 | ||
|
|
6dbc84e6fc | ||
|
|
f66730ca8c |
@ -1,5 +1,4 @@
|
|||||||
using CryptoExchange.Net.Objects;
|
using CryptoExchange.Net.Objects;
|
||||||
using CryptoExchange.Net.UnitTests.TestImplementations;
|
|
||||||
using NUnit.Framework;
|
using NUnit.Framework;
|
||||||
using System;
|
using System;
|
||||||
using System.Net.Http;
|
using System.Net.Http;
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
using CryptoExchange.Net.Objects;
|
using CryptoExchange.Net.Objects;
|
||||||
using CryptoExchange.Net.Testing;
|
using CryptoExchange.Net.Testing;
|
||||||
using CryptoExchange.Net.UnitTests.Implementations;
|
using CryptoExchange.Net.UnitTests.Implementations;
|
||||||
using CryptoExchange.Net.UnitTests.TestImplementations;
|
|
||||||
using NUnit.Framework;
|
using NUnit.Framework;
|
||||||
using System;
|
using System;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
|
|||||||
@ -4,7 +4,7 @@ using NUnit.Framework;
|
|||||||
using System;
|
using System;
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.UnitTests
|
namespace CryptoExchange.Net.UnitTests.ConverterTests
|
||||||
{
|
{
|
||||||
[TestFixture()]
|
[TestFixture()]
|
||||||
public class SharedModelConversionTests
|
public class SharedModelConversionTests
|
||||||
|
|||||||
@ -23,7 +23,7 @@ namespace CryptoExchange.Net.UnitTests.Implementations
|
|||||||
{ }
|
{ }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Get the CryptoCom environment by name
|
/// Get the environment by name
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public static TestEnvironment? GetEnvironmentByName(string? name)
|
public static TestEnvironment? GetEnvironmentByName(string? name)
|
||||||
=> name switch
|
=> name switch
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
using System.Text.Json.Serialization;
|
using System.Text.Json.Serialization;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.UnitTests.TestImplementations
|
namespace CryptoExchange.Net.UnitTests.Implementations
|
||||||
{
|
{
|
||||||
public class TestObject
|
public class TestObject
|
||||||
{
|
{
|
||||||
|
|||||||
@ -1,6 +1,5 @@
|
|||||||
using CryptoExchange.Net.UnitTests.ConverterTests;
|
using CryptoExchange.Net.UnitTests.ConverterTests;
|
||||||
using CryptoExchange.Net.UnitTests.Implementations;
|
using CryptoExchange.Net.UnitTests.Implementations;
|
||||||
using CryptoExchange.Net.UnitTests.TestImplementations;
|
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Text.Json.Serialization;
|
using System.Text.Json.Serialization;
|
||||||
|
|
||||||
|
|||||||
@ -18,7 +18,7 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
}
|
}
|
||||||
|
|
||||||
[Test]
|
[Test]
|
||||||
public void AddingBasicNullValue_ThrowExecption()
|
public void AddingBasicNullValue_ThrowsException()
|
||||||
{
|
{
|
||||||
var parameters = new ParameterCollection();
|
var parameters = new ParameterCollection();
|
||||||
Assert.Throws<ArgumentNullException>(() => parameters.Add("test", null!));
|
Assert.Throws<ArgumentNullException>(() => parameters.Add("test", null!));
|
||||||
|
|||||||
@ -1,94 +0,0 @@
|
|||||||
using CryptoExchange.Net.Objects;
|
|
||||||
using CryptoExchange.Net.Sockets.Default;
|
|
||||||
using CryptoExchange.Net.Sockets.Default.Routing;
|
|
||||||
using CryptoExchange.Net.Sockets.Interfaces;
|
|
||||||
using NUnit.Framework;
|
|
||||||
using System;
|
|
||||||
|
|
||||||
namespace CryptoExchange.Net.UnitTests
|
|
||||||
{
|
|
||||||
internal class RoutingTableTests
|
|
||||||
{
|
|
||||||
[Test]
|
|
||||||
public void Constructor_CreatesEntriesAndDeduplicatesHandlersPerTypeIdentifier()
|
|
||||||
{
|
|
||||||
var processor = new TestMessageProcessor(1, MessageRouter.Create(
|
|
||||||
MessageRoute<string>.CreateWithoutTopicFilter("ticker", EmptyHandler<string>()),
|
|
||||||
MessageRoute<string>.CreateWithTopicFilter("ticker", "btcusdt", EmptyHandler<string>()),
|
|
||||||
MessageRoute<int>.CreateWithoutTopicFilter("trade", EmptyHandler<int>())));
|
|
||||||
|
|
||||||
var routingTable = new RoutingTable(new[] { processor });
|
|
||||||
var tickerEntry = routingTable.GetRouteTableEntry("ticker");
|
|
||||||
var tradeEntry = routingTable.GetRouteTableEntry("trade");
|
|
||||||
|
|
||||||
Assert.Multiple(() =>
|
|
||||||
{
|
|
||||||
Assert.That(tickerEntry, Is.Not.Null);
|
|
||||||
Assert.That(tickerEntry!.DeserializationType, Is.EqualTo(typeof(string)));
|
|
||||||
Assert.That(tickerEntry.IsStringOutput, Is.True);
|
|
||||||
Assert.That(tickerEntry.Handlers, Has.Count.EqualTo(1));
|
|
||||||
Assert.That(tickerEntry.Handlers[0], Is.SameAs(processor));
|
|
||||||
|
|
||||||
Assert.That(tradeEntry, Is.Not.Null);
|
|
||||||
Assert.That(tradeEntry!.DeserializationType, Is.EqualTo(typeof(int)));
|
|
||||||
Assert.That(tradeEntry.IsStringOutput, Is.False);
|
|
||||||
Assert.That(tradeEntry.Handlers, Has.Count.EqualTo(1));
|
|
||||||
Assert.That(tradeEntry.Handlers[0], Is.SameAs(processor));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
[Test]
|
|
||||||
public void Constructor_AddsAllProcessorsForSameTypeIdentifier()
|
|
||||||
{
|
|
||||||
var processor1 = new TestMessageProcessor(1, MessageRouter.Create(
|
|
||||||
MessageRoute<string>.CreateWithoutTopicFilter("ticker", EmptyHandler<string>())));
|
|
||||||
var processor2 = new TestMessageProcessor(2, MessageRouter.Create(
|
|
||||||
MessageRoute<string>.CreateWithTopicFilter("ticker", "ethusdt", EmptyHandler<string>())));
|
|
||||||
|
|
||||||
var routingTable = new RoutingTable(new IMessageProcessor[] { processor1, processor2 });
|
|
||||||
var entry = routingTable.GetRouteTableEntry("ticker");
|
|
||||||
|
|
||||||
Assert.Multiple(() =>
|
|
||||||
{
|
|
||||||
Assert.That(entry, Is.Not.Null);
|
|
||||||
Assert.That(entry!.Handlers, Has.Count.EqualTo(2));
|
|
||||||
Assert.That(entry.Handlers, Does.Contain(processor1));
|
|
||||||
Assert.That(entry.Handlers, Does.Contain(processor2));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
[Test]
|
|
||||||
public void GetRouteTableEntry_ReturnsNullForUnknownTypeIdentifier()
|
|
||||||
{
|
|
||||||
var processor = new TestMessageProcessor(1, MessageRouter.Create(
|
|
||||||
MessageRoute<string>.CreateWithoutTopicFilter("ticker", EmptyHandler<string>())));
|
|
||||||
|
|
||||||
var routingTable = new RoutingTable(new[] { processor });
|
|
||||||
|
|
||||||
Assert.That(routingTable.GetRouteTableEntry("unknown"), Is.Null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Func<SocketConnection, DateTime, string?, T, CallResult?> EmptyHandler<T>()
|
|
||||||
{
|
|
||||||
return (_, _, _, _) => null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private class TestMessageProcessor : IMessageProcessor
|
|
||||||
{
|
|
||||||
public int Id { get; }
|
|
||||||
public MessageRouter MessageRouter { get; }
|
|
||||||
public event Action? OnMessageRouterUpdated;
|
|
||||||
|
|
||||||
public TestMessageProcessor(int id, MessageRouter messageRouter)
|
|
||||||
{
|
|
||||||
Id = id;
|
|
||||||
MessageRouter = messageRouter;
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection socketConnection, DateTime receiveTime, string? originalData, object result)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -0,0 +1,235 @@
|
|||||||
|
using CryptoExchange.Net.Objects;
|
||||||
|
using CryptoExchange.Net.Sockets.Default;
|
||||||
|
using CryptoExchange.Net.Sockets.Default.Routing;
|
||||||
|
using NUnit.Framework;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.UnitTests.SocketRoutingTests
|
||||||
|
{
|
||||||
|
[TestFixture]
|
||||||
|
public class QueryRouterTests
|
||||||
|
{
|
||||||
|
[Test]
|
||||||
|
public void BuildFromRoutes_Should_GroupRoutesByTypeIdentifier_AndSetDeserializationType()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var routes = new MessageRoute[]
|
||||||
|
{
|
||||||
|
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null),
|
||||||
|
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null),
|
||||||
|
MessageRoute<int>.CreateWithTopicFilter("type2", "topic2", (_, _, _, _) => null)
|
||||||
|
};
|
||||||
|
|
||||||
|
var router = new QueryRouter(routes);
|
||||||
|
|
||||||
|
// act
|
||||||
|
var type1Routes = router.GetRoutes("type1");
|
||||||
|
var type2Routes = router.GetRoutes("type2");
|
||||||
|
var missingRoutes = router.GetRoutes("missing");
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(type1Routes, Is.Not.Null);
|
||||||
|
Assert.That(type2Routes, Is.Not.Null);
|
||||||
|
Assert.That(missingRoutes, Is.Null);
|
||||||
|
|
||||||
|
Assert.That(type1Routes, Is.TypeOf<QueryRouteCollection>());
|
||||||
|
Assert.That(type2Routes, Is.TypeOf<QueryRouteCollection>());
|
||||||
|
Assert.That(type1Routes!.DeserializationType, Is.EqualTo(typeof(string)));
|
||||||
|
Assert.That(type2Routes!.DeserializationType, Is.EqualTo(typeof(int)));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void AddRoute_Should_SetMultipleReaders_WhenAnyRouteAllowsMultipleReaders()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var collection = new QueryRouteCollection(typeof(string));
|
||||||
|
|
||||||
|
// act
|
||||||
|
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) => null));
|
||||||
|
var beforeMultipleReaders = collection.MultipleReaders;
|
||||||
|
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) => null, true));
|
||||||
|
var afterMultipleReaders = collection.MultipleReaders;
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(beforeMultipleReaders, Is.False);
|
||||||
|
Assert.That(afterMultipleReaders, Is.True);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_InvokeRoutesWithoutTopicFilter_WhenTopicFilterIsNull()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var calls = new List<string>();
|
||||||
|
var collection = new QueryRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("no-topic");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("topic");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle(null, null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.True);
|
||||||
|
Assert.That(result, Is.Null);
|
||||||
|
Assert.That(calls, Is.EqualTo(new[] { "no-topic" }));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_ReturnFalse_WhenNoRoutesMatch()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var collection = new QueryRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute("other-topic", MessageRoute<string>.CreateWithTopicFilter("type", "other-topic", (_, _, _, _) => null));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.False);
|
||||||
|
Assert.That(result, Is.Null);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_InvokeRoutesWithoutTopicFilter_AndMatchingTopicRoutes()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var calls = new List<string>();
|
||||||
|
var collection = new QueryRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("no-topic");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("topic");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.True);
|
||||||
|
Assert.That(result, Is.Null);
|
||||||
|
Assert.That(calls, Is.EqualTo(new[] { "no-topic", "topic" }));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_StopAfterFirstNonNullMatchingResult_WhenMultipleReadersIsFalse()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var calls = new List<string>();
|
||||||
|
var expectedResult = CallResult.SuccessResult;
|
||||||
|
var collection = new QueryRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("first");
|
||||||
|
return expectedResult;
|
||||||
|
}));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("second");
|
||||||
|
return new CallResult(null);
|
||||||
|
}));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.True);
|
||||||
|
Assert.That(result, Is.SameAs(expectedResult));
|
||||||
|
Assert.That(calls, Is.EqualTo(new[] { "first" }));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_ContinueAfterNonNullMatchingResult_WhenMultipleReadersIsTrue()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var calls = new List<string>();
|
||||||
|
var expectedResult = CallResult.SuccessResult;
|
||||||
|
var collection = new QueryRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("first");
|
||||||
|
return expectedResult;
|
||||||
|
}, true));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("second");
|
||||||
|
return new CallResult(null);
|
||||||
|
}));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.True);
|
||||||
|
Assert.That(result, Is.SameAs(expectedResult));
|
||||||
|
Assert.That(calls, Is.EqualTo(new[] { "first", "second" }));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_ContinueUntilNonNullResult_WhenEarlierMatchingRoutesReturnNull()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var calls = new List<string>();
|
||||||
|
var expectedResult = CallResult.SuccessResult;
|
||||||
|
var collection = new QueryRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("first");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("second");
|
||||||
|
return expectedResult;
|
||||||
|
}));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("third");
|
||||||
|
return new CallResult(null);
|
||||||
|
}));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.True);
|
||||||
|
Assert.That(result, Is.SameAs(expectedResult));
|
||||||
|
Assert.That(calls, Is.EqualTo(new[] { "first", "second" }));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_ReturnHandledTrue_WhenMatchingRoutesReturnNull()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var collection = new QueryRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) => null));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.True);
|
||||||
|
Assert.That(result, Is.Null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,167 @@
|
|||||||
|
using CryptoExchange.Net.Sockets.Default;
|
||||||
|
using CryptoExchange.Net.Sockets.Default.Routing;
|
||||||
|
using CryptoExchange.Net.Sockets.Interfaces;
|
||||||
|
using NUnit.Framework;
|
||||||
|
using System;
|
||||||
|
using System.Linq;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.UnitTests.SocketRoutingTests
|
||||||
|
{
|
||||||
|
[TestFixture]
|
||||||
|
public class RoutingTableTests
|
||||||
|
{
|
||||||
|
[Test]
|
||||||
|
public void Update_Should_CreateEntriesPerTypeIdentifier_WithCorrectDeserializationTypeAndHandlers()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var processor1 = new TestMessageProcessor(
|
||||||
|
1,
|
||||||
|
MessageRouter.Create(
|
||||||
|
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null),
|
||||||
|
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null)));
|
||||||
|
|
||||||
|
var processor2 = new TestMessageProcessor(
|
||||||
|
2,
|
||||||
|
MessageRouter.Create(
|
||||||
|
MessageRoute<int>.CreateWithTopicFilter("type2", "topic2", (_, _, _, _) => null)));
|
||||||
|
|
||||||
|
var table = new RoutingTable();
|
||||||
|
|
||||||
|
// act
|
||||||
|
table.Update(new IMessageProcessor[] { processor1, processor2 });
|
||||||
|
|
||||||
|
var type1Entry = table.GetRouteTableEntry("type1");
|
||||||
|
var type2Entry = table.GetRouteTableEntry("type2");
|
||||||
|
var missingEntry = table.GetRouteTableEntry("missing");
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(type1Entry, Is.Not.Null);
|
||||||
|
Assert.That(type2Entry, Is.Not.Null);
|
||||||
|
Assert.That(missingEntry, Is.Null);
|
||||||
|
|
||||||
|
Assert.That(type1Entry!.DeserializationType, Is.EqualTo(typeof(string)));
|
||||||
|
Assert.That(type1Entry.IsStringOutput, Is.True);
|
||||||
|
Assert.That(type1Entry.Handlers, Has.Count.EqualTo(1));
|
||||||
|
Assert.That(type1Entry.Handlers.Single(), Is.SameAs(processor1));
|
||||||
|
|
||||||
|
Assert.That(type2Entry!.DeserializationType, Is.EqualTo(typeof(int)));
|
||||||
|
Assert.That(type2Entry.IsStringOutput, Is.False);
|
||||||
|
Assert.That(type2Entry.Handlers, Has.Count.EqualTo(1));
|
||||||
|
Assert.That(type2Entry.Handlers.Single(), Is.SameAs(processor2));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Update_Should_AddMultipleProcessors_ForSameTypeIdentifier()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var processor1 = new TestMessageProcessor(
|
||||||
|
1,
|
||||||
|
MessageRouter.Create(
|
||||||
|
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null)));
|
||||||
|
|
||||||
|
var processor2 = new TestMessageProcessor(
|
||||||
|
2,
|
||||||
|
MessageRouter.Create(
|
||||||
|
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null)));
|
||||||
|
|
||||||
|
var table = new RoutingTable();
|
||||||
|
|
||||||
|
// act
|
||||||
|
table.Update(new IMessageProcessor[] { processor1, processor2 });
|
||||||
|
var entry = table.GetRouteTableEntry("type1");
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(entry, Is.Not.Null);
|
||||||
|
Assert.That(entry!.DeserializationType, Is.EqualTo(typeof(string)));
|
||||||
|
Assert.That(entry.Handlers, Has.Count.EqualTo(2));
|
||||||
|
Assert.That(entry.Handlers, Does.Contain(processor1));
|
||||||
|
Assert.That(entry.Handlers, Does.Contain(processor2));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Update_Should_ReplacePreviousEntries()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var initialProcessor = new TestMessageProcessor(
|
||||||
|
1,
|
||||||
|
MessageRouter.Create(
|
||||||
|
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null)));
|
||||||
|
|
||||||
|
var replacementProcessor = new TestMessageProcessor(
|
||||||
|
2,
|
||||||
|
MessageRouter.Create(
|
||||||
|
MessageRoute<int>.CreateWithoutTopicFilter("type2", (_, _, _, _) => null)));
|
||||||
|
|
||||||
|
var table = new RoutingTable();
|
||||||
|
table.Update(new IMessageProcessor[] { initialProcessor });
|
||||||
|
|
||||||
|
// act
|
||||||
|
table.Update(new IMessageProcessor[] { replacementProcessor });
|
||||||
|
|
||||||
|
var oldEntry = table.GetRouteTableEntry("type1");
|
||||||
|
var newEntry = table.GetRouteTableEntry("type2");
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(oldEntry, Is.Null);
|
||||||
|
Assert.That(newEntry, Is.Not.Null);
|
||||||
|
Assert.That(newEntry!.DeserializationType, Is.EqualTo(typeof(int)));
|
||||||
|
Assert.That(newEntry.Handlers, Has.Count.EqualTo(1));
|
||||||
|
Assert.That(newEntry.Handlers.Single(), Is.SameAs(replacementProcessor));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Update_WithEmptyProcessors_Should_ClearEntries()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var processor = new TestMessageProcessor(
|
||||||
|
1,
|
||||||
|
MessageRouter.Create(
|
||||||
|
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null)));
|
||||||
|
|
||||||
|
var table = new RoutingTable();
|
||||||
|
table.Update(new IMessageProcessor[] { processor });
|
||||||
|
|
||||||
|
// act
|
||||||
|
table.Update(Array.Empty<IMessageProcessor>());
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(table.GetRouteTableEntry("type1"), Is.Null);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void TypeRoutingCollection_Should_SetIsStringOutput_BasedOnDeserializationType()
|
||||||
|
{
|
||||||
|
// arrange & act
|
||||||
|
var stringCollection = new TypeRoutingCollection(typeof(string));
|
||||||
|
var intCollection = new TypeRoutingCollection(typeof(int));
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(stringCollection.IsStringOutput, Is.True);
|
||||||
|
Assert.That(stringCollection.DeserializationType, Is.EqualTo(typeof(string)));
|
||||||
|
Assert.That(stringCollection.Handlers, Is.Empty);
|
||||||
|
|
||||||
|
Assert.That(intCollection.IsStringOutput, Is.False);
|
||||||
|
Assert.That(intCollection.DeserializationType, Is.EqualTo(typeof(int)));
|
||||||
|
Assert.That(intCollection.Handlers, Is.Empty);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class TestMessageProcessor : IMessageProcessor
|
||||||
|
{
|
||||||
|
public int Id { get; }
|
||||||
|
public MessageRouter MessageRouter { get; }
|
||||||
|
|
||||||
|
public TestMessageProcessor(int id, MessageRouter messageRouter)
|
||||||
|
{
|
||||||
|
Id = id;
|
||||||
|
MessageRouter = messageRouter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public event Action? OnMessageRouterUpdated;
|
||||||
|
|
||||||
|
public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection socketConnection, DateTime receiveTime, string? originalData, object result)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,160 @@
|
|||||||
|
using CryptoExchange.Net.Objects;
|
||||||
|
using CryptoExchange.Net.Sockets.Default.Routing;
|
||||||
|
using NUnit.Framework;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.UnitTests.SocketRoutingTests
|
||||||
|
{
|
||||||
|
[TestFixture]
|
||||||
|
public class SubscriptionRouterTests
|
||||||
|
{
|
||||||
|
[Test]
|
||||||
|
public void BuildFromRoutes_Should_GroupRoutesByTypeIdentifier_AndSetDeserializationType()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var routes = new MessageRoute[]
|
||||||
|
{
|
||||||
|
MessageRoute<string>.CreateWithoutTopicFilter("type1", (_, _, _, _) => null),
|
||||||
|
MessageRoute<string>.CreateWithTopicFilter("type1", "topic1", (_, _, _, _) => null),
|
||||||
|
MessageRoute<int>.CreateWithTopicFilter("type2", "topic2", (_, _, _, _) => null)
|
||||||
|
};
|
||||||
|
|
||||||
|
var router = new SubscriptionRouter(routes);
|
||||||
|
|
||||||
|
// act
|
||||||
|
var type1Routes = router.GetRoutes("type1");
|
||||||
|
var type2Routes = router.GetRoutes("type2");
|
||||||
|
var missingRoutes = router.GetRoutes("missing");
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(type1Routes, Is.Not.Null);
|
||||||
|
Assert.That(type2Routes, Is.Not.Null);
|
||||||
|
Assert.That(missingRoutes, Is.Null);
|
||||||
|
|
||||||
|
Assert.That(type1Routes, Is.TypeOf<SubscriptionRouteCollection>());
|
||||||
|
Assert.That(type2Routes, Is.TypeOf<SubscriptionRouteCollection>());
|
||||||
|
Assert.That(type1Routes!.DeserializationType, Is.EqualTo(typeof(string)));
|
||||||
|
Assert.That(type2Routes!.DeserializationType, Is.EqualTo(typeof(int)));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_InvokeRoutesWithoutTopicFilter_WhenTopicFilterIsNull()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var calls = new List<string>();
|
||||||
|
var collection = new SubscriptionRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("no-topic");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("topic");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle(null, null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.True);
|
||||||
|
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
|
||||||
|
Assert.That(calls, Is.EqualTo(new[] { "no-topic" }));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_ReturnFalse_WhenNoRoutesMatch()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var collection = new SubscriptionRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute("other-topic", MessageRoute<string>.CreateWithTopicFilter("type", "other-topic", (_, _, _, _) => null));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.False);
|
||||||
|
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_InvokeRoutesWithoutTopicFilter_AndMatchingTopicRoutes()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var calls = new List<string>();
|
||||||
|
var collection = new SubscriptionRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute(null, MessageRoute<string>.CreateWithoutTopicFilter("type", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("no-topic");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("topic");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.True);
|
||||||
|
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
|
||||||
|
Assert.That(calls, Is.EqualTo(new[] { "no-topic", "topic" }));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_InvokeAllMatchingTopicRoutes()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var calls = new List<string>();
|
||||||
|
var collection = new SubscriptionRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("first");
|
||||||
|
return CallResult.SuccessResult;
|
||||||
|
}));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("second");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle("topic", null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.True);
|
||||||
|
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
|
||||||
|
Assert.That(calls, Is.EqualTo(new[] { "first", "second" }));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void Handle_Should_NotInvokeTopicRoutes_WhenTopicFilterIsNull()
|
||||||
|
{
|
||||||
|
// arrange
|
||||||
|
var calls = new List<string>();
|
||||||
|
var collection = new SubscriptionRouteCollection(typeof(string));
|
||||||
|
collection.AddRoute("topic", MessageRoute<string>.CreateWithTopicFilter("type", "topic", (_, _, _, _) =>
|
||||||
|
{
|
||||||
|
calls.Add("topic");
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
collection.Build();
|
||||||
|
|
||||||
|
// act
|
||||||
|
var handled = collection.Handle(null, null!, DateTime.UtcNow, "original", "data", out var result);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.That(handled, Is.False);
|
||||||
|
Assert.That(result, Is.SameAs(CallResult.SuccessResult));
|
||||||
|
Assert.That(calls, Is.Empty);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -230,7 +230,7 @@ namespace CryptoExchange.Net.Converters.SystemTextJson
|
|||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Try to get the enum value based on the string value using the Utf8JsonReader's ValueTextEquals method.
|
/// Try to get the enum value based on the string value using the Utf8JsonReader's ValueTextEquals method.
|
||||||
/// This is an optimization to avoid string allocations when possible, but can only match case insensitively
|
/// This is an optimization to avoid string allocations when possible, but can only match case sensitively
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private static T? GetValueOptimistic(ref Utf8JsonReader reader)
|
private static T? GetValueOptimistic(ref Utf8JsonReader reader)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -10,7 +10,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public class MessageRouter
|
public class MessageRouter
|
||||||
{
|
{
|
||||||
private RoutingSubTable? _routingTable;
|
private ProcessorRouter? _routingTable;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The routes registered for this router
|
/// The routes registered for this router
|
||||||
@ -28,17 +28,29 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Build the route mapping
|
/// Build the route mapping
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public void BuildRouteMap()
|
public void BuildQueryRouter()
|
||||||
{
|
{
|
||||||
_routingTable = new RoutingSubTable(Routes);
|
_routingTable = new QueryRouter(Routes);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Get routes matching the type identifier
|
/// Build the route mapping
|
||||||
/// </summary>
|
/// </summary>
|
||||||
internal RoutingSubTableEntry? this[string identifier]
|
public void BuildSubscriptionRouter()
|
||||||
{
|
{
|
||||||
get => (_routingTable ?? throw new InvalidOperationException("Route map not initialized before use"))[identifier];
|
_routingTable = new SubscriptionRouter(Routes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Handle message
|
||||||
|
/// </summary>
|
||||||
|
public bool Handle(string typeIdentifier, string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
|
||||||
|
{
|
||||||
|
var routeCollection = (_routingTable ?? throw new NullReferenceException("Routing table not build before handling")).GetRoutes(typeIdentifier);
|
||||||
|
if (routeCollection == null)
|
||||||
|
throw new InvalidOperationException($"No routes for {typeIdentifier} message type");
|
||||||
|
|
||||||
|
return routeCollection.Handle(topicFilter, connection, receiveTime, originalData, data, out result);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@ -0,0 +1,36 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
#if NET8_0_OR_GREATER
|
||||||
|
using System.Collections.Frozen;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||||
|
{
|
||||||
|
internal abstract class ProcessorRouter
|
||||||
|
{
|
||||||
|
public abstract RouteCollection? GetRoutes(string identifier);
|
||||||
|
}
|
||||||
|
|
||||||
|
internal abstract class ProcessorRouter<T> : ProcessorRouter
|
||||||
|
where T : RouteCollection
|
||||||
|
{
|
||||||
|
#if NET8_0_OR_GREATER
|
||||||
|
private FrozenDictionary<string, T> _routeMap;
|
||||||
|
#else
|
||||||
|
private Dictionary<string, T> _routeMap;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
public ProcessorRouter(IEnumerable<MessageRoute> routes)
|
||||||
|
{
|
||||||
|
var map = BuildFromRoutes(routes);
|
||||||
|
#if NET8_0_OR_GREATER
|
||||||
|
_routeMap = map.ToFrozenDictionary();
|
||||||
|
#else
|
||||||
|
_routeMap = map;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract Dictionary<string, T> BuildFromRoutes(IEnumerable<MessageRoute> routes);
|
||||||
|
|
||||||
|
public override RouteCollection? GetRoutes(string identifier) => _routeMap.TryGetValue(identifier, out var routes) ? routes : null;
|
||||||
|
}
|
||||||
|
}
|
||||||
90
CryptoExchange.Net/Sockets/Default/Routing/QueryRouter.cs
Normal file
90
CryptoExchange.Net/Sockets/Default/Routing/QueryRouter.cs
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
using CryptoExchange.Net.Objects;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||||
|
{
|
||||||
|
internal class QueryRouter : ProcessorRouter<QueryRouteCollection>
|
||||||
|
{
|
||||||
|
public QueryRouter(IEnumerable<MessageRoute> routes) : base(routes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Dictionary<string, QueryRouteCollection> BuildFromRoutes(IEnumerable<MessageRoute> routes)
|
||||||
|
{
|
||||||
|
var newMap = new Dictionary<string, QueryRouteCollection>();
|
||||||
|
foreach (var route in routes)
|
||||||
|
{
|
||||||
|
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
|
||||||
|
{
|
||||||
|
typeMap = new QueryRouteCollection(route.DeserializationType);
|
||||||
|
newMap.Add(route.TypeIdentifier, typeMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
typeMap.AddRoute(route.TopicFilter, route);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var subEntry in newMap.Values)
|
||||||
|
subEntry.Build();
|
||||||
|
|
||||||
|
return newMap;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal class QueryRouteCollection : RouteCollection
|
||||||
|
{
|
||||||
|
public bool MultipleReaders { get; private set; }
|
||||||
|
|
||||||
|
public QueryRouteCollection(Type routeType) : base(routeType)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public override void AddRoute(string? topicFilter, MessageRoute route)
|
||||||
|
{
|
||||||
|
base.AddRoute(topicFilter, route);
|
||||||
|
|
||||||
|
if (route.MultipleReaders)
|
||||||
|
MultipleReaders = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
|
||||||
|
{
|
||||||
|
result = null;
|
||||||
|
|
||||||
|
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
|
||||||
|
var handled = false;
|
||||||
|
foreach (var route in _routesWithoutTopicFilter)
|
||||||
|
{
|
||||||
|
var thisResult = route.Handle(connection, receiveTime, originalData, data);
|
||||||
|
if (thisResult != null)
|
||||||
|
result ??= thisResult;
|
||||||
|
|
||||||
|
handled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Forward to routes with matching topic filter, if any
|
||||||
|
if (topicFilter == null)
|
||||||
|
return handled;
|
||||||
|
|
||||||
|
var matchingTopicRoutes = GetRoutesWithMatchingTopicFilter(topicFilter);
|
||||||
|
if (matchingTopicRoutes == null)
|
||||||
|
return handled;
|
||||||
|
|
||||||
|
foreach (var route in matchingTopicRoutes)
|
||||||
|
{
|
||||||
|
var thisResult = route.Handle(connection, receiveTime, originalData, data);
|
||||||
|
handled = true;
|
||||||
|
|
||||||
|
if (thisResult != null)
|
||||||
|
{
|
||||||
|
result ??= thisResult;
|
||||||
|
|
||||||
|
if (!MultipleReaders)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return handled;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,65 @@
|
|||||||
|
using CryptoExchange.Net.Objects;
|
||||||
|
using System;
|
||||||
|
#if NET8_0_OR_GREATER
|
||||||
|
using System.Collections.Frozen;
|
||||||
|
#endif
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||||
|
{
|
||||||
|
internal abstract class RouteCollection
|
||||||
|
{
|
||||||
|
protected List<MessageRoute> _routesWithoutTopicFilter;
|
||||||
|
protected Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
|
||||||
|
#if NET8_0_OR_GREATER
|
||||||
|
protected FrozenDictionary<string, List<MessageRoute>>? _routesWithTopicFilterFrozen;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
public Type DeserializationType { get; }
|
||||||
|
|
||||||
|
public RouteCollection(Type routeType)
|
||||||
|
{
|
||||||
|
_routesWithoutTopicFilter = new List<MessageRoute>();
|
||||||
|
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
|
||||||
|
|
||||||
|
DeserializationType = routeType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public virtual void AddRoute(string? topicFilter, MessageRoute route)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrEmpty(topicFilter))
|
||||||
|
{
|
||||||
|
_routesWithoutTopicFilter.Add(route);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
|
||||||
|
{
|
||||||
|
list = new List<MessageRoute>();
|
||||||
|
_routesWithTopicFilter.Add(topicFilter!, list);
|
||||||
|
}
|
||||||
|
|
||||||
|
list.Add(route);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Build()
|
||||||
|
{
|
||||||
|
#if NET8_0_OR_GREATER
|
||||||
|
_routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary();
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<MessageRoute>? GetRoutesWithMatchingTopicFilter(string topicFilter)
|
||||||
|
{
|
||||||
|
#if NET8_0_OR_GREATER
|
||||||
|
_routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
|
||||||
|
#else
|
||||||
|
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
|
||||||
|
#endif
|
||||||
|
return matchingTopicRoutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,142 +0,0 @@
|
|||||||
using CryptoExchange.Net.Objects;
|
|
||||||
using System;
|
|
||||||
#if NET8_0_OR_GREATER
|
|
||||||
using System.Collections.Frozen;
|
|
||||||
#endif
|
|
||||||
using System.Collections.Generic;
|
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets.Default.Routing
|
|
||||||
{
|
|
||||||
internal class RoutingSubTable
|
|
||||||
{
|
|
||||||
#if NET8_0_OR_GREATER
|
|
||||||
// Used for mapping a type identifier to the routes matching it
|
|
||||||
private FrozenDictionary<string, RoutingSubTableEntry> _routeMap;
|
|
||||||
#else
|
|
||||||
private Dictionary<string, RoutingSubTableEntry> _routeMap;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
public RoutingSubTable(IEnumerable<MessageRoute> routes)
|
|
||||||
{
|
|
||||||
var newMap = new Dictionary<string, RoutingSubTableEntry>();
|
|
||||||
foreach (var route in routes)
|
|
||||||
{
|
|
||||||
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
|
|
||||||
{
|
|
||||||
typeMap = new RoutingSubTableEntry(route.DeserializationType);
|
|
||||||
newMap.Add(route.TypeIdentifier, typeMap);
|
|
||||||
}
|
|
||||||
|
|
||||||
typeMap.AddRoute(route.TopicFilter, route);
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach(var subEntry in newMap.Values)
|
|
||||||
subEntry.Build();
|
|
||||||
|
|
||||||
#if NET8_0_OR_GREATER
|
|
||||||
_routeMap = newMap.ToFrozenDictionary();
|
|
||||||
#else
|
|
||||||
_routeMap = newMap;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Get routes matching the type identifier
|
|
||||||
/// </summary>
|
|
||||||
public RoutingSubTableEntry? this[string identifier]
|
|
||||||
{
|
|
||||||
get => _routeMap.TryGetValue(identifier, out var routes) ? routes : null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
internal record RoutingSubTableEntry
|
|
||||||
{
|
|
||||||
public Type DeserializationType { get; }
|
|
||||||
public bool MultipleReaders { get; private set; }
|
|
||||||
|
|
||||||
private List<MessageRoute> _routesWithoutTopicFilter;
|
|
||||||
private Dictionary<string, List<MessageRoute>> _routesWithTopicFilter;
|
|
||||||
#if NET8_0_OR_GREATER
|
|
||||||
private FrozenDictionary<string, List<MessageRoute>>? _routesWithTopicFilterFrozen;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
public RoutingSubTableEntry(Type routeType)
|
|
||||||
{
|
|
||||||
_routesWithoutTopicFilter = new List<MessageRoute>();
|
|
||||||
_routesWithTopicFilter = new Dictionary<string, List<MessageRoute>>();
|
|
||||||
|
|
||||||
DeserializationType = routeType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void AddRoute(string? topicFilter, MessageRoute route)
|
|
||||||
{
|
|
||||||
if (string.IsNullOrEmpty(topicFilter))
|
|
||||||
{
|
|
||||||
_routesWithoutTopicFilter.Add(route);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (!_routesWithTopicFilter.TryGetValue(topicFilter!, out var list))
|
|
||||||
{
|
|
||||||
list = new List<MessageRoute>();
|
|
||||||
_routesWithTopicFilter.Add(topicFilter!, list);
|
|
||||||
}
|
|
||||||
|
|
||||||
list.Add(route);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (route.MultipleReaders)
|
|
||||||
MultipleReaders = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Build()
|
|
||||||
{
|
|
||||||
#if NET8_0_OR_GREATER
|
|
||||||
_routesWithTopicFilterFrozen = _routesWithTopicFilter.ToFrozenDictionary();
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
internal bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
|
|
||||||
{
|
|
||||||
result = null;
|
|
||||||
|
|
||||||
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
|
|
||||||
var handled = false;
|
|
||||||
foreach (var route in _routesWithoutTopicFilter)
|
|
||||||
{
|
|
||||||
var thisResult = route.Handle(connection, receiveTime, originalData, data);
|
|
||||||
if (thisResult != null)
|
|
||||||
result ??= thisResult;
|
|
||||||
|
|
||||||
handled = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Forward to routes with matching topic filter, if any
|
|
||||||
if (topicFilter != null)
|
|
||||||
{
|
|
||||||
#if NET8_0_OR_GREATER
|
|
||||||
_routesWithTopicFilterFrozen!.TryGetValue(topicFilter, out var matchingTopicRoutes);
|
|
||||||
#else
|
|
||||||
_routesWithTopicFilter.TryGetValue(topicFilter, out var matchingTopicRoutes);
|
|
||||||
#endif
|
|
||||||
foreach (var route in matchingTopicRoutes ?? [])
|
|
||||||
{
|
|
||||||
var thisResult = route.Handle(connection, receiveTime, originalData, data);
|
|
||||||
handled = true;
|
|
||||||
|
|
||||||
if (thisResult != null)
|
|
||||||
{
|
|
||||||
result ??= thisResult;
|
|
||||||
|
|
||||||
#warning MultipleReaders is only for queries, subscriptions should always have multiple readers = true. Maybe create different RoutingSubTable implementations for Queries and Subscriptions?
|
|
||||||
if (!MultipleReaders)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return handled;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -2,6 +2,9 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
#if NET8_0_OR_GREATER
|
||||||
|
using System.Collections.Frozen;
|
||||||
|
#endif
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets.Default.Routing
|
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||||
{
|
{
|
||||||
@ -10,40 +13,51 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public class RoutingTable
|
public class RoutingTable
|
||||||
{
|
{
|
||||||
private Dictionary<string, RoutingTableEntry> _routeTableEntries;
|
#if NET8_0_OR_GREATER
|
||||||
|
private FrozenDictionary<string, TypeRoutingCollection> _typeRoutingCollections = new Dictionary<string, TypeRoutingCollection>().ToFrozenDictionary();
|
||||||
|
#else
|
||||||
|
private Dictionary<string, TypeRoutingCollection> _typeRoutingCollections = new();
|
||||||
|
#endif
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Create routing table for provided processors
|
/// Update the routing table
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public RoutingTable(IEnumerable<IMessageProcessor> processors)
|
/// <param name="processors"></param>
|
||||||
|
public void Update(IEnumerable<IMessageProcessor> processors)
|
||||||
{
|
{
|
||||||
_routeTableEntries = new Dictionary<string, RoutingTableEntry>();
|
var newTypeMap = new Dictionary<string, TypeRoutingCollection>();
|
||||||
foreach (var entry in processors)
|
foreach (var entry in processors)
|
||||||
{
|
{
|
||||||
foreach (var route in entry.MessageRouter.Routes)
|
foreach (var route in entry.MessageRouter.Routes)
|
||||||
{
|
{
|
||||||
if (!_routeTableEntries.ContainsKey(route.TypeIdentifier))
|
if (!newTypeMap.ContainsKey(route.TypeIdentifier))
|
||||||
_routeTableEntries.Add(route.TypeIdentifier, new RoutingTableEntry(route.DeserializationType));
|
newTypeMap.Add(route.TypeIdentifier, new TypeRoutingCollection(route.DeserializationType));
|
||||||
|
|
||||||
if (!_routeTableEntries[route.TypeIdentifier].Handlers.Contains(entry))
|
if (!newTypeMap[route.TypeIdentifier].Handlers.Contains(entry))
|
||||||
_routeTableEntries[route.TypeIdentifier].Handlers.Add(entry);
|
newTypeMap[route.TypeIdentifier].Handlers.Add(entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if NET8_0_OR_GREATER
|
||||||
|
_typeRoutingCollections = newTypeMap.ToFrozenDictionary();
|
||||||
|
#else
|
||||||
|
_typeRoutingCollections = newTypeMap;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Get route table entry for a type identifier
|
/// Get route table entry for a type identifier
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public RoutingTableEntry? GetRouteTableEntry(string typeIdentifier)
|
public TypeRoutingCollection? GetRouteTableEntry(string typeIdentifier)
|
||||||
{
|
{
|
||||||
return _routeTableEntries.TryGetValue(typeIdentifier, out var entry) ? entry : null;
|
return _typeRoutingCollections.TryGetValue(typeIdentifier, out var entry) ? entry : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override string ToString()
|
public override string ToString()
|
||||||
{
|
{
|
||||||
var sb = new StringBuilder();
|
var sb = new StringBuilder();
|
||||||
foreach (var entry in _routeTableEntries)
|
foreach (var entry in _typeRoutingCollections)
|
||||||
{
|
{
|
||||||
sb.AppendLine($"{entry.Key}, {entry.Value.DeserializationType.Name}");
|
sb.AppendLine($"{entry.Key}, {entry.Value.DeserializationType.Name}");
|
||||||
foreach(var item in entry.Value.Handlers)
|
foreach(var item in entry.Value.Handlers)
|
||||||
@ -69,7 +83,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Routing table entry
|
/// Routing table entry
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public record RoutingTableEntry
|
public record TypeRoutingCollection
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Whether the deserialization type is string
|
/// Whether the deserialization type is string
|
||||||
@ -87,7 +101,7 @@ namespace CryptoExchange.Net.Sockets.Default.Routing
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// ctor
|
/// ctor
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public RoutingTableEntry(Type deserializationType)
|
public TypeRoutingCollection(Type deserializationType)
|
||||||
{
|
{
|
||||||
IsStringOutput = deserializationType == typeof(string);
|
IsStringOutput = deserializationType == typeof(string);
|
||||||
DeserializationType = deserializationType;
|
DeserializationType = deserializationType;
|
||||||
|
|||||||
@ -0,0 +1,69 @@
|
|||||||
|
using CryptoExchange.Net.Objects;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Sockets.Default.Routing
|
||||||
|
{
|
||||||
|
internal class SubscriptionRouter : ProcessorRouter<SubscriptionRouteCollection>
|
||||||
|
{
|
||||||
|
public SubscriptionRouter(IEnumerable<MessageRoute> routes) : base(routes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Dictionary<string, SubscriptionRouteCollection> BuildFromRoutes(IEnumerable<MessageRoute> routes)
|
||||||
|
{
|
||||||
|
var newMap = new Dictionary<string, SubscriptionRouteCollection>();
|
||||||
|
foreach (var route in routes)
|
||||||
|
{
|
||||||
|
if (!newMap.TryGetValue(route.TypeIdentifier, out var typeMap))
|
||||||
|
{
|
||||||
|
typeMap = new SubscriptionRouteCollection(route.DeserializationType);
|
||||||
|
newMap.Add(route.TypeIdentifier, typeMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
typeMap.AddRoute(route.TopicFilter, route);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var subEntry in newMap.Values)
|
||||||
|
subEntry.Build();
|
||||||
|
|
||||||
|
return newMap;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal class SubscriptionRouteCollection : RouteCollection
|
||||||
|
{
|
||||||
|
public SubscriptionRouteCollection(Type routeType) : base(routeType)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public override bool Handle(string? topicFilter, SocketConnection connection, DateTime receiveTime, string? originalData, object data, out CallResult? result)
|
||||||
|
{
|
||||||
|
result = CallResult.SuccessResult;
|
||||||
|
|
||||||
|
// Routes without topic filter handle both when the message topic is empty and when it is not, so we always call them
|
||||||
|
var handled = false;
|
||||||
|
foreach (var route in _routesWithoutTopicFilter)
|
||||||
|
{
|
||||||
|
route.Handle(connection, receiveTime, originalData, data);
|
||||||
|
handled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Forward to routes with matching topic filter, if any
|
||||||
|
if (topicFilter == null)
|
||||||
|
return handled;
|
||||||
|
|
||||||
|
var matchingTopicRoutes = GetRoutesWithMatchingTopicFilter(topicFilter);
|
||||||
|
if (matchingTopicRoutes == null)
|
||||||
|
return handled;
|
||||||
|
|
||||||
|
foreach (var route in matchingTopicRoutes)
|
||||||
|
{
|
||||||
|
route.Handle(connection, receiveTime, originalData, data);
|
||||||
|
handled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return handled;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -262,7 +262,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
private readonly object _listenersLock = new object();
|
private readonly object _listenersLock = new object();
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
private RoutingTable _routeTable = new RoutingTable([]);
|
private RoutingTable _routingTable = new RoutingTable();
|
||||||
|
|
||||||
private ReadOnlyCollection<IMessageProcessor> _listeners;
|
private ReadOnlyCollection<IMessageProcessor> _listeners;
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
@ -531,7 +531,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
var routingEntry = _routeTable.GetRouteTableEntry(typeIdentifier);
|
var routingEntry = _routingTable.GetRouteTableEntry(typeIdentifier);
|
||||||
if (routingEntry == null)
|
if (routingEntry == null)
|
||||||
{
|
{
|
||||||
if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data))
|
if (!ApiClient.HandleUnhandledMessage(this, typeIdentifier, data))
|
||||||
@ -1139,9 +1139,9 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void BuildRoutingTable()
|
private void UpdateRoutingTable()
|
||||||
{
|
{
|
||||||
_routeTable = new RoutingTable(_listeners);
|
_routingTable.Update(_listeners);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void AddMessageProcessor(IMessageProcessor processor)
|
private void AddMessageProcessor(IMessageProcessor processor)
|
||||||
@ -1150,13 +1150,13 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
{
|
{
|
||||||
var updatedList = new List<IMessageProcessor>(_listeners);
|
var updatedList = new List<IMessageProcessor>(_listeners);
|
||||||
updatedList.Add(processor);
|
updatedList.Add(processor);
|
||||||
processor.OnMessageRouterUpdated += BuildRoutingTable;
|
processor.OnMessageRouterUpdated += UpdateRoutingTable;
|
||||||
_listeners = updatedList.AsReadOnly();
|
_listeners = updatedList.AsReadOnly();
|
||||||
if (processor.MessageRouter.Routes.Length > 0)
|
if (processor.MessageRouter.Routes.Length > 0)
|
||||||
{
|
{
|
||||||
BuildRoutingTable();
|
UpdateRoutingTable();
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
_logger.LogTrace("Processor added, new routing table:\r\n" + _routeTable.ToString());
|
_logger.LogTrace("Processor added, new routing table:\r\n" + _routingTable.ToString());
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1167,14 +1167,14 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
lock (_listenersLock)
|
lock (_listenersLock)
|
||||||
{
|
{
|
||||||
var updatedList = new List<IMessageProcessor>(_listeners);
|
var updatedList = new List<IMessageProcessor>(_listeners);
|
||||||
|
processor.OnMessageRouterUpdated -= UpdateRoutingTable;
|
||||||
if (!updatedList.Remove(processor))
|
if (!updatedList.Remove(processor))
|
||||||
return; // If nothing removed nothing has changed
|
return; // If nothing removed nothing has changed
|
||||||
|
|
||||||
processor.OnMessageRouterUpdated -= BuildRoutingTable;
|
|
||||||
_listeners = updatedList.AsReadOnly();
|
_listeners = updatedList.AsReadOnly();
|
||||||
BuildRoutingTable();
|
UpdateRoutingTable();
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
_logger.LogTrace("Processor removed, new routing table:\r\n" + _routeTable.ToString());
|
_logger.LogTrace("Processor removed, new routing table:\r\n" + _routingTable.ToString());
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1187,7 +1187,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
var anyRemoved = false;
|
var anyRemoved = false;
|
||||||
foreach (var processor in processors)
|
foreach (var processor in processors)
|
||||||
{
|
{
|
||||||
processor.OnMessageRouterUpdated -= BuildRoutingTable;
|
processor.OnMessageRouterUpdated -= UpdateRoutingTable;
|
||||||
if (updatedList.Remove(processor))
|
if (updatedList.Remove(processor))
|
||||||
anyRemoved = true;
|
anyRemoved = true;
|
||||||
}
|
}
|
||||||
@ -1196,9 +1196,9 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
return; // If nothing removed nothing has changed
|
return; // If nothing removed nothing has changed
|
||||||
|
|
||||||
_listeners = updatedList.AsReadOnly();
|
_listeners = updatedList.AsReadOnly();
|
||||||
BuildRoutingTable();
|
UpdateRoutingTable();
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
_logger.LogTrace("Processors removed, new routing table:\r\n" + _routeTable.ToString());
|
_logger.LogTrace("Processors removed, new routing table:\r\n" + _routingTable.ToString());
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -82,7 +82,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
set
|
set
|
||||||
{
|
{
|
||||||
_router = value;
|
_router = value;
|
||||||
_router.BuildRouteMap();
|
_router.BuildSubscriptionRouter();
|
||||||
OnMessageRouterUpdated?.Invoke();
|
OnMessageRouterUpdated?.Invoke();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -198,7 +198,7 @@ namespace CryptoExchange.Net.Sockets.Default
|
|||||||
SubscriptionQuery.Timeout();
|
SubscriptionQuery.Timeout();
|
||||||
}
|
}
|
||||||
|
|
||||||
return MessageRouter[typeIdentifier]?.Handle(topicFilter, connection, receiveTime, originalData, data, out _) ?? false;
|
return MessageRouter.Handle(typeIdentifier, topicFilter, connection, receiveTime, originalData, data, out _);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@ -62,7 +62,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
private MessageRouter _router;
|
private MessageRouter _router;
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Router for this subscription
|
/// Router for this query
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public MessageRouter MessageRouter
|
public MessageRouter MessageRouter
|
||||||
{
|
{
|
||||||
@ -70,7 +70,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
set
|
set
|
||||||
{
|
{
|
||||||
_router = value;
|
_router = value;
|
||||||
_router.BuildRouteMap();
|
_router.BuildQueryRouter();
|
||||||
OnMessageRouterUpdated?.Invoke();
|
OnMessageRouterUpdated?.Invoke();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -208,12 +208,14 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (CurrentResponses == RequiredResponses)
|
if (CurrentResponses == RequiredResponses)
|
||||||
Response = message;
|
Response = message;
|
||||||
|
|
||||||
|
var handled = false;
|
||||||
if (Result?.Success != false)
|
if (Result?.Success != false)
|
||||||
{
|
{
|
||||||
// If an error result is already set don't override that
|
// If an error result is already set don't override that
|
||||||
MessageRouter[typeIdentifier]!.Handle(topicFilter, connection, receiveTime, originalData, message, out var result);
|
MessageRouter.Handle(typeIdentifier, topicFilter, connection, receiveTime, originalData, message, out var result);
|
||||||
Result = result;
|
Result = result;
|
||||||
if (Result == null)
|
handled = Result != null;
|
||||||
|
if (!handled)
|
||||||
// Null from Handle means it wasn't actually for this query
|
// Null from Handle means it wasn't actually for this query
|
||||||
CurrentResponses -= 1;
|
CurrentResponses -= 1;
|
||||||
}
|
}
|
||||||
@ -225,7 +227,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
OnComplete?.Invoke();
|
OnComplete?.Invoke();
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return handled;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user