Подтвердить что ты не робот

BufferBlock тупик с OutputAvailableAsync после TryReceiveAll

Работая над ответом на этот вопрос, я написал этот фрагмент:

var buffer = new BufferBlock<object>();
var producer = Task.Run(async () =>
{
    while (true)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(100));
        buffer.Post(null);
        Console.WriteLine("Post " + buffer.Count);
    }
});
var consumer = Task.Run(async () =>
{
    while (await buffer.OutputAvailableAsync())
    {
        IList<object> items;
        buffer.TryReceiveAll(out items);
        Console.WriteLine("TryReceiveAll " + buffer.Count);
    }
});
await Task.WhenAll(consumer, producer);

Продюсер должен отправлять элементы в буфер каждые 100 мс, и потребитель должен очистить все элементы из буфера и асинхронно дождаться появления большего количества элементов.

Что на самом деле происходит, так это то, что производитель очищает все элементы один раз, а затем никогда больше не выходит за пределы OutputAvailableAsync. Если я переключаю пользователя на удаление элементов один за другим, он работает как исключенный:

while (await buffer.OutputAvailableAsync())
{
    object item;
    while (buffer.TryReceive(out item)) ;
}

Я что-то не понимаю? Если нет, в чем проблема?

4b9b3361

Ответ 1

Это ошибка в SourceCore, которая используется внутри BufferBlock. Его метод TryReceiveAll не включает элемент _enableOffering boolean data, а TryReceive делает. Это приводит к возврату задачи из OutputAvailableAsync.

Здесь минимальное воспроизведение:

var buffer = new BufferBlock<object>();
buffer.Post(null);

IList<object> items;
buffer.TryReceiveAll(out items);

var outputAvailableAsync = buffer.OutputAvailableAsync();
buffer.Post(null);

await outputAvailableAsync; // Never completes

Я только что зафиксировал его в реестре .Net core с этим запросом на перенос. Надеемся, что исправление вскоре появится в пакете nuget.

Ответ 2

Увы, это конец сентября 2015 года, и хотя i3arnon исправил ошибку, она не была решена в версии, выпущенной через два дня после ошибки исправлено: Microsoft TPL Dataflow версии 4.5.24.

Однако IReceivableSourceBlock.TryReceive(...) работает правильно. Метод расширения решит проблему. После новой версии TPL Dataflow будет легко изменить метод расширения.

/// <summary>
/// This extension method returns all available items in the IReceivableSourceBlock
/// or an empty sequence if nothing is available. The functin does not wait.
/// </summary>
/// <typeparam name="T">The type of items stored in the IReceivableSourceBlock</typeparam>
/// <param name="buffer">the source where the items should be extracted from </param>
/// <returns>The IList with the received items. Empty if no items were available</returns>
public static IList<T> TryReceiveAllEx<T>(this IReceivableSourceBlock<T> buffer)
{
    /* Microsoft TPL Dataflow version 4.5.24 contains a bug in TryReceiveAll
     * Hence this function uses TryReceive until nothing is available anymore
     * */
    IList<T> receivedItems = new List<T>();
    T receivedItem = default(T);
    while (buffer.TryReceive<T>(out receivedItem))
    {
        receivedItems.Add(receivedItem);
    }
    return receivedItems;
}

использование:

while (await this.bufferBlock.OutputAvailableAsync())
{
    // some data available
    var receivedItems = this.bufferBlock.TryReceiveAllEx();
    if (receivedItems.Any())
    {
        ProcessReceivedItems(bufferBlock);
    }
}