Подтвердить что ты не робот

TPL Data Flow, завершение гарантии только после завершения всех блоков данных источника

Как я могу переписать код, который код завершает, когда BOTH transformblocks завершен? Я думал, что завершение означает, что он отмечен как полный И "очередь в очереди" пуста?

public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

        transformBlock2 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("2 input count: " + transformBlock1.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

        processorBlock = new ActionBlock<string>(i =>
            {
                Console.WriteLine(i);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

Я редактировал код, добавляя количество входных буферов для каждого блока преобразования. Очевидно, что все 100 элементов передаются в каждый из блоков преобразования. Но как только один из трансформационных блоков заканчивается, процессорный блок не принимает больше элементов, и вместо этого входной буфер неполного трансформационного блока просто очищает входной буфер.

4b9b3361

Ответ 1

Вопрос в том, что сказал casperOne в своем ответе. Как только первый блок преобразования завершается, процессорный блок переходит в "режим завершения": он будет обрабатывать оставшиеся элементы в своей очереди ввода, но он не примет никаких новых элементов.

Существует более простое решение, чем разделение блока процессора на два: не устанавливайте PropagateCompletion, а вместо этого устанавливайте завершение блока процессора вручную, когда оба блока преобразования завершены:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());

Ответ 2

Проблема заключается в том, что вы устанавливаете свойство PropagateCompletion каждый раз, когда вы вызываете метод LinkTo связывать блоки и разные времена ожидания в блоках преобразования.

Из документации для метода Complete в интерфейсе IDataflowBlock (выделено мной)

Сигналы к IDataflowBlock, которые он не должен принимать, и не создают никаких сообщений и не тратят больше отложенных сообщений.

Поскольку вы разыгрываете время ожидания в каждом из экземпляров TransformBlock<TInput, TOutput>, transformBlock2 (ожидание 20 мс) заканчивается до transformBlock1 (ожидание в течение 50 мс). transformBlock2 завершает сначала, а затем посылает сигнал на processorBlock, который затем говорит: "Я ничего не принимаю" (и transformBlock1 пока не выпустил все свои сообщения).

Обратите внимание, что обработка transformBlock1 до transformBlock1 не гарантируется абсолютно; возможно, что пул потоков (при условии, что вы используете планировщик по умолчанию) будет обрабатывать задачи в другом порядке (но, скорее всего, не будет, поскольку он будет красть работу из очередей после выполнения 20 мс элементов).

Конвейер выглядит следующим образом:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          \              /
           processorBlock

Чтобы обойти это, вы хотите иметь конвейер, который выглядит так:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          |              |
 processorBlock1   processorBlock2

Это выполняется путем создания двух отдельных ActionBlock<TInput> экземпляров:

// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);

// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);


// Linking
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });

Затем вам нужно подождать оба процессорных блока вместо одного:

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();

Очень важное замечание здесь; при создании ActionBlock<TInput> по умолчанию используется свойство MaxDegreeOfParallelism на ExecutionDataflowBlockOptions экземпляр, переданный ему, установлен на один.

Это означает, что вызовы делегата Action<T>, которые вы передаете в ActionBlock<TInput>, являются потокобезопасными, только один будет выполняться одновременно.

Поскольку у вас теперь есть два экземпляра ActionBlock<TInput>, указывающие на один и тот же делегат Action<T>, вам не гарантируется безопасность потоков.

Если ваш метод является потокобезопасным, вам не нужно ничего делать (что позволит вам установить свойство MaxDegreeOfParallelism в DataflowBlockOptions.Unbounded, так как нет причин блокировать).

Если это не потокобезопасно, и вам нужно это гарантировать, вам нужно прибегнуть к традиционным примитивам синхронизации, например, к lock.

В этом случае вы сделали бы это так (хотя это явно не нужно, как метод WriteLine на Console класс является потокобезопасным):

// The lock.
var l = new object();

// The action, can be a method, makes it easier to share.
Action<string> a = i => {
    // Ensure one call at a time.
    lock (l) Console.WriteLine(i);
};

// And so on...

Ответ 3

В дополнение к ответу svick: чтобы быть совместимым с поведением, которое вы получаете с опцией PropagateCompletion, вам также необходимо пересылать исключения в случае, если предыдущий блок был поврежден. Этот метод продолжения, как и следующий, также заботится об этом:

public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) {
    if (target == null) return;
    if (sources.Length == 0) { target.Complete(); return; }
    Task.Factory.ContinueWhenAll(
        sources.Select(b => b.Completion).ToArray(),
        tasks => {
            var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList();
            if (exceptions.Count != 0) {
                target.Fault(new AggregateException(exceptions));
            } else {
                target.Complete();
            }
        }
    );
}

Ответ 4

В других ответах достаточно ясно, почему PropagateCompletion = true mess things up, когда блок имеет более двух источников.

Чтобы обеспечить простое решение проблемы, вы можете захотеть взглянуть на библиотеку с открытым исходным кодом DataflowEx, которая решает эту проблему с более разумными правилами завершения. (Он использует привязку потока данных TPL внутри, но поддерживает сложное распространение завершения. Реализация похожа на WhenAll, но также обрабатывает добавление динамической ссылки. Проверьте Dataflow.RegisterDependency() и TaskEx.AwaitableWhenAll() для подробностей.)

Я немного изменил свой код, чтобы все работало с помощью DataflowEx:

public CompletionDemo1()
{
    broadCaster = new BroadcastBlock<int>(
        i =>
            {
                return i;
            }).ToDataflow();

    transformBlock1 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

    transformBlock2 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("2 input count: " + transformBlock2.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

    processor = new ActionBlock<string>(
        i =>
            {
                Console.WriteLine(i);
            }).ToDataflow();

    /** rather than TPL linking
      broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
      broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
     **/

    //Use DataflowEx linking
    var transform1 = transformBlock1.ToDataflow();
    var transform2 = transformBlock2.ToDataflow();

    broadCaster.LinkTo(transform1);
    broadCaster.LinkTo(transform2);
    transform1.LinkTo(processor);
    transform2.LinkTo(processor);
}

Полный код здесь.

Отказ от ответственности: я являюсь автором DataflowEx, который публикуется под лицензией MIT.