1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-10 01:16:24 +00:00

Fix for concurrent sent issue

This commit is contained in:
Jkorf 2021-08-20 16:27:23 +02:00
parent e142386bef
commit 474bb13204
3 changed files with 28 additions and 19 deletions

View File

@ -3818,7 +3818,9 @@
<member name="M:CryptoExchange.Net.Sockets.WebsocketFactory.CreateWebsocket(CryptoExchange.Net.Logging.Log,System.String,System.Collections.Generic.IDictionary{System.String,System.String},System.Collections.Generic.IDictionary{System.String,System.String})"> <member name="M:CryptoExchange.Net.Sockets.WebsocketFactory.CreateWebsocket(CryptoExchange.Net.Logging.Log,System.String,System.Collections.Generic.IDictionary{System.String,System.String},System.Collections.Generic.IDictionary{System.String,System.String})">
<inheritdoc /> <inheritdoc />
</member> </member>
<member name="T:System.Diagnostics.CodeAnalysis.AllowNullAttribute"> </members>
</doc>
System.Diagnostics.CodeAnalysis.AllowNullAttribute">
<summary> <summary>
Specifies that <see langword="null"/> is allowed as an input even if the Specifies that <see langword="null"/> is allowed as an input even if the
corresponding type disallows it. corresponding type disallows it.

View File

@ -463,6 +463,9 @@ namespace CryptoExchange.Net.OrderBook
if(!checksumResult) if(!checksumResult)
{ {
// Reconnects the socket, also closing other subscriptions on that socket.
// Should maybe only reconnect the specific subscription?
log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync. Resyncing"); log.Write(LogLevel.Warning, $"{Id} order book {Symbol} out of sync. Resyncing");
_ = subscription?.ReconnectAsync(); _ = subscription?.ReconnectAsync();
return; return;

View File

@ -336,29 +336,33 @@ namespace CryptoExchange.Net.Sockets
_startedSent = true; _startedSent = true;
while (true) while (true)
{ {
if (_closing)
break;
_sendEvent.WaitOne(); _sendEvent.WaitOne();
if (_closing) if (_closing)
break;
if (!_sendBuffer.TryDequeue(out var data))
continue;
try
{
await _socket.SendAsync(new ArraySegment<byte>(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// cancelled
break; break;
}
catch (WebSocketException wse) while (_sendBuffer.TryDequeue(out var data))
{ {
// Connection closed unexpectedly try
Handle(errorHandlers, wse); {
await CloseInternalAsync(false, true).ConfigureAwait(false); log.Write(LogLevel.Debug, "Sending " + Encoding.UTF8.GetString(data));
break; await _socket.SendAsync(new ArraySegment<byte>(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// cancelled
break;
}
catch (WebSocketException wse)
{
// Connection closed unexpectedly
Handle(errorHandlers, wse);
await CloseInternalAsync(false, true).ConfigureAwait(false);
break;
}
} }
} }
} }