-
Notifications
You must be signed in to change notification settings - Fork 5.4k
Description
Description
ChannelReader.WaitToReadAsync(CancellationToken) is supposed to return a ValueTask that completes to false if the channel was completed successfully.
Under specific circumstances, for a SingleConsumerUnboundedChannel, the returned ValueTask throws a ChannelClosedException instead. Specifically, this occurs when the reader calls ReadAsync() and is cancelled after waiting, and is currently awaiting WaitToReadAsync() when the channel is completed. See code snippet below for reproducible details.
Reproduction Steps
var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleReader = true,
SingleWriter = false
});
// Step 1. Call ReadAsync() and get cancelled
using var cts = new CancellationTokenSource();
cts.CancelAfter(10);
try
{
await channel.Reader.ReadAsync(cts.Token);
}
catch (OperationCanceledException)
{
// Expected
}
// Step 2. Call WaitToReadAsync()
var readerTask = channel.Reader.WaitToReadAsync(CancellationToken.None).AsTask();
// Step 3. Close channel
channel.Writer.Complete();
// Should return false, instead throws ChannelClosedException
var result = await readerTask;
Expected behavior
WaitToReadAsync() returns a ValueTask which completes to false.
Actual behavior
WaitToReadAsync() returns a ValueTask which completes with ChannelClosedException.
Regression?
No response
Known Workarounds
No response
Configuration
No response
Other information
SingleConsumerUnboundedChannel holds references to up to two AsyncOperations: one for ReadAsync (blocked reader) and one for WaitToReadAsync (waiting reader). In the scenario described above, both fields are set on the channel (though the blocked reader is already cancelled). The implementation creates a ChannelClosedException to propagate to the blocked reader, however this is incorrectly propagated to the waiting reader as well.
This can be fixed by flipping the order of the if blocks in the following code block, or using a separate var for the blocked reader error on line 257:
Lines 254 to 273 in cf53805
| // Complete a blocked reader if necessary | |
| if (blockedReader is not null) | |
| { | |
| error = ChannelUtilities.CreateInvalidCompletionException(error); | |
| blockedReader.TrySetException(error); | |
| } | |
| // Complete a waiting reader if necessary. (We really shouldn't have both a blockedReader | |
| // and a waitingReader, but it's more expensive to prevent it than to just tolerate it.) | |
| if (waitingReader is not null) | |
| { | |
| if (error is not null) | |
| { | |
| waitingReader.TrySetException(error); | |
| } | |
| else | |
| { | |
| waitingReader.TrySetResult(result: false); | |
| } | |
| } |
Likely relevant person to fix this is the illustrious @stephentoub