mirror of
https://github.com/JKorf/CryptoExchange.Net
synced 2025-06-08 00:16:27 +00:00
wip + tests
This commit is contained in:
parent
fc6503035a
commit
acd9b0d533
@ -1,11 +1,17 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
using CryptoExchange.Net.Objects;
|
using CryptoExchange.Net.Objects;
|
||||||
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
using CryptoExchange.Net.Sockets;
|
using CryptoExchange.Net.Sockets;
|
||||||
using CryptoExchange.Net.UnitTests.TestImplementations;
|
using CryptoExchange.Net.UnitTests.TestImplementations;
|
||||||
|
using CryptoExchange.Net.UnitTests.TestImplementations.Sockets;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Moq;
|
||||||
using Newtonsoft.Json.Linq;
|
using Newtonsoft.Json.Linq;
|
||||||
using NUnit.Framework;
|
using NUnit.Framework;
|
||||||
|
using NUnit.Framework.Constraints;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.UnitTests
|
namespace CryptoExchange.Net.UnitTests
|
||||||
{
|
{
|
||||||
@ -46,7 +52,7 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
}
|
}
|
||||||
|
|
||||||
[TestCase]
|
[TestCase]
|
||||||
public void SocketMessages_Should_BeProcessedInDataHandlers()
|
public async Task SocketMessages_Should_BeProcessedInDataHandlers()
|
||||||
{
|
{
|
||||||
// arrange
|
// arrange
|
||||||
var client = new TestSocketClient(options => {
|
var client = new TestSocketClient(options => {
|
||||||
@ -58,28 +64,31 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
socket.DisconnectTime = DateTime.UtcNow;
|
socket.DisconnectTime = DateTime.UtcNow;
|
||||||
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
|
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
|
||||||
var rstEvent = new ManualResetEvent(false);
|
var rstEvent = new ManualResetEvent(false);
|
||||||
JToken result = null;
|
Dictionary<string, string> result = null;
|
||||||
sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, false, (messageEvent) =>
|
|
||||||
{
|
|
||||||
result = messageEvent.JsonData;
|
|
||||||
rstEvent.Set();
|
|
||||||
}));
|
|
||||||
client.SubClient.ConnectSocketSub(sub);
|
client.SubClient.ConnectSocketSub(sub);
|
||||||
|
|
||||||
|
sub.AddSubscription(new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) =>
|
||||||
|
{
|
||||||
|
result = messageEvent.Data;
|
||||||
|
rstEvent.Set();
|
||||||
|
}));
|
||||||
|
|
||||||
// act
|
// act
|
||||||
socket.InvokeMessage("{\"property\": 123}");
|
await socket.InvokeMessage("{\"property\": \"123\", \"topic\": \"topic\"}");
|
||||||
rstEvent.WaitOne(1000);
|
rstEvent.WaitOne(1000);
|
||||||
|
|
||||||
// assert
|
// assert
|
||||||
Assert.IsTrue((int)result["property"] == 123);
|
Assert.IsTrue(result["property"] == "123");
|
||||||
}
|
}
|
||||||
|
|
||||||
[TestCase(false)]
|
[TestCase(false)]
|
||||||
[TestCase(true)]
|
[TestCase(true)]
|
||||||
public void SocketMessages_Should_ContainOriginalDataIfEnabled(bool enabled)
|
public async Task SocketMessages_Should_ContainOriginalDataIfEnabled(bool enabled)
|
||||||
{
|
{
|
||||||
// arrange
|
// arrange
|
||||||
var client = new TestSocketClient(options => {
|
var client = new TestSocketClient(options =>
|
||||||
|
{
|
||||||
options.ReconnectInterval = TimeSpan.Zero;
|
options.ReconnectInterval = TimeSpan.Zero;
|
||||||
options.SubOptions.OutputOriginalData = enabled;
|
options.SubOptions.OutputOriginalData = enabled;
|
||||||
});
|
});
|
||||||
@ -90,15 +99,16 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
|
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
|
||||||
var rstEvent = new ManualResetEvent(false);
|
var rstEvent = new ManualResetEvent(false);
|
||||||
string original = null;
|
string original = null;
|
||||||
sub.AddSubscription(SocketSubscription.CreateForIdentifier(10, "TestHandler", true, false, (messageEvent) =>
|
|
||||||
|
client.SubClient.ConnectSocketSub(sub);
|
||||||
|
sub.AddSubscription(new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) =>
|
||||||
{
|
{
|
||||||
original = messageEvent.OriginalData;
|
original = messageEvent.OriginalData;
|
||||||
rstEvent.Set();
|
rstEvent.Set();
|
||||||
}));
|
}));
|
||||||
client.SubClient.ConnectSocketSub(sub);
|
|
||||||
|
|
||||||
// act
|
// act
|
||||||
socket.InvokeMessage("{\"property\": 123}");
|
await socket.InvokeMessage("{\"property\": 123}");
|
||||||
rstEvent.WaitOne(1000);
|
rstEvent.WaitOne(1000);
|
||||||
|
|
||||||
// assert
|
// assert
|
||||||
@ -109,16 +119,18 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
public void UnsubscribingStream_Should_CloseTheSocket()
|
public void UnsubscribingStream_Should_CloseTheSocket()
|
||||||
{
|
{
|
||||||
// arrange
|
// arrange
|
||||||
var client = new TestSocketClient(options => {
|
var client = new TestSocketClient(options =>
|
||||||
|
{
|
||||||
options.ReconnectInterval = TimeSpan.Zero;
|
options.ReconnectInterval = TimeSpan.Zero;
|
||||||
});
|
});
|
||||||
var socket = client.CreateSocket();
|
var socket = client.CreateSocket();
|
||||||
socket.CanConnect = true;
|
socket.CanConnect = true;
|
||||||
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
|
var sub = new SocketConnection(new TraceLogger(), client.SubClient, socket, null);
|
||||||
client.SubClient.ConnectSocketSub(sub);
|
client.SubClient.ConnectSocketSub(sub);
|
||||||
var us = SocketSubscription.CreateForIdentifier(10, "Test", true, false, (e) => { });
|
|
||||||
var ups = new UpdateSubscription(sub, us);
|
var subscription = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { });
|
||||||
sub.AddSubscription(us);
|
var ups = new UpdateSubscription(sub, subscription);
|
||||||
|
sub.AddSubscription(subscription);
|
||||||
|
|
||||||
// act
|
// act
|
||||||
client.UnsubscribeAsync(ups).Wait();
|
client.UnsubscribeAsync(ups).Wait();
|
||||||
@ -140,12 +152,13 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
var sub2 = new SocketConnection(new TraceLogger(), client.SubClient, socket2, null);
|
var sub2 = new SocketConnection(new TraceLogger(), client.SubClient, socket2, null);
|
||||||
client.SubClient.ConnectSocketSub(sub1);
|
client.SubClient.ConnectSocketSub(sub1);
|
||||||
client.SubClient.ConnectSocketSub(sub2);
|
client.SubClient.ConnectSocketSub(sub2);
|
||||||
var us1 = SocketSubscription.CreateForIdentifier(10, "Test1", true, false, (e) => { });
|
var subscription1 = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { });
|
||||||
var us2 = SocketSubscription.CreateForIdentifier(11, "Test2", true, false, (e) => { });
|
var subscription2 = new TestSubscription<Dictionary<string, string>>(Mock.Of<ILogger>(), (messageEvent) => { });
|
||||||
sub1.AddSubscription(us1);
|
|
||||||
sub2.AddSubscription(us2);
|
sub1.AddSubscription(subscription1);
|
||||||
var ups1 = new UpdateSubscription(sub1, us1);
|
sub2.AddSubscription(subscription2);
|
||||||
var ups2 = new UpdateSubscription(sub2, us2);
|
var ups1 = new UpdateSubscription(sub1, subscription1);
|
||||||
|
var ups2 = new UpdateSubscription(sub2, subscription2);
|
||||||
|
|
||||||
// act
|
// act
|
||||||
client.UnsubscribeAllAsync().Wait();
|
client.UnsubscribeAllAsync().Wait();
|
||||||
|
@ -14,13 +14,13 @@ namespace CryptoExchange.Net.UnitTests
|
|||||||
[TestFixture]
|
[TestFixture]
|
||||||
public class SymbolOrderBookTests
|
public class SymbolOrderBookTests
|
||||||
{
|
{
|
||||||
private static OrderBookOptions defaultOrderBookOptions = new OrderBookOptions();
|
private static readonly OrderBookOptions _defaultOrderBookOptions = new OrderBookOptions();
|
||||||
|
|
||||||
private class TestableSymbolOrderBook : SymbolOrderBook
|
private class TestableSymbolOrderBook : SymbolOrderBook
|
||||||
{
|
{
|
||||||
public TestableSymbolOrderBook() : base(null, "Test", "BTC/USD")
|
public TestableSymbolOrderBook() : base(null, "Test", "BTC/USD")
|
||||||
{
|
{
|
||||||
Initialize(defaultOrderBookOptions);
|
Initialize(_defaultOrderBookOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1,19 @@
|
|||||||
|
using CryptoExchange.Net.Sockets;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
|
||||||
|
{
|
||||||
|
internal class TestQuery : Query<object>
|
||||||
|
{
|
||||||
|
public override HashSet<string> ListenerIdentifiers { get; set; }
|
||||||
|
|
||||||
|
public TestQuery(string identifier, object request, bool authenticated, int weight = 1) : base(request, authenticated, weight)
|
||||||
|
{
|
||||||
|
ListenerIdentifiers = new HashSet<string> { identifier };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,36 @@
|
|||||||
|
using CryptoExchange.Net.Objects;
|
||||||
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
|
using CryptoExchange.Net.Sockets;
|
||||||
|
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.UnitTests.TestImplementations.Sockets
|
||||||
|
{
|
||||||
|
internal class TestSubscription<T> : Subscription<object, object>
|
||||||
|
{
|
||||||
|
private readonly Action<DataEvent<T>> _handler;
|
||||||
|
|
||||||
|
public override HashSet<string> ListenerIdentifiers { get; set; } = new HashSet<string> { "topic" };
|
||||||
|
|
||||||
|
public TestSubscription(ILogger logger, Action<DataEvent<T>> handler) : base(logger, false)
|
||||||
|
{
|
||||||
|
_handler = handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<object> message)
|
||||||
|
{
|
||||||
|
var data = (T)message.Data;
|
||||||
|
_handler.Invoke(message.As(data));
|
||||||
|
return Task.FromResult(new CallResult(null));
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
|
||||||
|
public override Query GetSubQuery(SocketConnection connection) => new TestQuery("sub", new object(), false, 1);
|
||||||
|
public override Query GetUnsubQuery() => new TestQuery("unsub", new object(), false, 1);
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,6 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.IO;
|
||||||
|
using System.Net.WebSockets;
|
||||||
using System.Security.Authentication;
|
using System.Security.Authentication;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
@ -12,16 +14,15 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
|||||||
public bool CanConnect { get; set; }
|
public bool CanConnect { get; set; }
|
||||||
public bool Connected { get; set; }
|
public bool Connected { get; set; }
|
||||||
|
|
||||||
public event Action OnClose;
|
public event Func<Task> OnClose;
|
||||||
|
|
||||||
#pragma warning disable 0067
|
#pragma warning disable 0067
|
||||||
public event Action OnReconnected;
|
public event Func<Task> OnReconnected;
|
||||||
public event Action OnReconnecting;
|
public event Func<Task> OnReconnecting;
|
||||||
#pragma warning restore 0067
|
#pragma warning restore 0067
|
||||||
public event Action<int> OnRequestSent;
|
public event Func<int, Task> OnRequestSent;
|
||||||
public event Action<string> OnMessage;
|
public event Func<WebSocketMessageType, Stream, Task> OnStreamMessage;
|
||||||
public event Action<Exception> OnError;
|
public event Func<Exception, Task> OnError;
|
||||||
public event Action OnOpen;
|
public event Func<Task> OnOpen;
|
||||||
public Func<Task<Uri>> GetReconnectionUrl { get; set; }
|
public Func<Task<Uri>> GetReconnectionUrl { get; set; }
|
||||||
|
|
||||||
public int Id { get; }
|
public int Id { get; }
|
||||||
@ -110,9 +111,10 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
|||||||
OnOpen?.Invoke();
|
OnOpen?.Invoke();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void InvokeMessage(string data)
|
public async Task InvokeMessage(string data)
|
||||||
{
|
{
|
||||||
OnMessage?.Invoke(data);
|
var stream = new MemoryStream(Encoding.UTF8.GetBytes(data));
|
||||||
|
await OnStreamMessage?.Invoke(WebSocketMessageType.Text, stream);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void SetProxy(ApiProxy proxy)
|
public void SetProxy(ApiProxy proxy)
|
||||||
|
@ -7,6 +7,7 @@ using CryptoExchange.Net.Objects;
|
|||||||
using CryptoExchange.Net.Objects.Options;
|
using CryptoExchange.Net.Objects.Options;
|
||||||
using CryptoExchange.Net.Objects.Sockets;
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
using CryptoExchange.Net.Sockets;
|
using CryptoExchange.Net.Sockets;
|
||||||
|
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Moq;
|
using Moq;
|
||||||
using Newtonsoft.Json.Linq;
|
using Newtonsoft.Json.Linq;
|
||||||
@ -89,35 +90,6 @@ namespace CryptoExchange.Net.UnitTests.TestImplementations
|
|||||||
return ConnectSocketAsync(sub).Result;
|
return ConnectSocketAsync(sub).Result;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected internal override bool HandleQueryResponse<T>(SocketConnection s, object request, JToken data, out CallResult<T> callResult)
|
public override string GetListenerIdentifier(IMessageAccessor messageAccessor) => "topic";
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected internal override bool HandleSubscriptionResponse(SocketConnection s, SocketSubscription subscription, object request, JToken message,
|
|
||||||
out CallResult<object> callResult)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected internal override bool MessageMatchesHandler(SocketConnection s, JToken message, object request)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected internal override bool MessageMatchesHandler(SocketConnection s, JToken message, string identifier)
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected internal override Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnection s)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected internal override Task<bool> UnsubscribeAsync(SocketConnection connection, SocketSubscription s)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -116,9 +116,9 @@ namespace CryptoExchange.Net
|
|||||||
|
|
||||||
var result = await GetResponseAsync<object>(request.Data, deserializer, cancellationToken, true).ConfigureAwait(false);
|
var result = await GetResponseAsync<object>(request.Data, deserializer, cancellationToken, true).ConfigureAwait(false);
|
||||||
if (!result)
|
if (!result)
|
||||||
_logger.Log(LogLevel.Warning, $"[{result.RequestId}] Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}");
|
_logger.Log(LogLevel.Warning, $"[Req {result.RequestId}] Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}");
|
||||||
else
|
else
|
||||||
_logger.Log(LogLevel.Debug, $"[{result.RequestId}] Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? (": " + result.OriginalData) : "")}");
|
_logger.Log(LogLevel.Debug, $"[Req {result.RequestId}] Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? (": " + result.OriginalData) : "")}");
|
||||||
|
|
||||||
if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false))
|
if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false))
|
||||||
continue;
|
continue;
|
||||||
@ -170,9 +170,9 @@ namespace CryptoExchange.Net
|
|||||||
|
|
||||||
var result = await GetResponseAsync<T>(request.Data, deserializer, cancellationToken, false).ConfigureAwait(false);
|
var result = await GetResponseAsync<T>(request.Data, deserializer, cancellationToken, false).ConfigureAwait(false);
|
||||||
if (!result)
|
if (!result)
|
||||||
_logger.Log(LogLevel.Warning, $"[{result.RequestId}] Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}");
|
_logger.Log(LogLevel.Warning, $"[Req {result.RequestId}] Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}");
|
||||||
else
|
else
|
||||||
_logger.Log(LogLevel.Debug, $"[{result.RequestId}] Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? (": " + result.OriginalData) : "")}");
|
_logger.Log(LogLevel.Debug, $"[Req {result.RequestId}] Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? (": " + result.OriginalData) : "")}");
|
||||||
|
|
||||||
if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false))
|
if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false))
|
||||||
continue;
|
continue;
|
||||||
@ -224,7 +224,7 @@ namespace CryptoExchange.Net
|
|||||||
var syncTimeResult = await syncTask.ConfigureAwait(false);
|
var syncTimeResult = await syncTask.ConfigureAwait(false);
|
||||||
if (!syncTimeResult)
|
if (!syncTimeResult)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"[{requestId}] Failed to sync time, aborting request: " + syncTimeResult.Error);
|
_logger.Log(LogLevel.Debug, $"[Req {requestId}] Failed to sync time, aborting request: " + syncTimeResult.Error);
|
||||||
return syncTimeResult.As<IRequest>(default);
|
return syncTimeResult.As<IRequest>(default);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -242,11 +242,11 @@ namespace CryptoExchange.Net
|
|||||||
|
|
||||||
if (signed && AuthenticationProvider == null)
|
if (signed && AuthenticationProvider == null)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Warning, $"[{requestId}] Request {uri.AbsolutePath} failed because no ApiCredentials were provided");
|
_logger.Log(LogLevel.Warning, $"[Req {requestId}] Request {uri.AbsolutePath} failed because no ApiCredentials were provided");
|
||||||
return new CallResult<IRequest>(new NoApiCredentialsError());
|
return new CallResult<IRequest>(new NoApiCredentialsError());
|
||||||
}
|
}
|
||||||
|
|
||||||
_logger.Log(LogLevel.Information, $"[{requestId}] Creating request for " + uri);
|
_logger.Log(LogLevel.Information, $"[Req {requestId}] Creating request for " + uri);
|
||||||
var paramsPosition = parameterPosition ?? ParameterPositions[method];
|
var paramsPosition = parameterPosition ?? ParameterPositions[method];
|
||||||
var request = ConstructRequest(uri, method, parameters?.OrderBy(p => p.Key).ToDictionary(p => p.Key, p => p.Value), signed, paramsPosition, arraySerialization ?? this.arraySerialization, requestBodyFormat ?? this.requestBodyFormat, requestId, additionalHeaders);
|
var request = ConstructRequest(uri, method, parameters?.OrderBy(p => p.Key).ToDictionary(p => p.Key, p => p.Value), signed, paramsPosition, arraySerialization ?? this.arraySerialization, requestBodyFormat ?? this.requestBodyFormat, requestId, additionalHeaders);
|
||||||
|
|
||||||
@ -259,7 +259,7 @@ namespace CryptoExchange.Net
|
|||||||
paramString += " with headers " + string.Join(", ", headers.Select(h => h.Key + $"=[{string.Join(",", h.Value)}]"));
|
paramString += " with headers " + string.Join(", ", headers.Select(h => h.Key + $"=[{string.Join(",", h.Value)}]"));
|
||||||
|
|
||||||
TotalRequestsMade++;
|
TotalRequestsMade++;
|
||||||
_logger.Log(LogLevel.Trace, $"[{requestId}] Sending {method}{(signed ? " signed" : "")} request to {request.Uri}{paramString ?? " "}");
|
_logger.Log(LogLevel.Trace, $"[Req {requestId}] Sending {method}{(signed ? " signed" : "")} request to {request.Uri}{paramString ?? " "}");
|
||||||
return new CallResult<IRequest>(request);
|
return new CallResult<IRequest>(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,12 +3,15 @@ using CryptoExchange.Net.Objects;
|
|||||||
using CryptoExchange.Net.Objects.Options;
|
using CryptoExchange.Net.Objects.Options;
|
||||||
using CryptoExchange.Net.Objects.Sockets;
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
using CryptoExchange.Net.Sockets;
|
using CryptoExchange.Net.Sockets;
|
||||||
|
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
|
using System.Diagnostics;
|
||||||
using System.IO;
|
using System.IO;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
|
using System.Net.Sockets;
|
||||||
using System.Net.WebSockets;
|
using System.Net.WebSockets;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
@ -55,17 +58,16 @@ namespace CryptoExchange.Net
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
protected AsyncResetEvent? periodicEvent;
|
protected AsyncResetEvent? periodicEvent;
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// If true; data which is a response to a query will also be distributed to subscriptions
|
|
||||||
/// If false; data which is a response to a query won't get forwarded to subscriptions as well
|
|
||||||
/// </summary>
|
|
||||||
protected internal bool ContinueOnQueryResponse { get; protected set; }
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message
|
/// If a message is received on the socket which is not handled by a handler this boolean determines whether this logs an error message
|
||||||
/// </summary>
|
/// </summary>
|
||||||
protected internal bool UnhandledMessageExpected { get; set; }
|
protected internal bool UnhandledMessageExpected { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// If true a subscription will accept message before the confirmation of a subscription has been received
|
||||||
|
/// </summary>
|
||||||
|
protected bool HandleMessageBeforeConfirmation { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The rate limiters
|
/// The rate limiters
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -181,7 +183,7 @@ namespace CryptoExchange.Net
|
|||||||
var success = socketConnection.CanAddSubscription();
|
var success = socketConnection.CanAddSubscription();
|
||||||
if (!success)
|
if (!success)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
|
_logger.Log(LogLevel.Trace, $"[Sckt {socketConnection.SocketId}] failed to add subscription, retrying on different connection");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -209,18 +211,23 @@ namespace CryptoExchange.Net
|
|||||||
|
|
||||||
if (socketConnection.PausedActivity)
|
if (socketConnection.PausedActivity)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't subscribe at this moment");
|
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] has been paused, can't subscribe at this moment");
|
||||||
return new CallResult<UpdateSubscription>(new ServerError("Socket is paused"));
|
return new CallResult<UpdateSubscription>(new ServerError("Socket is paused"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var waitEvent = new AsyncResetEvent(false);
|
||||||
var subQuery = subscription.GetSubQuery(socketConnection);
|
var subQuery = subscription.GetSubQuery(socketConnection);
|
||||||
if (subQuery != null)
|
if (subQuery != null)
|
||||||
{
|
{
|
||||||
|
if (HandleMessageBeforeConfirmation)
|
||||||
|
socketConnection.AddSubscription(subscription);
|
||||||
|
|
||||||
// Send the request and wait for answer
|
// Send the request and wait for answer
|
||||||
var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery).ConfigureAwait(false);
|
var subResult = await socketConnection.SendAndWaitQueryAsync(subQuery, waitEvent).ConfigureAwait(false);
|
||||||
if (!subResult)
|
if (!subResult)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} failed to subscribe: {subResult.Error}");
|
waitEvent?.Set();
|
||||||
|
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] failed to subscribe: {subResult.Error}");
|
||||||
// If this was a timeout we still need to send an unsubscribe to prevent messages coming in later
|
// If this was a timeout we still need to send an unsubscribe to prevent messages coming in later
|
||||||
var unsubscribe = subResult.Error is CancellationRequestedError;
|
var unsubscribe = subResult.Error is CancellationRequestedError;
|
||||||
await socketConnection.CloseAsync(subscription, unsubscribe).ConfigureAwait(false);
|
await socketConnection.CloseAsync(subscription, unsubscribe).ConfigureAwait(false);
|
||||||
@ -236,13 +243,16 @@ namespace CryptoExchange.Net
|
|||||||
{
|
{
|
||||||
subscription.CancellationTokenRegistration = ct.Register(async () =>
|
subscription.CancellationTokenRegistration = ct.Register(async () =>
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} Cancellation token set, closing subscription {subscription.Id}");
|
_logger.Log(LogLevel.Information, $"[Sckt {socketConnection.SocketId}] Cancellation token set, closing subscription {subscription.Id}");
|
||||||
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
|
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
|
||||||
}, false);
|
}, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!HandleMessageBeforeConfirmation)
|
||||||
socketConnection.AddSubscription(subscription);
|
socketConnection.AddSubscription(subscription);
|
||||||
_logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully");
|
|
||||||
|
waitEvent?.Set();
|
||||||
|
_logger.Log(LogLevel.Information, $"[Sckt {socketConnection.SocketId}] subscription {subscription.Id} completed successfully");
|
||||||
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
|
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -299,7 +309,7 @@ namespace CryptoExchange.Net
|
|||||||
|
|
||||||
if (socketConnection.PausedActivity)
|
if (socketConnection.PausedActivity)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} has been paused, can't send query at this moment");
|
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] has been paused, can't send query at this moment");
|
||||||
return new CallResult<T>(new ServerError("Socket is paused"));
|
return new CallResult<T>(new ServerError("Socket is paused"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -340,21 +350,24 @@ namespace CryptoExchange.Net
|
|||||||
if (AuthenticationProvider == null)
|
if (AuthenticationProvider == null)
|
||||||
return new CallResult<bool>(new NoApiCredentialsError());
|
return new CallResult<bool>(new NoApiCredentialsError());
|
||||||
|
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} Attempting to authenticate");
|
_logger.Log(LogLevel.Debug, $"[Sckt {socket.SocketId}] Attempting to authenticate");
|
||||||
var authRequest = GetAuthenticationRequest();
|
var authRequest = GetAuthenticationRequest();
|
||||||
|
if (authRequest != null)
|
||||||
|
{
|
||||||
var result = await socket.SendAndWaitQueryAsync(authRequest).ConfigureAwait(false);
|
var result = await socket.SendAndWaitQueryAsync(authRequest).ConfigureAwait(false);
|
||||||
|
|
||||||
if (!result)
|
if (!result)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {socket.SocketId} authentication failed");
|
_logger.Log(LogLevel.Warning, $"[Sckt {socket.SocketId}] authentication failed");
|
||||||
if (socket.Connected)
|
if (socket.Connected)
|
||||||
await socket.CloseAsync().ConfigureAwait(false);
|
await socket.CloseAsync().ConfigureAwait(false);
|
||||||
|
|
||||||
result.Error!.Message = "Authentication failed: " + result.Error.Message;
|
result.Error!.Message = "Authentication failed: " + result.Error.Message;
|
||||||
return new CallResult<bool>(result.Error)!;
|
return new CallResult<bool>(result.Error)!;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {socket.SocketId} authenticated");
|
_logger.Log(LogLevel.Debug, $"[Sckt {socket.SocketId}] authenticated");
|
||||||
socket.Authenticated = true;
|
socket.Authenticated = true;
|
||||||
return new CallResult<bool>(true);
|
return new CallResult<bool>(true);
|
||||||
}
|
}
|
||||||
@ -363,7 +376,7 @@ namespace CryptoExchange.Net
|
|||||||
/// Should return the request which can be used to authenticate a socket connection
|
/// Should return the request which can be used to authenticate a socket connection
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException();
|
protected internal virtual Query? GetAuthenticationRequest() => throw new NotImplementedException();
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Adds a system subscription. Used for example to reply to ping requests
|
/// Adds a system subscription. Used for example to reply to ping requests
|
||||||
@ -443,7 +456,6 @@ namespace CryptoExchange.Net
|
|||||||
var socket = CreateSocket(connectionAddress.Data!);
|
var socket = CreateSocket(connectionAddress.Data!);
|
||||||
var socketConnection = new SocketConnection(_logger, this, socket, address);
|
var socketConnection = new SocketConnection(_logger, this, socket, address);
|
||||||
socketConnection.UnhandledMessage += HandleUnhandledMessage;
|
socketConnection.UnhandledMessage += HandleUnhandledMessage;
|
||||||
socketConnection.UnparsedMessage += HandleUnparsedMessage;
|
|
||||||
|
|
||||||
foreach (var systemSubscription in systemSubscriptions)
|
foreach (var systemSubscription in systemSubscriptions)
|
||||||
socketConnection.AddSubscription(systemSubscription);
|
socketConnection.AddSubscription(systemSubscription);
|
||||||
@ -455,15 +467,7 @@ namespace CryptoExchange.Net
|
|||||||
/// Process an unhandled message
|
/// Process an unhandled message
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="message">The message that wasn't processed</param>
|
/// <param name="message">The message that wasn't processed</param>
|
||||||
protected virtual void HandleUnhandledMessage(SocketMessage message)
|
protected virtual void HandleUnhandledMessage(IMessageAccessor message)
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Process an unparsed message
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="message">The message that wasn't parsed</param>
|
|
||||||
protected virtual void HandleUnparsedMessage(byte[] message)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -507,7 +511,7 @@ namespace CryptoExchange.Net
|
|||||||
protected virtual IWebsocket CreateSocket(string address)
|
protected virtual IWebsocket CreateSocket(string address)
|
||||||
{
|
{
|
||||||
var socket = SocketFactory.CreateWebsocket(_logger, GetWebSocketParameters(address));
|
var socket = SocketFactory.CreateWebsocket(_logger, GetWebSocketParameters(address));
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {socket.Id} new socket created for " + address);
|
_logger.Log(LogLevel.Debug, $"[Sckt {socket.Id}] created for " + address);
|
||||||
return socket;
|
return socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -547,7 +551,7 @@ namespace CryptoExchange.Net
|
|||||||
if (query == null)
|
if (query == null)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} sending periodic {identifier}");
|
_logger.Log(LogLevel.Trace, $"[Sckt {socketConnection.SocketId}] sending periodic {identifier}");
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -556,7 +560,7 @@ namespace CryptoExchange.Net
|
|||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {socketConnection.SocketId} Periodic send {identifier} failed: " + ex.ToLogString());
|
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] Periodic send {identifier} failed: " + ex.ToLogString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -585,7 +589,7 @@ namespace CryptoExchange.Net
|
|||||||
if (subscription == null || connection == null)
|
if (subscription == null || connection == null)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
_logger.Log(LogLevel.Information, $"Socket {connection.SocketId} Unsubscribing subscription " + subscriptionId);
|
_logger.Log(LogLevel.Information, $"[Sckt {connection.SocketId}] unsubscribing subscription " + subscriptionId);
|
||||||
await connection.CloseAsync(subscription).ConfigureAwait(false);
|
await connection.CloseAsync(subscription).ConfigureAwait(false);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -600,7 +604,7 @@ namespace CryptoExchange.Net
|
|||||||
if (subscription == null)
|
if (subscription == null)
|
||||||
throw new ArgumentNullException(nameof(subscription));
|
throw new ArgumentNullException(nameof(subscription));
|
||||||
|
|
||||||
_logger.Log(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id);
|
_logger.Log(LogLevel.Information, $"[Sckt {subscription.SocketId}] Unsubscribing subscription " + subscription.Id);
|
||||||
await subscription.CloseAsync().ConfigureAwait(false);
|
await subscription.CloseAsync().ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -692,9 +696,9 @@ namespace CryptoExchange.Net
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Get the listener identifier for the message
|
/// Get the listener identifier for the message
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="message"></param>
|
/// <param name="messageAccessor"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public abstract string GetListenerIdentifier(SocketMessage message);
|
public abstract string? GetListenerIdentifier(IMessageAccessor messageAccessor);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Preprocess a stream message
|
/// Preprocess a stream message
|
||||||
|
@ -47,7 +47,7 @@ namespace CryptoExchange.Net.Converters
|
|||||||
mapping = AddMapping(enumType);
|
mapping = AddMapping(enumType);
|
||||||
|
|
||||||
var stringValue = reader.Value?.ToString();
|
var stringValue = reader.Value?.ToString();
|
||||||
if (stringValue == null)
|
if (stringValue == null || stringValue == "")
|
||||||
{
|
{
|
||||||
// Received null value
|
// Received null value
|
||||||
var emptyResult = GetDefaultValue(objectType, enumType);
|
var emptyResult = GetDefaultValue(objectType, enumType);
|
||||||
|
@ -144,6 +144,16 @@ namespace CryptoExchange.Net
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Return the last unique id that was generated
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static int LastId()
|
||||||
|
{
|
||||||
|
lock (_idLock)
|
||||||
|
return _lastId;
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Generate a random string of specified length
|
/// Generate a random string of specified length
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
using CryptoExchange.Net.Objects;
|
using CryptoExchange.Net.Objects;
|
||||||
using CryptoExchange.Net.Objects.Sockets;
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
using CryptoExchange.Net.Sockets;
|
using CryptoExchange.Net.Sockets;
|
||||||
|
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
@ -30,8 +31,15 @@ namespace CryptoExchange.Net.Interfaces
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Get the type the message should be deserialized to
|
/// Get the type the message should be deserialized to
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="message"></param>
|
/// <param name="messageAccessor"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Type? GetMessageType(SocketMessage message);
|
Type? GetMessageType(IMessageAccessor messageAccessor);
|
||||||
|
/// <summary>
|
||||||
|
/// Deserialize a message int oobject of type
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="accessor"></param>
|
||||||
|
/// <param name="type"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
object Deserialize(IMessageAccessor accessor, Type type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,7 @@ namespace CryptoExchange.Net.Interfaces
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// The factory for creating sockets. Used for unit testing
|
/// The factory for creating sockets. Used for unit testing
|
||||||
/// </summary>
|
/// </summary>
|
||||||
IWebsocketFactory SocketFactory { get; }
|
IWebsocketFactory SocketFactory { get; set; }
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Current client options
|
/// Current client options
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -70,7 +70,7 @@ namespace CryptoExchange.Net.Objects
|
|||||||
/// <param name="originalData"></param>
|
/// <param name="originalData"></param>
|
||||||
/// <param name="error"></param>
|
/// <param name="error"></param>
|
||||||
#pragma warning disable 8618
|
#pragma warning disable 8618
|
||||||
protected CallResult([AllowNull]T data, string? originalData, Error? error): base(error)
|
public CallResult([AllowNull]T data, string? originalData, Error? error): base(error)
|
||||||
#pragma warning restore 8618
|
#pragma warning restore 8618
|
||||||
{
|
{
|
||||||
OriginalData = originalData;
|
OriginalData = originalData;
|
||||||
@ -91,6 +91,13 @@ namespace CryptoExchange.Net.Objects
|
|||||||
/// <param name="error">The erro rto return</param>
|
/// <param name="error">The erro rto return</param>
|
||||||
public CallResult(Error error) : this(default, null, error) { }
|
public CallResult(Error error) : this(default, null, error) { }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Create a new error result
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="error">The error to return</param>
|
||||||
|
/// <param name="originalData">The original response data</param>
|
||||||
|
public CallResult(Error error, string? originalData) : this(default, originalData, error) { }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Overwrite bool check so we can use if(callResult) instead of if(callResult.Success)
|
/// Overwrite bool check so we can use if(callResult) instead of if(callResult.Success)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -278,7 +278,7 @@ namespace CryptoExchange.Net.Objects
|
|||||||
/// <param name="code"></param>
|
/// <param name="code"></param>
|
||||||
/// <param name="message"></param>
|
/// <param name="message"></param>
|
||||||
/// <param name="data"></param>
|
/// <param name="data"></param>
|
||||||
protected CancellationRequestedError(int? code, string message, object? data): base(code, message, data) { }
|
public CancellationRequestedError(int? code, string message, object? data): base(code, message, data) { }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
@ -183,7 +183,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
private async Task<bool> ConnectInternalAsync()
|
private async Task<bool> ConnectInternalAsync()
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} connecting");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] connecting");
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
|
using CancellationTokenSource tcs = new(TimeSpan.FromSeconds(10));
|
||||||
@ -191,11 +191,11 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} connection failed: " + e.ToLogString());
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] connection failed: " + e.ToLogString());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} connected to {Uri}");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] connected to {Uri}");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,13 +204,13 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
{
|
{
|
||||||
while (!_stopRequested)
|
while (!_stopRequested)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} starting processing tasks");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] starting processing tasks");
|
||||||
_processState = ProcessState.Processing;
|
_processState = ProcessState.Processing;
|
||||||
var sendTask = SendLoopAsync();
|
var sendTask = SendLoopAsync();
|
||||||
var receiveTask = ReceiveLoopAsync();
|
var receiveTask = ReceiveLoopAsync();
|
||||||
var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
|
var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
|
||||||
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
|
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} processing tasks finished");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] processing tasks finished");
|
||||||
|
|
||||||
_processState = ProcessState.WaitingForClose;
|
_processState = ProcessState.WaitingForClose;
|
||||||
while (_closeTask == null)
|
while (_closeTask == null)
|
||||||
@ -238,14 +238,14 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
while (!_stopRequested)
|
while (!_stopRequested)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} attempting to reconnect");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] attempting to reconnect");
|
||||||
var task = GetReconnectionUrl?.Invoke();
|
var task = GetReconnectionUrl?.Invoke();
|
||||||
if (task != null)
|
if (task != null)
|
||||||
{
|
{
|
||||||
var reconnectUri = await task.ConfigureAwait(false);
|
var reconnectUri = await task.ConfigureAwait(false);
|
||||||
if (reconnectUri != null && Parameters.Uri != reconnectUri)
|
if (reconnectUri != null && Parameters.Uri != reconnectUri)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} reconnect URI set to {reconnectUri}");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] reconnect URI set to {reconnectUri}");
|
||||||
Parameters.Uri = reconnectUri;
|
Parameters.Uri = reconnectUri;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -278,7 +278,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
var bytes = Parameters.Encoding.GetBytes(data);
|
var bytes = Parameters.Encoding.GetBytes(data);
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {Id} msg {id} - Adding {bytes.Length} bytes to send buffer");
|
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] msg {id} - Adding {bytes.Length} bytes to send buffer");
|
||||||
_sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes });
|
_sendBuffer.Enqueue(new SendItem { Id = id, Weight = weight, Bytes = bytes });
|
||||||
_sendEvent.Set();
|
_sendEvent.Set();
|
||||||
}
|
}
|
||||||
@ -289,7 +289,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (_processState != ProcessState.Processing && IsOpen)
|
if (_processState != ProcessState.Processing && IsOpen)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} reconnect requested");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] reconnect requested");
|
||||||
_closeTask = CloseInternalAsync();
|
_closeTask = CloseInternalAsync();
|
||||||
await _closeTask.ConfigureAwait(false);
|
await _closeTask.ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
@ -304,18 +304,18 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
{
|
{
|
||||||
if (_closeTask?.IsCompleted == false)
|
if (_closeTask?.IsCompleted == false)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} CloseAsync() waiting for existing close task");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] CloseAsync() waiting for existing close task");
|
||||||
await _closeTask.ConfigureAwait(false);
|
await _closeTask.ConfigureAwait(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!IsOpen)
|
if (!IsOpen)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} CloseAsync() socket not open");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] CloseAsync() socket not open");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} closing");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] closing");
|
||||||
_closeTask = CloseInternalAsync();
|
_closeTask = CloseInternalAsync();
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
@ -327,7 +327,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if(_processTask != null)
|
if(_processTask != null)
|
||||||
await _processTask.ConfigureAwait(false);
|
await _processTask.ConfigureAwait(false);
|
||||||
await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
|
await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} closed");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] closed");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -379,11 +379,11 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (_disposed)
|
if (_disposed)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} disposing");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] disposing");
|
||||||
_disposed = true;
|
_disposed = true;
|
||||||
_socket.Dispose();
|
_socket.Dispose();
|
||||||
_ctsSource.Dispose();
|
_ctsSource.Dispose();
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {Id} disposed");
|
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] disposed");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -415,7 +415,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (limitResult.Success)
|
if (limitResult.Success)
|
||||||
{
|
{
|
||||||
if (limitResult.Data > 0)
|
if (limitResult.Data > 0)
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] msg {data.Id} - send delayed {limitResult.Data}ms because of rate limit");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -424,7 +424,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
{
|
{
|
||||||
await _socket.SendAsync(new ArraySegment<byte>(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
|
await _socket.SendAsync(new ArraySegment<byte>(data.Bytes, 0, data.Bytes.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
|
||||||
await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
|
await (OnRequestSent?.Invoke(data.Id) ?? Task.CompletedTask).ConfigureAwait(false);
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {Id} msg {data.Id} - sent {data.Bytes.Length} bytes");
|
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] msg {data.Id} - sent {data.Bytes.Length} bytes");
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
{
|
{
|
||||||
@ -447,13 +447,13 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
// Because this is running in a separate task and not awaited until the socket gets closed
|
// Because this is running in a separate task and not awaited until the socket gets closed
|
||||||
// any exception here will crash the send processing, but do so silently unless the socket get's stopped.
|
// any exception here will crash the send processing, but do so silently unless the socket get's stopped.
|
||||||
// Make sure we at least let the owner know there was an error
|
// Make sure we at least let the owner know there was an error
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {Id} send loop stopped with exception");
|
_logger.Log(LogLevel.Warning, $"[Sckt {Id}] send loop stopped with exception");
|
||||||
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
|
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} send loop finished");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] send loop finished");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -501,7 +501,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (receiveResult.MessageType == WebSocketMessageType.Close)
|
if (receiveResult.MessageType == WebSocketMessageType.Close)
|
||||||
{
|
{
|
||||||
// Connection closed unexpectedly
|
// Connection closed unexpectedly
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} received `Close` message");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] received `Close` message");
|
||||||
if (_closeTask?.IsCompleted != false)
|
if (_closeTask?.IsCompleted != false)
|
||||||
_closeTask = CloseInternalAsync();
|
_closeTask = CloseInternalAsync();
|
||||||
break;
|
break;
|
||||||
@ -512,7 +512,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
// We received data, but it is not complete, write it to a memory stream for reassembling
|
// We received data, but it is not complete, write it to a memory stream for reassembling
|
||||||
multiPartMessage = true;
|
multiPartMessage = true;
|
||||||
memoryStream ??= new MemoryStream();
|
memoryStream ??= new MemoryStream();
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
|
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] received {receiveResult.Count} bytes in partial message");
|
||||||
await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
|
await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -520,13 +520,13 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (!multiPartMessage)
|
if (!multiPartMessage)
|
||||||
{
|
{
|
||||||
// Received a complete message and it's not multi part
|
// Received a complete message and it's not multi part
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
|
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] received {receiveResult.Count} bytes in single message");
|
||||||
await ProcessData(receiveResult.MessageType, new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
|
await ProcessData(receiveResult.MessageType, new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Received the end of a multipart message, write to memory stream for reassembling
|
// Received the end of a multipart message, write to memory stream for reassembling
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
|
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] received {receiveResult.Count} bytes in partial message");
|
||||||
await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
|
await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -554,13 +554,13 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (receiveResult?.EndOfMessage == true)
|
if (receiveResult?.EndOfMessage == true)
|
||||||
{
|
{
|
||||||
// Reassemble complete message from memory stream
|
// Reassemble complete message from memory stream
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
|
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] reassembled message of {memoryStream!.Length} bytes");
|
||||||
await ProcessData(receiveResult.MessageType, memoryStream).ConfigureAwait(false);
|
await ProcessData(receiveResult.MessageType, memoryStream).ConfigureAwait(false);
|
||||||
memoryStream.Dispose();
|
memoryStream.Dispose();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes");
|
_logger.Log(LogLevel.Trace, $"[Sckt {Id}] discarding incomplete message of {memoryStream!.Length} bytes");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -570,13 +570,13 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
// Because this is running in a separate task and not awaited until the socket gets closed
|
// Because this is running in a separate task and not awaited until the socket gets closed
|
||||||
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
|
// any exception here will crash the receive processing, but do so silently unless the socket gets stopped.
|
||||||
// Make sure we at least let the owner know there was an error
|
// Make sure we at least let the owner know there was an error
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {Id} receive loop stopped with exception");
|
_logger.Log(LogLevel.Warning, $"[Sckt {Id}] receive loop stopped with exception");
|
||||||
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
|
await (OnError?.Invoke(e) ?? Task.CompletedTask).ConfigureAwait(false);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} receive loop finished");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] receive loop finished");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -601,7 +601,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected async Task CheckTimeoutAsync()
|
protected async Task CheckTimeoutAsync()
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {Id} starting task checking for no data received for {Parameters.Timeout}");
|
_logger.Log(LogLevel.Debug, $"[Sckt {Id}] starting task checking for no data received for {Parameters.Timeout}");
|
||||||
LastActionTime = DateTime.UtcNow;
|
LastActionTime = DateTime.UtcNow;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -612,7 +612,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
if (DateTime.UtcNow - LastActionTime > Parameters.Timeout)
|
if (DateTime.UtcNow - LastActionTime > Parameters.Timeout)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {Id} no data received for {Parameters.Timeout}, reconnecting socket");
|
_logger.Log(LogLevel.Warning, $"[Sckt {Id}] no data received for {Parameters.Timeout}, reconnecting socket");
|
||||||
_ = ReconnectAsync().ConfigureAwait(false);
|
_ = ReconnectAsync().ConfigureAwait(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,6 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.IO;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces
|
namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces
|
||||||
{
|
{
|
||||||
@ -12,6 +14,15 @@ namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
bool IsJson { get; }
|
bool IsJson { get; }
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
/// The underlying data object
|
||||||
|
/// </summary>
|
||||||
|
object? Underlying { get; }
|
||||||
|
/// <summary>
|
||||||
|
/// Load a stream message
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="stream"></param>
|
||||||
|
void Load(Stream stream);
|
||||||
|
/// <summary>
|
||||||
/// Get the type of node
|
/// Get the type of node
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
@ -30,10 +41,18 @@ namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
T? GetValue<T>(MessagePath path);
|
T? GetValue<T>(MessagePath path);
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
/// Get the values of an array
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T"></typeparam>
|
||||||
|
/// <param name="path"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
List<T?>? GetValues<T>(MessagePath path);
|
||||||
|
/// <summary>
|
||||||
/// Deserialize the message into this type
|
/// Deserialize the message into this type
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="type"></param>
|
/// <param name="type"></param>
|
||||||
|
/// <param name="path"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
object Deserialize(Type type);
|
object Deserialize(Type type, MessagePath? path = null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,15 @@
|
|||||||
|
namespace CryptoExchange.Net.Sockets.MessageParsing.Interfaces
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Serializer interface
|
||||||
|
/// </summary>
|
||||||
|
public interface IMessageSerializer
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Serialize an object to a string
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
string Serialize(object message);
|
||||||
|
}
|
||||||
|
}
|
@ -3,7 +3,9 @@ using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
|||||||
using Newtonsoft.Json;
|
using Newtonsoft.Json;
|
||||||
using Newtonsoft.Json.Linq;
|
using Newtonsoft.Json.Linq;
|
||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
using System.IO;
|
using System.IO;
|
||||||
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets.MessageParsing
|
namespace CryptoExchange.Net.Sockets.MessageParsing
|
||||||
@ -11,20 +13,20 @@ namespace CryptoExchange.Net.Sockets.MessageParsing
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Json.Net message accessor
|
/// Json.Net message accessor
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class JsonNetMessageData : IMessageAccessor
|
public class JsonNetMessageAccessor : IMessageAccessor
|
||||||
{
|
{
|
||||||
private readonly JToken? _token;
|
private JToken? _token;
|
||||||
private readonly Stream _stream;
|
private Stream? _stream;
|
||||||
private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
|
private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public bool IsJson { get; private set; }
|
public bool IsJson { get; private set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <inheritdoc />
|
||||||
/// ctor
|
public object? Underlying => _token;
|
||||||
/// </summary>
|
|
||||||
/// <param name="stream"></param>
|
/// <inheritdoc />
|
||||||
public JsonNetMessageData(Stream stream)
|
public void Load(Stream stream)
|
||||||
{
|
{
|
||||||
_stream = stream;
|
_stream = stream;
|
||||||
using var reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
|
using var reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
|
||||||
@ -43,7 +45,7 @@ namespace CryptoExchange.Net.Sockets.MessageParsing
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public object Deserialize(Type type)
|
public object Deserialize(Type type, MessagePath? path = null)
|
||||||
{
|
{
|
||||||
if (!IsJson)
|
if (!IsJson)
|
||||||
{
|
{
|
||||||
@ -51,7 +53,11 @@ namespace CryptoExchange.Net.Sockets.MessageParsing
|
|||||||
return sr.ReadToEnd();
|
return sr.ReadToEnd();
|
||||||
}
|
}
|
||||||
|
|
||||||
return _token!.ToObject(type, _serializer)!;
|
var source = _token;
|
||||||
|
if (path != null)
|
||||||
|
source = GetPathNode(path.Value);
|
||||||
|
|
||||||
|
return source!.ToObject(type, _serializer)!;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
@ -98,27 +104,48 @@ namespace CryptoExchange.Net.Sockets.MessageParsing
|
|||||||
return value!.Value<T>();
|
return value!.Value<T>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public List<T?>? GetValues<T>(MessagePath path)
|
||||||
|
{
|
||||||
|
var value = GetPathNode(path);
|
||||||
|
if (value == null)
|
||||||
|
return default;
|
||||||
|
|
||||||
|
if (value.Type == JTokenType.Object)
|
||||||
|
return default;
|
||||||
|
|
||||||
|
return value!.Values<T>().ToList();
|
||||||
|
}
|
||||||
|
|
||||||
private JToken? GetPathNode(MessagePath path)
|
private JToken? GetPathNode(MessagePath path)
|
||||||
{
|
{
|
||||||
var currentToken = _token;
|
var currentToken = _token;
|
||||||
foreach (var node in path)
|
foreach (var node in path)
|
||||||
{
|
{
|
||||||
if (node.Type)
|
if (node.Type == 0)
|
||||||
{
|
{
|
||||||
// Int value
|
// Int value
|
||||||
var val = (int)node.Value;
|
var val = (int)node.Value!;
|
||||||
if (currentToken!.Type != JTokenType.Array || ((JArray)currentToken).Count <= val)
|
if (currentToken!.Type != JTokenType.Array || ((JArray)currentToken).Count <= val)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
currentToken = currentToken[val];
|
currentToken = currentToken[val];
|
||||||
}
|
}
|
||||||
else
|
else if (node.Type == 1)
|
||||||
{
|
{
|
||||||
// String value
|
// String value
|
||||||
if (currentToken!.Type != JTokenType.Object)
|
if (currentToken!.Type != JTokenType.Object)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
currentToken = currentToken[(string)node.Value];
|
currentToken = currentToken[(string)node.Value!];
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Property name
|
||||||
|
if (currentToken!.Type != JTokenType.Object)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
currentToken = (currentToken.First as JProperty)?.Name;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (currentToken == null)
|
if (currentToken == null)
|
||||||
|
@ -0,0 +1,12 @@
|
|||||||
|
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||||
|
using Newtonsoft.Json;
|
||||||
|
|
||||||
|
namespace CryptoExchange.Net.Sockets.MessageParsing
|
||||||
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
public class JsonNetSerializer : IMessageSerializer
|
||||||
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
public string Serialize(object message) => JsonConvert.SerializeObject(message, Formatting.None);
|
||||||
|
}
|
||||||
|
}
|
@ -8,13 +8,13 @@
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Value
|
/// Value
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public object Value { get; }
|
public object? Value { get; }
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Type (true = int, false = string)
|
/// Type (0 = int, 1 = string, 2 = prop name)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public bool Type { get; }
|
public int Type { get; }
|
||||||
|
|
||||||
private NodeAccessor(object value, bool type)
|
private NodeAccessor(object? value, int type)
|
||||||
{
|
{
|
||||||
Value = value;
|
Value = value;
|
||||||
Type = type;
|
Type = type;
|
||||||
@ -25,14 +25,20 @@
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="value"></param>
|
/// <param name="value"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public static NodeAccessor Int(int value) { return new NodeAccessor(value, true); }
|
public static NodeAccessor Int(int value) { return new NodeAccessor(value, 0); }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Create a string node accessor
|
/// Create a string node accessor
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="value"></param>
|
/// <param name="value"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public static NodeAccessor String(string value) { return new NodeAccessor(value, false); }
|
public static NodeAccessor String(string value) { return new NodeAccessor(value, 1); }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Create a property name node accessor
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static NodeAccessor PropertyName() { return new NodeAccessor(null, 2); }
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,17 @@
|
|||||||
return path;
|
return path;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Add a property name node accessor
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="path"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static MessagePath PropertyName(this MessagePath path)
|
||||||
|
{
|
||||||
|
path.Add(NodeAccessor.PropertyName());
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Add a int node accessor
|
/// Add a int node accessor
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
using CryptoExchange.Net.Interfaces;
|
using CryptoExchange.Net.Interfaces;
|
||||||
using CryptoExchange.Net.Objects;
|
using CryptoExchange.Net.Objects;
|
||||||
using CryptoExchange.Net.Objects.Sockets;
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
|
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
|
using System.Diagnostics;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
@ -41,7 +43,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Action to execute when query is finished
|
/// Action to execute when query is finished
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public Action? OnFinished { get; set; }
|
public AsyncResetEvent? ContinueAwaiter { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Strings to match this query to a received message
|
/// Strings to match this query to a received message
|
||||||
@ -68,7 +70,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="message"></param>
|
/// <param name="message"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public abstract Type GetMessageType(SocketMessage message);
|
public abstract Type? GetMessageType(IMessageAccessor message);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Wait event for response
|
/// Wait event for response
|
||||||
@ -113,6 +115,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task WaitAsync(TimeSpan timeout) => await _event.WaitAsync(timeout).ConfigureAwait(false);
|
public async Task WaitAsync(TimeSpan timeout) => await _event.WaitAsync(timeout).ConfigureAwait(false);
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public virtual object Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Mark request as timeout
|
/// Mark request as timeout
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -141,7 +146,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
public abstract class Query<TResponse> : Query
|
public abstract class Query<TResponse> : Query
|
||||||
{
|
{
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override Type GetMessageType(SocketMessage message) => typeof(TResponse);
|
public override Type? GetMessageType(IMessageAccessor message) => typeof(TResponse);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The typed call result
|
/// The typed call result
|
||||||
@ -164,8 +169,8 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
Completed = true;
|
Completed = true;
|
||||||
Response = message.Data;
|
Response = message.Data;
|
||||||
Result = await HandleMessageAsync(connection, message.As((TResponse)message.Data)).ConfigureAwait(false);
|
Result = await HandleMessageAsync(connection, message.As((TResponse)message.Data)).ConfigureAwait(false);
|
||||||
OnFinished?.Invoke();
|
|
||||||
_event.Set();
|
_event.Set();
|
||||||
|
await (ContinueAwaiter?.WaitAsync() ?? Task.CompletedTask).ConfigureAwait(false);
|
||||||
return Result;
|
return Result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -175,7 +180,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <param name="connection"></param>
|
/// <param name="connection"></param>
|
||||||
/// <param name="message"></param>
|
/// <param name="message"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual Task<CallResult<TResponse>> HandleMessageAsync(SocketConnection connection, DataEvent<TResponse> message) => Task.FromResult(new CallResult<TResponse>(message.Data));
|
public virtual Task<CallResult<TResponse>> HandleMessageAsync(SocketConnection connection, DataEvent<TResponse> message) => Task.FromResult(new CallResult<TResponse>(message.Data, message.OriginalData, null));
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override void Timeout()
|
public override void Timeout()
|
||||||
@ -184,8 +189,8 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
Completed = true;
|
Completed = true;
|
||||||
Result = new CallResult<TResponse>(new CancellationRequestedError());
|
Result = new CallResult<TResponse>(new CancellationRequestedError(null, "Query timeout", null));
|
||||||
OnFinished?.Invoke();
|
ContinueAwaiter?.Set();
|
||||||
_event.Set();
|
_event.Set();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,7 +199,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
{
|
{
|
||||||
Result = new CallResult<TResponse>(new ServerError(error));
|
Result = new CallResult<TResponse>(new ServerError(error));
|
||||||
Completed = true;
|
Completed = true;
|
||||||
OnFinished?.Invoke();
|
ContinueAwaiter?.Set();
|
||||||
_event.Set();
|
_event.Set();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ using System;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Newtonsoft.Json;
|
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using CryptoExchange.Net.Objects;
|
using CryptoExchange.Net.Objects;
|
||||||
using System.Net.WebSockets;
|
using System.Net.WebSockets;
|
||||||
@ -12,6 +11,7 @@ using CryptoExchange.Net.Objects.Sockets;
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using CryptoExchange.Net.Sockets.MessageParsing;
|
using CryptoExchange.Net.Sockets.MessageParsing;
|
||||||
|
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets
|
namespace CryptoExchange.Net.Sockets
|
||||||
{
|
{
|
||||||
@ -48,12 +48,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Unhandled message event
|
/// Unhandled message event
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public event Action<SocketMessage>? UnhandledMessage;
|
public event Action<IMessageAccessor>? UnhandledMessage;
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Unparsed message event
|
|
||||||
/// </summary>
|
|
||||||
public event Action<byte[]>? UnparsedMessage; // TODO not linked up
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The amount of subscriptions on this connection
|
/// The amount of subscriptions on this connection
|
||||||
@ -135,7 +130,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (_pausedActivity != value)
|
if (_pausedActivity != value)
|
||||||
{
|
{
|
||||||
_pausedActivity = value;
|
_pausedActivity = value;
|
||||||
_logger.Log(LogLevel.Information, $"Socket {SocketId} paused activity: " + value);
|
_logger.Log(LogLevel.Information, $"[Sckt {SocketId}] paused activity: " + value);
|
||||||
if(_pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke());
|
if(_pausedActivity) _ = Task.Run(() => ActivityPaused?.Invoke());
|
||||||
else _ = Task.Run(() => ActivityUnpaused?.Invoke());
|
else _ = Task.Run(() => ActivityUnpaused?.Invoke());
|
||||||
}
|
}
|
||||||
@ -155,7 +150,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
var oldStatus = _status;
|
var oldStatus = _status;
|
||||||
_status = value;
|
_status = value;
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} status changed from {oldStatus} to {_status}");
|
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] status changed from {oldStatus} to {_status}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -165,6 +160,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
private SocketStatus _status;
|
private SocketStatus _status;
|
||||||
|
|
||||||
|
private IMessageSerializer _serializer;
|
||||||
|
private IMessageAccessor _accessor;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The underlying websocket
|
/// The underlying websocket
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -196,6 +194,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
_listenersLock = new object();
|
_listenersLock = new object();
|
||||||
_listeners = new List<IMessageProcessor>();
|
_listeners = new List<IMessageProcessor>();
|
||||||
|
|
||||||
|
_serializer = new JsonNetSerializer();
|
||||||
|
_accessor = new JsonNetMessageAccessor();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -221,7 +222,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
foreach (var subscription in _listeners.OfType<Subscription>())
|
foreach (var subscription in _listeners.OfType<Subscription>())
|
||||||
subscription.Confirmed = false;
|
subscription.Confirmed = false;
|
||||||
|
|
||||||
foreach (var query in _listeners.OfType<Query>())
|
foreach (var query in _listeners.OfType<Query>().ToList())
|
||||||
{
|
{
|
||||||
query.Fail("Connection interupted");
|
query.Fail("Connection interupted");
|
||||||
_listeners.Remove(query);
|
_listeners.Remove(query);
|
||||||
@ -246,7 +247,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
foreach (var subscription in _listeners.OfType<Subscription>())
|
foreach (var subscription in _listeners.OfType<Subscription>())
|
||||||
subscription.Confirmed = false;
|
subscription.Confirmed = false;
|
||||||
|
|
||||||
foreach (var query in _listeners.OfType<Query>())
|
foreach (var query in _listeners.OfType<Query>().ToList())
|
||||||
{
|
{
|
||||||
query.Fail("Connection interupted");
|
query.Fail("Connection interupted");
|
||||||
_listeners.Remove(query);
|
_listeners.Remove(query);
|
||||||
@ -275,7 +276,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
lock (_listenersLock)
|
lock (_listenersLock)
|
||||||
{
|
{
|
||||||
foreach (var query in _listeners.OfType<Query>())
|
foreach (var query in _listeners.OfType<Query>().ToList())
|
||||||
{
|
{
|
||||||
query.Fail("Connection interupted");
|
query.Fail("Connection interupted");
|
||||||
_listeners.Remove(query);
|
_listeners.Remove(query);
|
||||||
@ -288,7 +289,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
|
var reconnectSuccessful = await ProcessReconnectAsync().ConfigureAwait(false);
|
||||||
if (!reconnectSuccessful)
|
if (!reconnectSuccessful)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again");
|
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] failed reconnect processing: {reconnectSuccessful.Error}, reconnecting again");
|
||||||
_ = _socket.ReconnectAsync().ConfigureAwait(false);
|
_ = _socket.ReconnectAsync().ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -312,9 +313,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
protected virtual Task HandleErrorAsync(Exception e)
|
protected virtual Task HandleErrorAsync(Exception e)
|
||||||
{
|
{
|
||||||
if (e is WebSocketException wse)
|
if (e is WebSocketException wse)
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
|
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] error: Websocket error code {wse.WebSocketErrorCode}, details: " + e.ToLogString());
|
||||||
else
|
else
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} error: " + e.ToLogString());
|
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] error: " + e.ToLogString());
|
||||||
|
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
@ -333,7 +334,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
if (query == null)
|
if (query == null)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} msg {requestId} - message sent, but not pending");
|
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] msg {requestId} - message sent, but not pending");
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -350,77 +351,106 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream)
|
protected virtual async Task HandleStreamMessage(WebSocketMessageType type, Stream stream)
|
||||||
{
|
{
|
||||||
var sw = Stopwatch.StartNew();
|
var sw = Stopwatch.StartNew();
|
||||||
|
var receiveTime = DateTime.UtcNow;
|
||||||
|
string? originalData = null;
|
||||||
|
|
||||||
// 1. Decrypt/Preprocess if necessary
|
// 1. Decrypt/Preprocess if necessary
|
||||||
stream = ApiClient.PreprocessStreamMessage(type, stream);
|
stream = ApiClient.PreprocessStreamMessage(type, stream);
|
||||||
|
|
||||||
// 2. Read data into accessor
|
// 2. Read data into accessor
|
||||||
var messageData = new JsonNetMessageData(stream); // TODO if we let the implementation create this we can switch per implementation
|
_accessor.Load(stream);
|
||||||
var message = new SocketMessage(DateTime.UtcNow, messageData);
|
|
||||||
if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData)
|
if (ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData)
|
||||||
{
|
{
|
||||||
stream.Position = 0;
|
stream.Position = 0;
|
||||||
using var textReader = new StreamReader(stream, Encoding.UTF8, false, 1024, true);
|
using var textReader = new StreamReader(stream, Encoding.UTF8, false, 1024, true);
|
||||||
message.RawData = textReader.ReadToEnd();
|
originalData = textReader.ReadToEnd();
|
||||||
|
|
||||||
_logger.LogTrace("Socket {SocketId} received {Data}", SocketId, message.RawData);
|
_logger.LogTrace("[Sckt {SocketId}] received {Data}", SocketId, originalData);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Determine the subscription interested in the messsage
|
// 3. Determine the subscription interested in the messsage
|
||||||
var listenId = ApiClient.GetListenerIdentifier(message);
|
var listenId = ApiClient.GetListenerIdentifier(_accessor);
|
||||||
|
if (listenId == null)
|
||||||
|
{
|
||||||
|
if (!ApiClient.UnhandledMessageExpected)
|
||||||
|
_logger.LogWarning("[Sckt {SocketId}] failed to evaluate message", SocketId);
|
||||||
|
|
||||||
|
UnhandledMessage?.Invoke(_accessor);
|
||||||
|
stream.Dispose();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Get the listeners interested in this message
|
||||||
List<IMessageProcessor> processors;
|
List<IMessageProcessor> processors;
|
||||||
lock(_listenersLock)
|
lock(_listenersLock)
|
||||||
processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList();
|
processors = _listeners.Where(s => s.ListenerIdentifiers.Contains(listenId)).ToList();
|
||||||
|
|
||||||
if (!processors.Any())
|
if (!processors.Any())
|
||||||
{
|
{
|
||||||
_logger.LogWarning("Socket {SocketId} received message not matched to any processor", SocketId);
|
if (!ApiClient.UnhandledMessageExpected)
|
||||||
UnhandledMessage?.Invoke(message);
|
{
|
||||||
|
_logger.LogWarning("[Sckt {SocketId}] received message not matched to any processor. ListenId: {ListenId}", SocketId, listenId);
|
||||||
|
UnhandledMessage?.Invoke(_accessor);
|
||||||
|
}
|
||||||
|
|
||||||
|
stream.Dispose();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
_logger.LogTrace("Socket {SocketId} {Count} processors matched to message with listener identifier {ListenerId}", SocketId, processors.Count, listenId);
|
_logger.LogTrace("[Sckt {SocketId}] {Count} processors matched to message with listener identifier {ListenerId}", SocketId, processors.Count, listenId);
|
||||||
var totalUserTime = 0;
|
var totalUserTime = 0;
|
||||||
|
Dictionary<Type, object>? desCache = null;
|
||||||
|
if (processors.Count > 1)
|
||||||
|
{
|
||||||
|
// Only instantiate a cache if there are multiple processors
|
||||||
|
desCache = new Dictionary<Type, object>();
|
||||||
|
}
|
||||||
|
|
||||||
foreach (var processor in processors)
|
foreach (var processor in processors)
|
||||||
{
|
{
|
||||||
// 4. Determine the type to deserialize to
|
// 5. Determine the type to deserialize to for this processor
|
||||||
var messageType = processor.GetMessageType(message);
|
var messageType = processor.GetMessageType(_accessor);
|
||||||
if (messageType == null)
|
if (messageType == null)
|
||||||
{
|
{
|
||||||
_logger.LogWarning("Socket {SocketId} received message not recognized by handler {Id}", SocketId, processor.Id);
|
_logger.LogWarning("[Sckt {SocketId}] received message not recognized by handler {Id}", SocketId, processor.Id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. Deserialize the message
|
// 6. Deserialize the message
|
||||||
object deserialized;
|
object? deserialized = null;
|
||||||
|
desCache?.TryGetValue(messageType, out deserialized);
|
||||||
|
|
||||||
|
if (deserialized == null)
|
||||||
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
deserialized = message.Deserialize(messageType);
|
deserialized = processor.Deserialize(_accessor, messageType);
|
||||||
|
desCache?.Add(messageType, deserialized);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogWarning("Socket {SocketId} failed to deserialize message to type {Type}: {Exception}", SocketId, messageType.Name, ex.ToLogString());
|
_logger.LogWarning("[Sckt {SocketId}] failed to deserialize message to type {Type}: {Exception}", SocketId, messageType.Name, ex.ToLogString());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 6. Hand of the message to the subscription
|
// 7. Hand of the message to the subscription
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var innerSw = Stopwatch.StartNew();
|
var innerSw = Stopwatch.StartNew();
|
||||||
await processor.HandleAsync(this, new DataEvent<object>(deserialized, null, message.RawData, message.ReceiveTime, null)).ConfigureAwait(false);
|
await processor.HandleAsync(this, new DataEvent<object>(deserialized, null, originalData, receiveTime, null)).ConfigureAwait(false);
|
||||||
totalUserTime += (int)innerSw.ElapsedMilliseconds;
|
totalUserTime += (int)innerSw.ElapsedMilliseconds;
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogWarning("Socket {SocketId} user message processing failed: {Exception}", SocketId, ex.ToLogString());
|
_logger.LogWarning("[Sckt {SocketId}] user message processing failed: {Exception}", SocketId, ex.ToLogString());
|
||||||
if (processor is Subscription subscription)
|
if (processor is Subscription subscription)
|
||||||
subscription.InvokeExceptionHandler(ex);
|
subscription.InvokeExceptionHandler(ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stream.Dispose();
|
stream.Dispose();
|
||||||
_logger.LogTrace($"Socket {SocketId} message processed in {(int)sw.ElapsedMilliseconds}ms ({totalUserTime - sw.ElapsedMilliseconds}ms parsing)");
|
_logger.LogTrace($"[Sckt {SocketId}] message processed in {(int)sw.ElapsedMilliseconds}ms ({sw.ElapsedMilliseconds - totalUserTime}ms parsing)");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -474,27 +504,36 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false)
|
public async Task CloseAsync(Subscription subscription, bool unsubEvenIfNotConfirmed = false)
|
||||||
{
|
{
|
||||||
lock (_listenersLock)
|
|
||||||
{
|
|
||||||
if (!_listeners.Contains(subscription))
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
subscription.Closed = true;
|
subscription.Closed = true;
|
||||||
|
|
||||||
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
if (Status == SocketStatus.Closing || Status == SocketStatus.Closed || Status == SocketStatus.Disposed)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing subscription {subscription.Id}");
|
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] closing subscription {subscription.Id}");
|
||||||
if (subscription.CancellationTokenRegistration.HasValue)
|
if (subscription.CancellationTokenRegistration.HasValue)
|
||||||
subscription.CancellationTokenRegistration.Value.Dispose();
|
subscription.CancellationTokenRegistration.Value.Dispose();
|
||||||
|
|
||||||
if ((unsubEvenIfNotConfirmed || subscription.Confirmed) && _socket.IsOpen)
|
bool anyDuplicateSubscription;
|
||||||
|
lock (_listenersLock)
|
||||||
|
anyDuplicateSubscription = _listeners.OfType<Subscription>().Any(x => x != subscription && x.ListenerIdentifiers.All(l => subscription.ListenerIdentifiers.Contains(l)));
|
||||||
|
|
||||||
|
if (!anyDuplicateSubscription)
|
||||||
|
{
|
||||||
|
bool needUnsub;
|
||||||
|
lock (_listenersLock)
|
||||||
|
needUnsub = _listeners.Contains(subscription);
|
||||||
|
|
||||||
|
if (needUnsub && (unsubEvenIfNotConfirmed || subscription.Confirmed) && _socket.IsOpen)
|
||||||
await UnsubscribeAsync(subscription).ConfigureAwait(false);
|
await UnsubscribeAsync(subscription).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] not unsubscribing subscription as there is still a duplicate subscription running");
|
||||||
|
}
|
||||||
|
|
||||||
if (Status == SocketStatus.Closing)
|
if (Status == SocketStatus.Closing)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} already closing");
|
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] already closing");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -508,7 +547,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
|
|
||||||
if (shouldCloseConnection)
|
if (shouldCloseConnection)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} closing as there are no more subscriptions");
|
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] closing as there are no more subscriptions");
|
||||||
await CloseAsync().ConfigureAwait(false);
|
await CloseAsync().ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -544,7 +583,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
_listeners.Add(subscription);
|
_listeners.Add(subscription);
|
||||||
|
|
||||||
if (subscription.UserSubscription)
|
if (subscription.UserSubscription)
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} adding new subscription with id {subscription.Id}, total subscriptions on connection: {UserSubscriptionCount}");
|
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] adding new subscription with id {subscription.Id}, total subscriptions on connection: {UserSubscriptionCount}");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -572,11 +611,11 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// Send a query request and wait for an answer
|
/// Send a query request and wait for an answer
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="query">Query to send</param>
|
/// <param name="query">Query to send</param>
|
||||||
/// <param name="onFinished">Action to run when query finishes</param>
|
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query, Action? onFinished = null)
|
public virtual async Task<CallResult> SendAndWaitQueryAsync(Query query, AsyncResetEvent? continueEvent = null)
|
||||||
{
|
{
|
||||||
await SendAndWaitIntAsync(query, onFinished).ConfigureAwait(false);
|
await SendAndWaitIntAsync(query, continueEvent).ConfigureAwait(false);
|
||||||
return query.Result ?? new CallResult(new ServerError("Timeout"));
|
return query.Result ?? new CallResult(new ServerError("Timeout"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -585,27 +624,31 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="T">Query response type</typeparam>
|
/// <typeparam name="T">Query response type</typeparam>
|
||||||
/// <param name="query">Query to send</param>
|
/// <param name="query">Query to send</param>
|
||||||
/// <param name="onFinished">Action to run when query finishes</param>
|
/// <param name="continueEvent">Wait event for when the socket message handler can continue</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query, Action? onFinished = null)
|
public virtual async Task<CallResult<T>> SendAndWaitQueryAsync<T>(Query<T> query, AsyncResetEvent? continueEvent = null)
|
||||||
{
|
{
|
||||||
await SendAndWaitIntAsync(query, onFinished).ConfigureAwait(false);
|
await SendAndWaitIntAsync(query, continueEvent).ConfigureAwait(false);
|
||||||
return query.TypedResult ?? new CallResult<T>(new ServerError("Timeout"));
|
return query.TypedResult ?? new CallResult<T>(new ServerError("Timeout"));
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task SendAndWaitIntAsync(Query query, Action? onFinished)
|
private async Task SendAndWaitIntAsync(Query query, AsyncResetEvent? continueEvent)
|
||||||
{
|
{
|
||||||
lock(_listenersLock)
|
lock(_listenersLock)
|
||||||
_listeners.Add(query);
|
_listeners.Add(query);
|
||||||
|
|
||||||
|
query.ContinueAwaiter = continueEvent;
|
||||||
var sendOk = Send(query.Id, query.Request, query.Weight);
|
var sendOk = Send(query.Id, query.Request, query.Weight);
|
||||||
if (!sendOk)
|
if (!sendOk)
|
||||||
{
|
{
|
||||||
query.Fail("Failed to send");
|
query.Fail("Failed to send");
|
||||||
|
lock (_listenersLock)
|
||||||
|
_listeners.Remove(query);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
query.OnFinished = onFinished;
|
try
|
||||||
|
{
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
if (!_socket.IsOpen)
|
if (!_socket.IsOpen)
|
||||||
@ -623,6 +666,12 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
lock (_listenersLock)
|
||||||
|
_listeners.Remove(query);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Send data over the websocket connection
|
/// Send data over the websocket connection
|
||||||
@ -630,14 +679,13 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <typeparam name="T">The type of the object to send</typeparam>
|
/// <typeparam name="T">The type of the object to send</typeparam>
|
||||||
/// <param name="requestId">The request id</param>
|
/// <param name="requestId">The request id</param>
|
||||||
/// <param name="obj">The object to send</param>
|
/// <param name="obj">The object to send</param>
|
||||||
/// <param name="nullValueHandling">How null values should be serialized</param>
|
|
||||||
/// <param name="weight">The weight of the message</param>
|
/// <param name="weight">The weight of the message</param>
|
||||||
public virtual bool Send<T>(int requestId, T obj, int weight, NullValueHandling nullValueHandling = NullValueHandling.Ignore)
|
public virtual bool Send<T>(int requestId, T obj, int weight)
|
||||||
{
|
{
|
||||||
if(obj is string str)
|
if(obj is string str)
|
||||||
return Send(requestId, str, weight);
|
return Send(requestId, str, weight);
|
||||||
else
|
else
|
||||||
return Send(requestId, JsonConvert.SerializeObject(obj, Formatting.None, new JsonSerializerSettings { NullValueHandling = nullValueHandling }), weight);
|
return Send(requestId, _serializer.Serialize(obj!), weight);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -648,7 +696,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <param name="requestId">The id of the request</param>
|
/// <param name="requestId">The id of the request</param>
|
||||||
public virtual bool Send(int requestId, string data, int weight)
|
public virtual bool Send(int requestId, string data, int weight)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Trace, $"Socket {SocketId} msg {requestId} - sending messsage: {data}");
|
_logger.Log(LogLevel.Trace, $"[Sckt {SocketId}] msg {requestId} - sending messsage: {data}");
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
_socket.Send(requestId, data, weight);
|
_socket.Send(requestId, data, weight);
|
||||||
@ -671,7 +719,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (!anySubscriptions)
|
if (!anySubscriptions)
|
||||||
{
|
{
|
||||||
// No need to resubscribe anything
|
// No need to resubscribe anything
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} nothing to resubscribe, closing connection");
|
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] nothing to resubscribe, closing connection");
|
||||||
_ = _socket.CloseAsync();
|
_ = _socket.CloseAsync();
|
||||||
return new CallResult<bool>(true);
|
return new CallResult<bool>(true);
|
||||||
}
|
}
|
||||||
@ -685,12 +733,12 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
var authResult = await ApiClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
|
var authResult = await ApiClient.AuthenticateSocketAsync(this).ConfigureAwait(false);
|
||||||
if (!authResult)
|
if (!authResult)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} authentication failed on reconnected socket. Disconnecting and reconnecting.");
|
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] authentication failed on reconnected socket. Disconnecting and reconnecting.");
|
||||||
return authResult;
|
return authResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
Authenticated = true;
|
Authenticated = true;
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} authentication succeeded on reconnected socket.");
|
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] authentication succeeded on reconnected socket.");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a list of all subscriptions on the socket
|
// Get a list of all subscriptions on the socket
|
||||||
@ -704,7 +752,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
|
var result = await ApiClient.RevitalizeRequestAsync(subscription).ConfigureAwait(false);
|
||||||
if (!result)
|
if (!result)
|
||||||
{
|
{
|
||||||
_logger.Log(LogLevel.Warning, $"Socket {SocketId} failed request revitalization: " + result.Error);
|
_logger.Log(LogLevel.Warning, $"[Sckt {SocketId}] failed request revitalization: " + result.Error);
|
||||||
return result.As(false);
|
return result.As(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -722,9 +770,12 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (subQuery == null)
|
if (subQuery == null)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
taskList.Add(SendAndWaitQueryAsync(subQuery, () =>
|
var waitEvent = new AsyncResetEvent(false);
|
||||||
|
taskList.Add(SendAndWaitQueryAsync(subQuery, waitEvent).ContinueWith((r) =>
|
||||||
{
|
{
|
||||||
subscription.HandleSubQueryResponse(subQuery.Response!);
|
subscription.HandleSubQueryResponse(subQuery.Response!);
|
||||||
|
waitEvent.Set();
|
||||||
|
return r.Result;
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -739,7 +790,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
if (!_socket.IsOpen)
|
if (!_socket.IsOpen)
|
||||||
return new CallResult<bool>(new WebError("Socket not connected"));
|
return new CallResult<bool>(new WebError("Socket not connected"));
|
||||||
|
|
||||||
_logger.Log(LogLevel.Debug, $"Socket {SocketId} all subscription successfully resubscribed on reconnected socket.");
|
_logger.Log(LogLevel.Debug, $"[Sckt {SocketId}] all subscription successfully resubscribed on reconnected socket.");
|
||||||
return new CallResult<bool>(true);
|
return new CallResult<bool>(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -750,7 +801,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
await SendAndWaitQueryAsync(unsubscribeRequest).ConfigureAwait(false);
|
await SendAndWaitQueryAsync(unsubscribeRequest).ConfigureAwait(false);
|
||||||
_logger.Log(LogLevel.Information, $"Socket {SocketId} subscription {subscription!.Id} unsubscribed");
|
_logger.Log(LogLevel.Information, $"[Sckt {SocketId}] subscription {subscription!.Id} unsubscribed");
|
||||||
}
|
}
|
||||||
|
|
||||||
internal async Task<CallResult> ResubscribeAsync(Subscription subscription)
|
internal async Task<CallResult> ResubscribeAsync(Subscription subscription)
|
||||||
|
@ -1,45 +0,0 @@
|
|||||||
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
|
||||||
using System;
|
|
||||||
|
|
||||||
namespace CryptoExchange.Net.Sockets
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// Message received from the websocket
|
|
||||||
/// </summary>
|
|
||||||
public class SocketMessage
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// Message receive time
|
|
||||||
/// </summary>
|
|
||||||
public DateTime ReceiveTime { get; set; }
|
|
||||||
/// <summary>
|
|
||||||
/// The message data
|
|
||||||
/// </summary>
|
|
||||||
public IMessageAccessor Message { get; set; }
|
|
||||||
/// <summary>
|
|
||||||
/// Raw string data
|
|
||||||
/// </summary>
|
|
||||||
public string? RawData { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// ctor
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="receiveTime"></param>
|
|
||||||
/// <param name="message"></param>
|
|
||||||
public SocketMessage(DateTime receiveTime, IMessageAccessor message)
|
|
||||||
{
|
|
||||||
ReceiveTime = receiveTime;
|
|
||||||
Message = message;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Deserialize the message to a type
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="type"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public object Deserialize(Type type)
|
|
||||||
{
|
|
||||||
return Message.Deserialize(type);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,6 +1,7 @@
|
|||||||
using CryptoExchange.Net.Interfaces;
|
using CryptoExchange.Net.Interfaces;
|
||||||
using CryptoExchange.Net.Objects;
|
using CryptoExchange.Net.Objects;
|
||||||
using CryptoExchange.Net.Objects.Sockets;
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
|
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
@ -74,7 +75,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="message"></param>
|
/// <param name="message"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public abstract Type? GetMessageType(SocketMessage message);
|
public abstract Type? GetMessageType(IMessageAccessor message);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// ctor
|
/// ctor
|
||||||
@ -114,6 +115,9 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public abstract Query? GetUnsubQuery();
|
public abstract Query? GetUnsubQuery();
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public virtual object Deserialize(IMessageAccessor message, Type type) => message.Deserialize(type);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Handle an update message
|
/// Handle an update message
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
using CryptoExchange.Net.Objects;
|
using CryptoExchange.Net.Objects;
|
||||||
using CryptoExchange.Net.Objects.Sockets;
|
using CryptoExchange.Net.Objects.Sockets;
|
||||||
|
using CryptoExchange.Net.Sockets.MessageParsing.Interfaces;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
@ -31,7 +32,7 @@ namespace CryptoExchange.Net.Sockets
|
|||||||
public abstract class SystemSubscription<T> : SystemSubscription
|
public abstract class SystemSubscription<T> : SystemSubscription
|
||||||
{
|
{
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override Type GetMessageType(SocketMessage message) => typeof(T);
|
public override Type GetMessageType(IMessageAccessor message) => typeof(T);
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<object> message)
|
public override Task<CallResult> DoHandleMessageAsync(SocketConnection connection, DataEvent<object> message)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user