1
0
mirror of https://github.com/JKorf/CryptoExchange.Net synced 2025-06-10 17:36:19 +00:00

Fix for socket issues .net framework

This commit is contained in:
Jkorf 2021-10-06 09:12:46 +02:00
parent 4758d348de
commit af6f4d14f6

View File

@ -44,7 +44,7 @@ namespace CryptoExchange.Net.Sockets
/// <summary> /// <summary>
/// Received messages time -> size /// Received messages time -> size
/// </summary> /// </summary>
protected readonly Dictionary<DateTime, int> _receivedMessages; internal readonly List<ReceiveItem> _receivedMessages;
/// <summary> /// <summary>
/// Received messages lock /// Received messages lock
/// </summary> /// </summary>
@ -152,7 +152,7 @@ namespace CryptoExchange.Net.Sockets
if (!_receivedMessages.Any()) if (!_receivedMessages.Any())
return 0; return 0;
return Math.Round(_receivedMessages.Values.Sum(v => v) / 1000 / 3d); return Math.Round(_receivedMessages.Sum(v => v.Bytes) / 1000 / 3d);
} }
} }
} }
@ -215,7 +215,7 @@ namespace CryptoExchange.Net.Sockets
this.headers = headers; this.headers = headers;
_outgoingMessages = new List<DateTime>(); _outgoingMessages = new List<DateTime>();
_receivedMessages = new Dictionary<DateTime, int>(); _receivedMessages = new List<ReceiveItem>();
_sendEvent = new AsyncResetEvent(); _sendEvent = new AsyncResetEvent();
_sendBuffer = new ConcurrentQueue<byte[]>(); _sendBuffer = new ConcurrentQueue<byte[]>();
_ctsSource = new CancellationTokenSource(); _ctsSource = new CancellationTokenSource();
@ -380,60 +380,71 @@ namespace CryptoExchange.Net.Sockets
private async Task SendLoopAsync() private async Task SendLoopAsync()
{ {
_startedSent = true; _startedSent = true;
while (true) try
{ {
if (_closing) while (true)
break;
await _sendEvent.WaitAsync().ConfigureAwait(false);
if (_closing)
break;
while (_sendBuffer.TryDequeue(out var data))
{ {
if(RatelimitPerSecond != null) if (_closing)
break;
await _sendEvent.WaitAsync().ConfigureAwait(false);
if (_closing)
break;
while (_sendBuffer.TryDequeue(out var data))
{ {
// Wait for rate limit if (RatelimitPerSecond != null)
DateTime? start = null;
while (MessagesSentLastSecond() >= RatelimitPerSecond)
{ {
if (start == null) // Wait for rate limit
start = DateTime.UtcNow; DateTime? start = null;
await Task.Delay(10).ConfigureAwait(false); while (MessagesSentLastSecond() >= RatelimitPerSecond)
{
if (start == null)
start = DateTime.UtcNow;
await Task.Delay(10).ConfigureAwait(false);
}
if (start != null)
log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit");
} }
if (start != null) try
log.Write(LogLevel.Trace, $"Socket {Id} sent delayed {Math.Round((DateTime.UtcNow - start.Value).TotalMilliseconds)}ms because of rate limit"); {
} await _socket.SendAsync(new ArraySegment<byte>(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false);
_outgoingMessages.Add(DateTime.UtcNow);
try log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes");
{ }
await _socket.SendAsync(new ArraySegment<byte>(data, 0, data.Length), WebSocketMessageType.Text, true, _ctsSource.Token).ConfigureAwait(false); catch (OperationCanceledException)
_outgoingMessages.Add(DateTime.UtcNow); {
log.Write(LogLevel.Trace, $"Socket {Id} sent {data.Length} bytes"); // cancelled
} break;
catch (OperationCanceledException) }
{ catch (IOException ioe)
// cancelled {
break; // Connection closed unexpectedly, .NET framework
} Handle(errorHandlers, ioe);
catch (IOException ioe) await CloseInternalAsync(false, true).ConfigureAwait(false);
{ break;
// Connection closed unexpectedly, .NET framework }
Handle(errorHandlers, ioe); catch (WebSocketException wse)
await CloseInternalAsync(false, true).ConfigureAwait(false); {
break; // Connection closed unexpectedly
} Handle(errorHandlers, wse);
catch (WebSocketException wse) await CloseInternalAsync(false, true).ConfigureAwait(false);
{ break;
// Connection closed unexpectedly }
Handle(errorHandlers, wse);
await CloseInternalAsync(false, true).ConfigureAwait(false);
break;
} }
} }
} }
catch (Exception e)
{
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the send processing, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error
Handle(errorHandlers, e);
throw;
}
} }
/// <summary> /// <summary>
@ -446,101 +457,112 @@ namespace CryptoExchange.Net.Sockets
var buffer = new ArraySegment<byte>(new byte[65536]); var buffer = new ArraySegment<byte>(new byte[65536]);
var received = 0; var received = 0;
while (true) try
{ {
if (_closing)
break;
MemoryStream? memoryStream = null;
WebSocketReceiveResult? receiveResult = null;
bool multiPartMessage = false;
while (true) while (true)
{ {
try if (_closing)
{
receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
received += receiveResult.Count;
lock(_receivedMessagesLock)
_receivedMessages.Add(DateTime.UtcNow, receiveResult.Count);
}
catch (OperationCanceledException)
{
// cancelled
break; break;
}
catch (WebSocketException wse)
{
// Connection closed unexpectedly
Handle(errorHandlers, wse);
await CloseInternalAsync(true, false).ConfigureAwait(false);
break;
}
catch (IOException ioe)
{
// Connection closed unexpectedly, .NET framework
Handle(errorHandlers, ioe);
await CloseInternalAsync(true, false).ConfigureAwait(false);
break;
}
if (receiveResult.MessageType == WebSocketMessageType.Close) MemoryStream? memoryStream = null;
WebSocketReceiveResult? receiveResult = null;
bool multiPartMessage = false;
while (true)
{ {
// Connection closed unexpectedly try
log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
await CloseInternalAsync(true, false).ConfigureAwait(false);
break;
}
if (!receiveResult.EndOfMessage)
{
// We received data, but it is not complete, write it to a memory stream for reassembling
multiPartMessage = true;
if (memoryStream == null)
memoryStream = new MemoryStream();
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
}
else
{
if (!multiPartMessage)
{ {
// Received a complete message and it's not multi part receiveResult = await _socket.ReceiveAsync(buffer, _ctsSource.Token).ConfigureAwait(false);
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message"); received += receiveResult.Count;
HandleMessage(buffer.Array, buffer.Offset, receiveResult.Count, receiveResult.MessageType); lock (_receivedMessagesLock)
_receivedMessages.Add(new ReceiveItem(DateTime.UtcNow, receiveResult.Count));
}
catch (OperationCanceledException)
{
// cancelled
break;
}
catch (WebSocketException wse)
{
// Connection closed unexpectedly
Handle(errorHandlers, wse);
await CloseInternalAsync(true, false).ConfigureAwait(false);
break;
}
catch (IOException ioe)
{
// Connection closed unexpectedly, .NET framework
Handle(errorHandlers, ioe);
await CloseInternalAsync(true, false).ConfigureAwait(false);
break;
}
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed unexpectedly
log.Write(LogLevel.Debug, $"Socket {Id} received `Close` message");
await CloseInternalAsync(true, false).ConfigureAwait(false);
break;
}
if (!receiveResult.EndOfMessage)
{
// We received data, but it is not complete, write it to a memory stream for reassembling
multiPartMessage = true;
if (memoryStream == null)
memoryStream = new MemoryStream();
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
await memoryStream.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
} }
else else
{ {
// Received the end of a multipart message, write to memory stream for reassembling if (!multiPartMessage)
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message"); {
await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false); // Received a complete message and it's not multi part
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in single message");
HandleMessage(buffer.Array, buffer.Offset, receiveResult.Count, receiveResult.MessageType);
}
else
{
// Received the end of a multipart message, write to memory stream for reassembling
log.Write(LogLevel.Trace, $"Socket {Id} received {receiveResult.Count} bytes in partial message");
await memoryStream!.WriteAsync(buffer.Array, buffer.Offset, receiveResult.Count).ConfigureAwait(false);
}
break;
} }
}
lock (_receivedMessagesLock)
UpdateReceivedMessages();
if (receiveResult?.MessageType == WebSocketMessageType.Close)
{
// Received close message
break; break;
} }
}
lock (_receivedMessagesLock) if (receiveResult == null || _closing)
UpdateReceivedMessages(); {
// Error during receiving or cancellation requested, stop.
break;
}
if (receiveResult?.MessageType == WebSocketMessageType.Close) if (multiPartMessage)
{ {
// Received close message // Reassemble complete message from memory stream
break; log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
} HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType);
memoryStream.Dispose();
if (receiveResult == null || _closing) }
{
// Error during receiving or cancellation requested, stop.
break;
}
if (multiPartMessage)
{
// Reassemble complete message from memory stream
log.Write(LogLevel.Trace, $"Socket {Id} reassembled message of {memoryStream!.Length} bytes");
HandleMessage(memoryStream!.ToArray(), 0, (int)memoryStream.Length, receiveResult.MessageType);
memoryStream.Dispose();
} }
} }
catch(Exception e)
{
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will crash the receive processing, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error
Handle(errorHandlers, e);
throw;
}
} }
/// <summary> /// <summary>
@ -604,27 +626,38 @@ namespace CryptoExchange.Net.Sockets
protected async Task CheckTimeoutAsync() protected async Task CheckTimeoutAsync()
{ {
log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Timeout}"); log.Write(LogLevel.Debug, $"Socket {Id} Starting task checking for no data received for {Timeout}");
while (true) try
{ {
if (_closing) while (true)
return; {
if (_closing)
return;
if (DateTime.UtcNow - LastActionTime > Timeout) if (DateTime.UtcNow - LastActionTime > Timeout)
{ {
log.Write(LogLevel.Warning, $"Socket {Id} No data received for {Timeout}, reconnecting socket"); log.Write(LogLevel.Warning, $"Socket {Id} No data received for {Timeout}, reconnecting socket");
_ = CloseAsync().ConfigureAwait(false); _ = CloseAsync().ConfigureAwait(false);
return; return;
} }
try try
{ {
await Task.Delay(500, _ctsSource.Token).ConfigureAwait(false); await Task.Delay(500, _ctsSource.Token).ConfigureAwait(false);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
// cancelled // cancelled
break; break;
}
} }
} }
catch (Exception e)
{
// Because this is running in a separate task and not awaited until the socket gets closed
// any exception here will stop the timeout checking, but do so silently unless the socket get's stopped.
// Make sure we at least let the owner know there was an error
Handle(errorHandlers, e);
throw;
}
} }
/// <summary> /// <summary>
@ -679,12 +712,24 @@ namespace CryptoExchange.Net.Sockets
var checkTime = DateTime.UtcNow; var checkTime = DateTime.UtcNow;
if (checkTime - _lastReceivedMessagesUpdate > TimeSpan.FromSeconds(1)) if (checkTime - _lastReceivedMessagesUpdate > TimeSpan.FromSeconds(1))
{ {
foreach (var msgTime in _receivedMessages.Keys.ToList()) foreach (var msg in _receivedMessages.ToList()) // To list here because we're removing from the list
if (checkTime - msgTime > TimeSpan.FromSeconds(3)) if (checkTime - msg.Timestamp > TimeSpan.FromSeconds(3))
_receivedMessages.Remove(msgTime); _receivedMessages.Remove(msg);
_lastReceivedMessagesUpdate = checkTime; _lastReceivedMessagesUpdate = checkTime;
} }
} }
} }
internal struct ReceiveItem
{
public DateTime Timestamp { get; set; }
public int Bytes { get; set; }
public ReceiveItem(DateTime timestamp, int bytes)
{
Timestamp = timestamp;
Bytes = bytes;
}
}
} }