1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-08 00:16:27 +00:00
This commit is contained in:
JKorf 2023-11-01 21:55:18 +01:00
parent 5539320827
commit 35f7dbf9fb
6 changed files with 76 additions and 12 deletions

View File

@ -151,9 +151,9 @@ namespace CryptoExchange.Net
/// <param name="subscription">The subscription</param> /// <param name="subscription">The subscription</param>
/// <param name="ct">Cancellation token for closing this subscription</param> /// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns> /// <returns></returns>
protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(Subscription subscription, CancellationToken ct) protected virtual Task<CallResult<UpdateSubscription>> SubscribeAsync(Subscription subscription, CancellationToken ct)
{ {
return SubscribeAsync<T>(BaseAddress, subscription, ct); return SubscribeAsync(BaseAddress, subscription, ct);
} }
/// <summary> /// <summary>
@ -164,7 +164,7 @@ namespace CryptoExchange.Net
/// <param name="subscription">The subscription</param> /// <param name="subscription">The subscription</param>
/// <param name="ct">Cancellation token for closing this subscription</param> /// <param name="ct">Cancellation token for closing this subscription</param>
/// <returns></returns> /// <returns></returns>
protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync<T>(string url, Subscription subscription, CancellationToken ct) protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync(string url, Subscription subscription, CancellationToken ct)
{ {
if (_disposing) if (_disposing)
return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe")); return new CallResult<UpdateSubscription>(new InvalidOperationError("Client disposed, can't subscribe"));
@ -195,7 +195,7 @@ namespace CryptoExchange.Net
socketConnection = socketResult.Data; socketConnection = socketResult.Data;
// Add a subscription on the socket connection // Add a subscription on the socket connection
messageListener = AddSubscription<T>(subscription, true, socketConnection); messageListener = AddSubscription(subscription, true, socketConnection);
if (messageListener == null) if (messageListener == null)
{ {
_logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
@ -441,7 +441,7 @@ namespace CryptoExchange.Net
/// Should return the request which can be used to authenticate a socket connection /// Should return the request which can be used to authenticate a socket connection
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
protected internal abstract Query GetAuthenticationRequest(); protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException();
/// <summary> /// <summary>
/// Add a subscription to a connection /// Add a subscription to a connection
@ -451,7 +451,7 @@ namespace CryptoExchange.Net
/// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param> /// <param name="userSubscription">Whether or not this is a user subscription (counts towards the max amount of handlers on a socket)</param>
/// <param name="connection">The socket connection the handler is on</param> /// <param name="connection">The socket connection the handler is on</param>
/// <returns></returns> /// <returns></returns>
protected virtual MessageListener? AddSubscription<T>(Subscription subscription, bool userSubscription, SocketConnection connection) protected virtual MessageListener? AddSubscription(Subscription subscription, bool userSubscription, SocketConnection connection)
{ {
var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription); var messageListener = new MessageListener(ExchangeHelpers.NextId(), subscription, userSubscription);
if (!connection.AddListener(messageListener)) if (!connection.AddListener(messageListener))

View File

@ -88,7 +88,7 @@ namespace CryptoExchange.Net.Objects.Sockets
public Task ProcessAsync(ParsedMessage message) public Task ProcessAsync(ParsedMessage message)
{ {
// TODO // TODO
var dataEvent = new DataEvent<ParsedMessage>(message, null, null, DateTime.UtcNow, null); var dataEvent = new DataEvent<ParsedMessage>(message, null, message.OriginalData, DateTime.UtcNow, null);
return Subscription.HandleEventAsync(dataEvent); return Subscription.HandleEventAsync(dataEvent);
} }
} }

View File

@ -0,0 +1,39 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
namespace CryptoExchange.Net.Objects.Testing
{
public class TestWebsocket : CryptoExchangeWebSocketClient
{
public TestWebsocket(ILogger logger, WebSocketParameters websocketParameters) : base(logger, websocketParameters)
{
}
public override bool IsClosed => false;
public override bool IsOpen => true;
public override Task<bool> ConnectAsync() => Task.FromResult(true);
public override Task CloseAsync() => Task.CompletedTask;
public override Task ReconnectAsync() => Task.CompletedTask;
public override void Send(int id, string data, int weight) { }
public void Receive(string data)
{
var bytes = Encoding.UTF8.GetBytes(data);
var stream = new MemoryStream(bytes);
stream.Position = 0;
_ = ProcessData(stream);
}
}
}

View File

@ -0,0 +1,24 @@
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
namespace CryptoExchange.Net.Objects.Testing
{
public class TestWebsocketFactory : IWebsocketFactory
{
private readonly Func<ILogger, WebSocketParameters, IWebsocket> _websocketFactory;
public TestWebsocketFactory(Func<ILogger, WebSocketParameters, IWebsocket> websocketFactory)
{
_websocketFactory = websocketFactory;
}
public IWebsocket CreateWebsocket(ILogger logger, WebSocketParameters parameters)
{
return _websocketFactory(logger, parameters);
}
}
}

View File

@ -75,10 +75,10 @@ namespace CryptoExchange.Net.Sockets
public Uri Uri => Parameters.Uri; public Uri Uri => Parameters.Uri;
/// <inheritdoc /> /// <inheritdoc />
public bool IsClosed => _socket.State == WebSocketState.Closed; public virtual bool IsClosed => _socket.State == WebSocketState.Closed;
/// <inheritdoc /> /// <inheritdoc />
public bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested; public virtual bool IsOpen => _socket.State == WebSocketState.Open && !_ctsSource.IsCancellationRequested;
/// <inheritdoc /> /// <inheritdoc />
public double IncomingKbps public double IncomingKbps
@ -521,7 +521,7 @@ namespace CryptoExchange.Net.Sockets
{ {
// Received a complete message and it's not multi part // Received a complete message and it's not multi part
_logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); _logger.Log(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
await ProcessData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count), receiveResult.MessageType).ConfigureAwait(false); await ProcessData(new MemoryStream(buffer.Array, buffer.Offset, receiveResult.Count)).ConfigureAwait(false);
} }
else else
{ {
@ -555,7 +555,7 @@ namespace CryptoExchange.Net.Sockets
{ {
// Reassemble complete message from memory stream // Reassemble complete message from memory stream
_logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); _logger.Log(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
await ProcessData(memoryStream, receiveResult.MessageType).ConfigureAwait(false); await ProcessData(memoryStream).ConfigureAwait(false);
memoryStream.Dispose(); memoryStream.Dispose();
} }
else else
@ -580,7 +580,7 @@ namespace CryptoExchange.Net.Sockets
} }
} }
private async Task ProcessData(Stream stream, WebSocketMessageType messageType) protected async Task ProcessData(Stream stream)
{ {
stream.Position = 0; stream.Position = 0;
if (Parameters.Interceptor != null) if (Parameters.Interceptor != null)

View File

@ -1,6 +1,7 @@
using CryptoExchange.Net.Interfaces; using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
{ {