Подтвердить что ты не робот

Реализация правильного завершения повторного блока

Teaser: ребята, этот вопрос не о том, как реализовать политику повтора. Это о правильном завершении блока потока данных TPL.

Этот вопрос в основном является продолжением моего предыдущего вопроса Повторить политику в ITargetBlock. Ответом на этот вопрос было решение @svick smart, которое использует TransformBlock (источник) и TransformManyBlock (target). Единственная проблема заключается в том, чтобы закончить этот блок правильно: дождитесь завершения всех попыток, а затем заполните целевой блок. Вот что я закончил (это просто фрагмент, не обращайте слишком много внимания на не-потоковое приложение retries):

var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    while (target.InputCount > 0 || retries.Any())
        await Task.Delay(100);

    target.Complete();
});

Идея состоит в том, чтобы выполнить какой-то опрос и проверить, есть ли еще сообщения, ожидающие обработки, и нет сообщений, требующих повторной попытки. Но в этом решении мне не нравится идея опроса.

Да, я могу инкапсулировать логику добавления/удаления повторений в отдельный класс и даже, например, выполнить некоторое действие, когда набор попыток станет пустым, но как справиться с условием target.InputCount > 0? Не существует такого обратного вызова, вызываемого при отсутствии ожидающих сообщений для блока, поэтому кажется, что проверка target.ItemCount в цикле с небольшой задержкой является единственным вариантом.

Кто-нибудь знает более умный способ достичь этого?

4b9b3361

Ответ 1

Сочетание hwcverwe ответа и комментария JamieSee может быть идеальным решением.

Сначала вам нужно создать несколько событий:

var signal  = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);

Затем вам нужно создать наблюдателя и подписаться на TransformManyBlock, поэтому вы будете уведомлены, когда произойдет соответствующее событие:

var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);

Наблюдаемое может быть довольно простым:

private class RetryingBlockObserver<T> : IObserver<T> {
        private ManualResetEvent completedEvent;

        public RetryingBlockObserver(ManualResetEvent completedEvent) {                
            this.completedEvent = completedEvent;
        }

        public void OnCompleted() {
            completedEvent.Set();
        }

        public void OnError(Exception error) {
            //TODO
        }

        public void OnNext(T value) {
            //TODO
        }
    }

И вы можете дождаться либо сигнала, либо завершения (исчерпания всех исходных элементов), либо обоих

 source.Completion.ContinueWith(async _ => {

            WaitHandle.WaitAll(completedEvent, signal);
            // Or WaitHandle.WaitAny, depending on your needs!

            target.Complete();
        });

Вы можете проверить значение результата WaitAll, чтобы понять, какое событие было установлено, и реагировать соответствующим образом. Вы также можете добавить в код другие события, передавая их наблюдателю, чтобы он мог установить их при необходимости. Вы можете различать свое поведение и реагировать по-разному, когда возникает ошибка, например

Ответ 2

Возможно, ManualResetEvent может сделать трюк для вас.

Добавьте публичное свойство в TransformManyBlock

private ManualResetEvent _signal  = new ManualResetEvent(false);
public ManualResetEvent Signal { get { return _signal; } }

И вот вы идете:

var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);

            // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
            if(!retries.Any()) Signal.Set(); 
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);

                // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
                if(!retries.Any()) Signal.Set(); 
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    //Blocks the current thread until the current WaitHandle receives a signal.
    target.Signal.WaitOne();

    target.Complete();
});

Я не уверен, где установлен ваш target.InputCount. Итак, в месте, где вы меняете target.InputCount, вы можете добавить следующий код:

if(InputCount == 0)  Signal.Set();