1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-10 01:16:24 +00:00
This commit is contained in:
JKorf 2023-11-30 13:21:37 +01:00
parent c41e128900
commit b640690a0f
6 changed files with 17 additions and 34 deletions

View File

@ -197,7 +197,7 @@ namespace CryptoExchange.Net
socketConnection = socketResult.Data; socketConnection = socketResult.Data;
// Add a subscription on the socket connection // Add a subscription on the socket connection
var success = socketConnection.AddSubscription(subscription); var success = socketConnection.CanAddSubscription();
if (!success) if (!success)
{ {
_logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection"); _logger.Log(LogLevel.Trace, $"Socket {socketConnection.SocketId} failed to add subscription, retrying on different connection");
@ -249,12 +249,8 @@ namespace CryptoExchange.Net
subscription.HandleSubQueryResponse(subQuery.Response); subscription.HandleSubQueryResponse(subQuery.Response);
} }
else
{
// No request to be sent, so just mark the subscription as comfirmed
subscription.Confirmed = true;
}
subscription.Confirmed = true;
if (ct != default) if (ct != default)
{ {
subscription.CancellationTokenRegistration = ct.Register(async () => subscription.CancellationTokenRegistration = ct.Register(async () =>
@ -264,7 +260,7 @@ namespace CryptoExchange.Net
}, false); }, false);
} }
subscription.Confirmed = true; socketConnection.AddSubscription(subscription);
_logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully"); _logger.Log(LogLevel.Information, $"Socket {socketConnection.SocketId} subscription {subscription.Id} completed successfully");
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription)); return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
} }

View File

@ -34,6 +34,9 @@ namespace CryptoExchange.Net.Converters
if (arr.Count <= index) if (arr.Count <= index)
return null; return null;
if (arr[index].Type != JTokenType.String)
return null;
return arr[index].Value<string>(); return arr[index].Value<string>();
} }
@ -58,6 +61,8 @@ namespace CryptoExchange.Net.Converters
return accessToken?.ToString(); return accessToken?.ToString();
} }
public bool IsObject(string? key) => _token.Type == JTokenType.Object;
private JToken? GetToken(string key) private JToken? GetToken(string key)
{ {
if (_cache.TryGetValue(key, out var token)) if (_cache.TryGetValue(key, out var token))

View File

@ -178,30 +178,5 @@ namespace CryptoExchange.Net.Converters
var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : token.ToObject(type, _serializer)); var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : token.ToObject(type, _serializer));
return instance; return instance;
} }
private string? GetValueForKey(JToken token, string key)
{
var splitTokens = key.Split(new char[] { ':' });
var accessToken = token;
foreach (var splitToken in splitTokens)
{
accessToken = accessToken[splitToken];
if (accessToken == null)
break;
if (accessToken.Type == JTokenType.Array)
{
// Received array, take first item as reference
accessToken = accessToken.First!;
}
}
if (accessToken?.Type == JTokenType.Object)
return ((JObject)accessToken).Properties().First().Name;
return accessToken?.ToString();
}
} }
} }

View File

@ -6,6 +6,7 @@ namespace CryptoExchange.Net.Interfaces
{ {
public interface IMessageAccessor public interface IMessageAccessor
{ {
bool IsObject(string? key);
string? GetStringValue(string key); string? GetStringValue(string key);
int? GetIntValue(string key); int? GetIntValue(string key);
public int? GetCount(string key); public int? GetCount(string key);

View File

@ -582,6 +582,7 @@ namespace CryptoExchange.Net.Sockets
protected async Task ProcessData(WebSocketMessageType type, Stream stream) protected async Task ProcessData(WebSocketMessageType type, Stream stream)
{ {
LastActionTime = DateTime.UtcNow;
stream.Position = 0; stream.Position = 0;
if (Parameters.Interceptor != null) if (Parameters.Interceptor != null)
stream = Parameters.Interceptor.Invoke(stream); stream = Parameters.Interceptor.Invoke(stream);

View File

@ -10,6 +10,7 @@ using System.Net.WebSockets;
using System.IO; using System.IO;
using CryptoExchange.Net.Objects.Sockets; using CryptoExchange.Net.Objects.Sockets;
using System.Text; using System.Text;
using System.Diagnostics.CodeAnalysis;
namespace CryptoExchange.Net.Sockets namespace CryptoExchange.Net.Sockets
{ {
@ -313,10 +314,12 @@ namespace CryptoExchange.Net.Sockets
var result = ApiClient.StreamConverter.ReadJson(type, stream, _listenerManager.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData); var result = ApiClient.StreamConverter.ReadJson(type, stream, _listenerManager.GetMapping(), ApiClient.ApiOptions.OutputOriginalData ?? ApiClient.ClientOptions.OutputOriginalData);
if(result == null) if(result == null)
{ {
// Not able to parse at all // Not able to parse at all
stream.Position = 0;
var buffer = new byte[stream.Length]; var buffer = new byte[stream.Length];
stream.Position = 0;
stream.Read(buffer, 0, buffer.Length); stream.Read(buffer, 0, buffer.Length);
_logger.LogDebug($"Socket {SocketId} Failed to parse data: {Encoding.UTF8.GetString(buffer)}");
UnparsedMessage?.Invoke(buffer); UnparsedMessage?.Invoke(buffer);
return; return;
} }
@ -436,6 +439,8 @@ namespace CryptoExchange.Net.Sockets
_socket.Dispose(); _socket.Dispose();
} }
public bool CanAddSubscription() => Status == SocketStatus.None || Status == SocketStatus.Connected;
/// <summary> /// <summary>
/// Add a subscription to this connection /// Add a subscription to this connection
/// </summary> /// </summary>