diff --git a/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs b/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs index de9f628..3549c2b 100644 --- a/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs +++ b/CryptoExchange.Net/Testing/SocketSubscriptionValidator.cs @@ -8,6 +8,7 @@ using System.Diagnostics; using System.IO; using System.Text; using System.Text.Json; +using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; @@ -39,6 +40,135 @@ namespace CryptoExchange.Net.Testing _nestedPropertyForCompare = nestedPropertyForCompare; } + public async Task ValidateConcurrentAsync( + Func>, Task>> methodInvoke1, + Func>, Task>> methodInvoke2, + string name) + { + var path = Directory.GetParent(Environment.CurrentDirectory)!.Parent!.Parent!.FullName; + FileStream file1; + try + { + file1 = File.OpenRead(Path.Combine(path, _folder, $"{name}.txt")); + } + catch (FileNotFoundException) + { + throw new Exception("Response file not found"); + } + + var buffer1 = new byte[file1.Length]; + await file1.ReadAsync(buffer1, 0, (int)file1.Length).ConfigureAwait(false); + file1.Close(); + + var data1 = Encoding.UTF8.GetString(buffer1); + using var reader1 = new StringReader(data1); + + var socket = TestHelpers.ConfigureSocketClient(_client, _baseAddress); + + string? lastMessage1 = null; + string? lastMessage2 = null; + bool turn = false; + socket.OnMessageSend += (x) => + { + if (turn) + lastMessage1 = x; + else + lastMessage2 = x; + + turn = !turn; + }; + + int updates1 = 0; + int updates2 = 0; + Task> task1; + Task> task2; + // Invoke subscription method + try + { + task1 = methodInvoke1(_client, x => { updates1++; }); + task2 = methodInvoke2(_client, x => { updates2++; }); + } + catch (Exception) + { + throw; + } + + string? message1 = null; + string? message2 = null; + + while (true) + { + var line1 = reader1.ReadLine(); + if (line1 == null) + break; + + if (line1.StartsWith(">")) + { + // Expect a message from client to server + if (line1[1] == '1') + message1 = line1.Substring(3); + else + message2 = line1.Substring(3); + + await Task.Delay(100).ConfigureAwait(false); + } + else if (line1.StartsWith("<")) + { + var line = line1.Substring(3); + var matches = Regex.Matches(line, "(\\|.+\\|)"); + if (matches.Count > 0) + { + var match = matches[0]; + var prevMessage = line1[1] == '1' ? lastMessage1 : lastMessage2; + var json = JsonDocument.Parse(prevMessage); + var propName = match.Value.Substring(1, match.Value.Length - 2); + var split = propName.Split('.'); + var jsonProp = json.RootElement; + foreach (var x in split) + jsonProp = jsonProp.GetProperty(x); + + var value = jsonProp.ValueKind == JsonValueKind.String ? jsonProp.GetString() : jsonProp.GetInt32().ToString(); + line = line.Replace(match.Value, value); + } + + socket.InvokeMessage(line); + } + else if (line1.StartsWith("=")) + { + var line = line1.Substring(3); + var matches = Regex.Matches(line, "(\\|.+\\|)"); + if (matches.Count > 0) + { + var match = matches[0]; + var prevMessage = line1[1] == '1' ? lastMessage1 : lastMessage2; + var json = JsonDocument.Parse(prevMessage); + var propName = match.Value.Substring(1, match.Value.Length - 2); + var split = propName.Split('.'); + var jsonProp = json.RootElement; + foreach (var x in split) + jsonProp = jsonProp.GetProperty(x); + + var value = jsonProp.ValueKind == JsonValueKind.String ? jsonProp.GetString() : jsonProp.GetInt32().ToString(); + line = line.Replace(match.Value, value); + } + + socket.InvokeMessage(line); + } + } + + var res = await Task.WhenAll(task1, task2).ConfigureAwait(false); + if (!res[0]) + throw new Exception("Subscribe failed: " + res[0].Error!.ToString()); + if (!res[1]) + throw new Exception("Subscribe failed: " + res[1].Error!.ToString()); + + if (updates1 != 1 || updates2 != 1) + throw new Exception($"Expected 1 update for both subscriptions, instead got {updates1} and {updates2}"); + + //await _client.UnsubscribeAllAsync().ConfigureAwait(false); + } + + /// /// Validate a subscription ///