diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 188d60e..fa66a70 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -235,16 +235,25 @@ namespace CryptoExchange.Net.Sockets } log.Write(LogLevel.Trace, $"Socket {Id} connection succeeded, starting communication"); - _sendTask = Task.Factory.StartNew(SendLoopAsync, TaskCreationOptions.LongRunning); - _receiveTask = Task.Factory.StartNew(ReceiveLoopAsync, TaskCreationOptions.LongRunning); + _sendTask = Task.Factory.StartNew(SendLoopAsync, default, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap(); + _receiveTask = Task.Factory.StartNew(ReceiveLoopAsync, default, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap(); if (Timeout != default) _timeoutTask = Task.Run(CheckTimeoutAsync); var sw = Stopwatch.StartNew(); while (!_startedSent || !_startedReceive) + { // Wait for the tasks to have actually started await Task.Delay(10).ConfigureAwait(false); + if(sw.ElapsedMilliseconds > 5000) + { + _ = _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", default); + log.Write(LogLevel.Debug, $"Socket {Id} startup interupted"); + return false; + } + } + log.Write(LogLevel.Debug, $"Socket {Id} connected"); return true; } @@ -279,8 +288,6 @@ namespace CryptoExchange.Net.Sockets if (_closing) return; - _startedSent = false; - _startedReceive = false; _closing = true; var tasksToAwait = new List(); if (_socket.State == WebSocketState.Open) @@ -323,6 +330,9 @@ namespace CryptoExchange.Net.Sockets log.Write(LogLevel.Debug, $"Socket {Id} resetting"); _ctsSource = new CancellationTokenSource(); _closing = false; + + while (_sendBuffer.TryDequeue(out _)) { } // Clear send buffer + _socket = CreateSocket(); } @@ -390,18 +400,11 @@ namespace CryptoExchange.Net.Sockets // canceled break; } - catch (IOException ioe) + catch (Exception ioe) { // Connection closed unexpectedly, .NET framework Handle(errorHandlers, ioe); - await CloseInternalAsync(false, true).ConfigureAwait(false); - break; - } - catch (WebSocketException wse) - { - // Connection closed unexpectedly - Handle(errorHandlers, wse); - await CloseInternalAsync(false, true).ConfigureAwait(false); + _ = Task.Run(async () => await CloseInternalAsync(false, true).ConfigureAwait(false)); break; } } @@ -415,6 +418,11 @@ namespace CryptoExchange.Net.Sockets Handle(errorHandlers, e); throw; } + finally + { + log.Write(LogLevel.Trace, $"Socket {Id} Send loop finished"); + _startedSent = false; + } } /// @@ -424,7 +432,6 @@ namespace CryptoExchange.Net.Sockets private async Task ReceiveLoopAsync() { _startedReceive = true; - var buffer = new ArraySegment(new byte[65536]); var received = 0; try @@ -451,18 +458,11 @@ namespace CryptoExchange.Net.Sockets // canceled break; } - catch (WebSocketException wse) + catch (Exception wse) { // Connection closed unexpectedly Handle(errorHandlers, wse); - await CloseInternalAsync(true, false).ConfigureAwait(false); - break; - } - catch (IOException ioe) - { - // Connection closed unexpectedly, .NET framework - Handle(errorHandlers, ioe); - await CloseInternalAsync(true, false).ConfigureAwait(false); + _ = Task.Run(async () => await CloseInternalAsync(true, true).ConfigureAwait(false)); break; } @@ -470,7 +470,7 @@ namespace CryptoExchange.Net.Sockets { // Connection closed unexpectedly log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message"); - await CloseInternalAsync(true, false).ConfigureAwait(false); + _ = Task.Run(async () => await CloseInternalAsync(true, true).ConfigureAwait(false)); break; } @@ -517,21 +517,32 @@ namespace CryptoExchange.Net.Sockets if (multiPartMessage) { - // Reassemble complete message from memory stream - log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); - HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType); - memoryStream.Dispose(); + // When the connection gets interupted we might not have received a full message + if (receiveResult?.EndOfMessage == true) + { + // Reassemble complete message from memory stream + log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes"); + HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType); + memoryStream.Dispose(); + } + else + log.Write(LogLevel.Trace, $"Socket {Id} discarding incomplete message of {memoryStream!.Length} bytes"); } } } catch(Exception e) { // Because this is running in a separate task and not awaited until the socket gets closed - // any exception here will crash the receive processing, but do so silently unless the socket get's stopped. + // any exception here will crash the receive processing, but do so silently unless the socket gets stopped. // Make sure we at least let the owner know there was an error Handle(errorHandlers, e); throw; } + finally + { + log.Write(LogLevel.Trace, $"Socket {Id} Receive loop finished"); + _startedReceive = false; + } } ///