Skip to content

Commit

Permalink
Merge pull request dotnet#3767 from mconnew/Issue3724
Browse files Browse the repository at this point in the history
Fix missing timeout when reading response stream
  • Loading branch information
StephenBonikowsky authored Jul 29, 2019
2 parents 52619e9 + cc4a541 commit 5e517d4
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private Task<Message> ReadStreamAsMessageAsync(TimeoutHelper timeoutHelper)
return ReadChunkedBufferedMessageAsync(contentStreamTask, timeoutHelper);
}

return ReadBufferedMessageAsync(contentStreamTask);
return ReadBufferedMessageAsync(contentStreamTask, timeoutHelper);
}

private async Task<Message> ReadChunkedBufferedMessageAsync(Task<Stream> inputStreamTask, TimeoutHelper timeoutHelper)
Expand All @@ -196,7 +196,7 @@ private async Task<Message> ReadChunkedBufferedMessageAsync(Task<Stream> inputSt
}
}

private async Task<Message> ReadBufferedMessageAsync(Task<Stream> inputStreamTask)
private async Task<Message> ReadBufferedMessageAsync(Task<Stream> inputStreamTask, TimeoutHelper timeoutHelper)
{
var inputStream = await inputStreamTask;
if (_contentLength > _factory.MaxReceivedMessageSize)
Expand All @@ -211,10 +211,11 @@ private async Task<Message> ReadBufferedMessageAsync(Task<Stream> inputStreamTas
byte[] buffer = messageBuffer.Array;
int offset = 0;
int count = messageBuffer.Count;
var ct = await timeoutHelper.GetCancellationTokenAsync();

while (count > 0)
{
int bytesRead = await inputStream.ReadAsync(buffer, offset, count);
int bytesRead = await inputStream.ReadAsync(buffer, offset, count, ct);
if (bytesRead == 0) // EOF
{
if (_contentLength != -1)
Expand All @@ -229,7 +230,7 @@ private async Task<Message> ReadBufferedMessageAsync(Task<Stream> inputStreamTas
offset += bytesRead;
}

return await DecodeBufferedMessageAsync(new ArraySegment<byte>(buffer, 0, offset), inputStream);
return await DecodeBufferedMessageAsync(new ArraySegment<byte>(buffer, 0, offset), inputStream, timeoutHelper);
}

private async Task<Message> ReadStreamedMessageAsync(Task<Stream> inputStreamTask)
Expand Down Expand Up @@ -267,15 +268,16 @@ private void ThrowMaxReceivedMessageSizeExceeded()
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationException(message, inner));
}

private async Task<Message> DecodeBufferedMessageAsync(ArraySegment<byte> buffer, Stream inputStream)
private async Task<Message> DecodeBufferedMessageAsync(ArraySegment<byte> buffer, Stream inputStream, TimeoutHelper timeoutHelper)
{
try
{
var ct = await timeoutHelper.GetCancellationTokenAsync();
// if we're chunked, make sure we've consumed the whole body
if (_contentLength == -1 && buffer.Count == _factory.MaxReceivedMessageSize)
{
byte[] extraBuffer = new byte[1];
int extraReceived = await inputStream.ReadAsync(extraBuffer, 0, 1);
int extraReceived = await inputStream.ReadAsync(extraBuffer, 0, 1, ct);
if (extraReceived > 0)
{
ThrowMaxReceivedMessageSizeExceeded();
Expand Down

0 comments on commit 5e517d4

Please sign in to comment.