diff --git a/CryptoExchange.Net/Clients/SocketApiClient.cs b/CryptoExchange.Net/Clients/SocketApiClient.cs
index 43af3bf..57f3180 100644
--- a/CryptoExchange.Net/Clients/SocketApiClient.cs
+++ b/CryptoExchange.Net/Clients/SocketApiClient.cs
@@ -10,6 +10,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
+using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -103,15 +104,12 @@ namespace CryptoExchange.Net
}
}
-
///
public new SocketExchangeOptions ClientOptions => (SocketExchangeOptions)base.ClientOptions;
///
public new SocketApiOptions ApiOptions => (SocketApiOptions)base.ApiOptions;
- ///
- public abstract MessageInterpreterPipeline Pipeline { get; }
#endregion
///
@@ -135,19 +133,9 @@ namespace CryptoExchange.Net
RateLimiters = rateLimiters;
}
- ///
- /// Set a delegate which can manipulate the message stream before it is processed by listeners
- ///
- /// Interceptor
- protected void SetInterceptor(Func interceptor)
- {
- this.interceptor = interceptor;
- }
-
///
/// Connect to an url and listen for data on the BaseAddress
///
- /// The type of the expected data
/// The subscription
/// Cancellation token for closing this subscription
///
@@ -159,7 +147,6 @@ namespace CryptoExchange.Net
///
/// Connect to an url and listen for data
///
- /// The type of the expected data
/// The URL to connect to
/// The subscription
/// Cancellation token for closing this subscription
@@ -247,7 +234,7 @@ namespace CryptoExchange.Net
return new CallResult(subResult.Error!);
}
- subscription.HandleSubQueryResponse(subQuery.Response);
+ subscription.HandleSubQueryResponse(subQuery.Response!);
}
subscription.Confirmed = true;
@@ -382,7 +369,7 @@ namespace CryptoExchange.Net
/// Should return the request which can be used to authenticate a socket connection
///
///
- protected internal virtual BaseQuery GetAuthenticationRequest() => throw new NotImplementedException();
+ protected internal virtual Query GetAuthenticationRequest() => throw new NotImplementedException();
///
/// Adds a system subscription. Used for example to reply to ping requests
@@ -474,7 +461,7 @@ namespace CryptoExchange.Net
/// Process an unhandled message
///
/// The message that wasn't processed
- protected virtual void HandleUnhandledMessage(BaseParsedMessage message)
+ protected virtual void HandleUnhandledMessage(SocketMessage message)
{
}
@@ -538,7 +525,7 @@ namespace CryptoExchange.Net
/// How often
/// Method returning the query to send
/// The callback for processing the response
- protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func queryDelegate, Action? callback)
+ protected virtual void QueryPeriodic(string identifier, TimeSpan interval, Func queryDelegate, Action? callback)
{
if (queryDelegate == null)
throw new ArgumentNullException(nameof(queryDelegate));
@@ -686,7 +673,7 @@ namespace CryptoExchange.Net
sb.AppendLine($" Id: {subscription.Id}");
sb.AppendLine($" Confirmed: {subscription.Confirmed}");
sb.AppendLine($" Invocations: {subscription.TotalInvocations}");
- sb.AppendLine($" Identifiers: [{string.Join(", ", subscription.StreamIdentifiers)}]");
+ sb.AppendLine($" Identifiers: [{string.Join(", ", subscription.ListenerIdentifiers)}]");
}
}
return sb.ToString();
@@ -708,5 +695,20 @@ namespace CryptoExchange.Net
semaphoreSlim?.Dispose();
base.Dispose();
}
+
+ ///
+ /// Get the listener identifier for the message
+ ///
+ ///
+ ///
+ public abstract string GetListenerIdentifier(SocketMessage message);
+
+ ///
+ /// Preprocess a stream message
+ ///
+ ///
+ ///
+ ///
+ public virtual Stream PreprocessStreamMessage(WebSocketMessageType type, Stream stream) => stream;
}
}
diff --git a/CryptoExchange.Net/Converters/JTokenAccessor.cs b/CryptoExchange.Net/Converters/JTokenAccessor.cs
index 182bc5b..c21f330 100644
--- a/CryptoExchange.Net/Converters/JTokenAccessor.cs
+++ b/CryptoExchange.Net/Converters/JTokenAccessor.cs
@@ -1,158 +1,158 @@
-using CryptoExchange.Net.Interfaces;
-using CryptoExchange.Net.Objects.Sockets;
-using Newtonsoft.Json;
-using Newtonsoft.Json.Linq;
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Runtime.InteropServices.ComTypes;
-using System.Text;
+//using CryptoExchange.Net.Interfaces;
+//using CryptoExchange.Net.Objects.Sockets;
+//using Newtonsoft.Json;
+//using Newtonsoft.Json.Linq;
+//using System;
+//using System.Collections.Generic;
+//using System.IO;
+//using System.Linq;
+//using System.Runtime.InteropServices.ComTypes;
+//using System.Text;
-namespace CryptoExchange.Net.Converters
-{
- internal class JTokenAccessor : IMessageAccessor
- {
- private readonly JToken _token;
- private readonly Stream _stream;
- private readonly StreamReader _reader;
- private Dictionary _cache = new Dictionary();
- private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
+//namespace CryptoExchange.Net.Converters
+//{
+// internal class JTokenAccessor : IMessageAccessor
+// {
+// private readonly JToken _token;
+// private readonly Stream _stream;
+// private readonly StreamReader _reader;
+// private Dictionary _cache = new Dictionary();
+// private static JsonSerializer _serializer = JsonSerializer.Create(SerializerOptions.WithConverters);
- public JTokenAccessor(Stream stream)
- {
- _stream = stream;
- _reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
- using var jsonTextReader = new JsonTextReader(_reader);
- JToken token;
- try
- {
- _token = JToken.Load(jsonTextReader);
- }
- catch (Exception ex)
- {
- // Not a json message
- throw;
- }
- }
+// public JTokenAccessor(Stream stream)
+// {
+// _stream = stream;
+// _reader = new StreamReader(stream, Encoding.UTF8, false, (int)stream.Length, true);
+// using var jsonTextReader = new JsonTextReader(_reader);
+// JToken token;
+// try
+// {
+// _token = JToken.Load(jsonTextReader);
+// }
+// catch (Exception ex)
+// {
+// // Not a json message
+// throw;
+// }
+// }
- public BaseParsedMessage Instantiate(Type type)
- {
- var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(type);
- var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : _token.ToObject(type, _serializer));
- return instance;
- }
+// public BaseParsedMessage Instantiate(Type type)
+// {
+// var resultMessageType = typeof(ParsedMessage<>).MakeGenericType(type);
+// var instance = (BaseParsedMessage)Activator.CreateInstance(resultMessageType, type == null ? null : _token.ToObject(type, _serializer));
+// return instance;
+// }
- public string GetOriginalString()
- {
- _stream.Position = 0;
- return _reader.ReadToEnd();
- }
+// public string GetOriginalString()
+// {
+// _stream.Position = 0;
+// return _reader.ReadToEnd();
+// }
- public int? GetArrayIntValue(string? key, int index)
- {
- var accessToken = key == null ? _token : GetToken(key);
- if (accessToken == null || accessToken is not JArray arr)
- return null;
- return arr[index].Value();
- }
+// public int? GetArrayIntValue(string? key, int index)
+// {
+// var accessToken = key == null ? _token : GetToken(key);
+// if (accessToken == null || accessToken is not JArray arr)
+// return null;
+// return arr[index].Value();
+// }
- public string? GetArrayStringValue(string? key, int index)
- {
- var accessToken = key == null ? _token : GetToken(key);
- if (accessToken == null || accessToken is not JArray arr)
- return null;
+// public string? GetArrayStringValue(string? key, int index)
+// {
+// var accessToken = key == null ? _token : GetToken(key);
+// if (accessToken == null || accessToken is not JArray arr)
+// return null;
- if (arr.Count <= index)
- return null;
+// if (arr.Count <= index)
+// return null;
- if (arr[index].Type != JTokenType.String)
- return null;
+// if (arr[index].Type != JTokenType.String)
+// return null;
- return arr[index].Value();
- }
+// return arr[index].Value();
+// }
- public int? GetCount(string key)
- {
- var accessToken = GetToken(key);
- return accessToken.Count();
- }
+// public int? GetCount(string key)
+// {
+// var accessToken = GetToken(key);
+// return accessToken.Count();
+// }
- public int? GetIntValue(string key)
- {
- var accessToken = GetToken(key);
- return accessToken?.Value();
- }
+// public int? GetIntValue(string key)
+// {
+// var accessToken = GetToken(key);
+// return accessToken?.Value();
+// }
- public string? GetStringValue(string key)
- {
- var accessToken = GetToken(key);
- if (accessToken?.Type == JTokenType.Object)
- return ((JObject)accessToken).Properties().First().Name;
+// public string? GetStringValue(string key)
+// {
+// var accessToken = GetToken(key);
+// if (accessToken?.Type == JTokenType.Object)
+// return ((JObject)accessToken).Properties().First().Name;
- return accessToken?.ToString();
- }
+// return accessToken?.ToString();
+// }
- public bool IsObject(string? key) => _token.Type == JTokenType.Object;
- public bool IsArray(IEnumerable indexes)
- {
- var item = _token;
- foreach(var index in indexes)
- {
- if (item.Type != JTokenType.Array)
- return false;
+// public bool IsObject(string? key) => _token.Type == JTokenType.Object;
+// public bool IsArray(IEnumerable indexes)
+// {
+// var item = _token;
+// foreach(var index in indexes)
+// {
+// if (item.Type != JTokenType.Array)
+// return false;
- var arr = ((JArray)item);
- if (arr.Count <= index)
- return false;
+// var arr = ((JArray)item);
+// if (arr.Count <= index)
+// return false;
- item = arr[index];
- }
+// item = arr[index];
+// }
- return item.Type == JTokenType.Array;
- }
+// return item.Type == JTokenType.Array;
+// }
- public bool IsEmptyArray(IEnumerable indexes)
- {
- var item = _token;
- foreach (var index in indexes)
- {
- if (item.Type != JTokenType.Array)
- return false;
+// public bool IsEmptyArray(IEnumerable indexes)
+// {
+// var item = _token;
+// foreach (var index in indexes)
+// {
+// if (item.Type != JTokenType.Array)
+// return false;
- var arr = ((JArray)item);
- if (arr.Count <= index)
- return false;
+// var arr = ((JArray)item);
+// if (arr.Count <= index)
+// return false;
- item = arr[index];
- }
+// item = arr[index];
+// }
- return item.Type == JTokenType.Array && !item.HasValues;
- }
+// return item.Type == JTokenType.Array && !item.HasValues;
+// }
- private JToken? GetToken(string key)
- {
- if (key == null)
- return _token;
+// private JToken? GetToken(string key)
+// {
+// if (key == null)
+// return _token;
- if (_cache.TryGetValue(key, out var token))
- return token;
+// if (_cache.TryGetValue(key, out var token))
+// return token;
- var splitTokens = key.Split(new char[] { ':' });
- var accessToken = _token;
- foreach (var splitToken in splitTokens)
- {
- if (accessToken.Type == JTokenType.Array)
- return null;
+// var splitTokens = key.Split(new char[] { ':' });
+// var accessToken = _token;
+// foreach (var splitToken in splitTokens)
+// {
+// if (accessToken.Type == JTokenType.Array)
+// return null;
- accessToken = accessToken[splitToken];
+// accessToken = accessToken[splitToken];
- if (accessToken == null)
- break;
- }
+// if (accessToken == null)
+// break;
+// }
- _cache.Add(key, accessToken);
- return accessToken;
- }
- }
-}
+// _cache.Add(key, accessToken);
+// return accessToken;
+// }
+// }
+//}
diff --git a/CryptoExchange.Net/Interfaces/IMessageAccessor.cs b/CryptoExchange.Net/Interfaces/IMessageAccessor.cs
deleted file mode 100644
index f9c67ae..0000000
--- a/CryptoExchange.Net/Interfaces/IMessageAccessor.cs
+++ /dev/null
@@ -1,21 +0,0 @@
-using CryptoExchange.Net.Objects.Sockets;
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace CryptoExchange.Net.Interfaces
-{
- public interface IMessageAccessor
- {
- bool IsObject(string? key);
- bool IsArray(IEnumerable indexes);
- bool IsEmptyArray(IEnumerable indexes);
- string? GetStringValue(string key);
- int? GetIntValue(string key);
- public int? GetCount(string key);
- public int? GetArrayIntValue(string? key, int index);
- public string? GetArrayStringValue(string? key, int index);
-
- public BaseParsedMessage Instantiate(Type type);
- }
-}
diff --git a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs
index 62cf75b..299d16e 100644
--- a/CryptoExchange.Net/Interfaces/IMessageProcessor.cs
+++ b/CryptoExchange.Net/Interfaces/IMessageProcessor.cs
@@ -8,11 +8,31 @@ using System.Threading.Tasks;
namespace CryptoExchange.Net.Interfaces
{
+ ///
+ /// Message processor
+ ///
public interface IMessageProcessor
{
+ ///
+ /// Id of the processor
+ ///
public int Id { get; }
- public List StreamIdentifiers { get; }
- Task HandleMessageAsync(SocketConnection connection, DataEvent message);
- Dictionary TypeMapping { get; }
+ ///
+ /// The identifiers for this processor
+ ///
+ public HashSet ListenerIdentifiers { get; }
+ ///
+ /// Handle a message
+ ///
+ ///
+ ///
+ ///
+ Task HandleAsync(SocketConnection connection, DataEvent