Подтвердить что ты не робот

Напишите метод расширения Rx "RetryAfter"

В книге IntroToRx автор предлагает написать "умную" повторную попытку ввода-вывода, которая повторяет запрос ввода-вывода, как сетевой запрос, через некоторое время.

Вот точный абзац:

Полезным методом расширения для добавления в вашу собственную библиотеку может быть "Назад Off и Retry". Команды, с которыми я работал, нашли такие особенно полезно при выполнении операций ввода-вывода, особенно сетевых запросов. концепция заключается в том, чтобы попытаться, и при отказе ждать определенного периода времени и затем повторите попытку. Ваша версия этого метода может учитывать тип исключения, на который вы хотите повторить попытку, а также максимальное количество раз повторить попытку. Возможно, вы захотите увеличить время ожидания до быть менее агрессивным при каждой последующей попытке.

К сожалению, я не могу понять, как написать этот метод.: (

4b9b3361

Ответ 1

Ключом к этой реализации повторной попытки повторения является отложенные наблюдаемые. Отложенный наблюдаемый не выполнит свой factory, пока кто-то не подпишет его. И он будет ссылаться на factory для каждой подписки, что делает его идеальным для нашего сценария повторения.

Предположим, что у нас есть метод, который запускает сетевой запрос.

public IObservable<WebResponse> SomeApiMethod() { ... }

В целях этого небольшого фрагмента давайте определим отложенное значение как source

var source = Observable.Defer(() => SomeApiMethod());

Всякий раз, когда кто-то подписывается на источник, он вызывает SomeApiMethod и запускает новый веб-запрос. Наивный способ повторить попытку при его сбое будет использовать встроенный оператор Retry.

source.Retry(4)

Это было бы не очень хорошо для API, хотя это и не то, о чем вы просите. Нам нужно отложить запуск запросов между каждой попыткой. Один из способов сделать это с задержкой подписки.

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)

Это не идеально, поскольку он добавит задержку даже по первому запросу, пусть исправить это.

int attempt = 0;
Observable.Defer(() => { 
   return ((++attempt == 1)  ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)

Просто приостановка на секунду - это не очень хороший метод повторения, поэтому пусть изменение этой константы будет функцией, которая получает счетчик повторов и возвращает соответствующую задержку. Экспоненциальное отключение достаточно просто реализовать.

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))

Мы почти закончили сейчас, нам просто нужно добавить способ указания, для каких исключений мы должны повторить попытку. Давайте добавим функцию, которая задала исключение, вернет ли это значение, чтобы повторить попытку, мы назовем его retryOnError.

Теперь нам нужно написать какой-то страшный код, но нести меня.

Observable.Defer(() => {
    return ((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
        .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
        .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
            ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
            : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
    ? Observable.Return(t.Item2)
    : Observable.Throw<T>(t.Item3))

Все эти угловые скобки заключают в себе исключение, для которого мы не должны повторять попытку .Retry(). Мы сделали внутренний наблюдаемый be IObservable<Tuple<bool, WebResponse, Exception>>, где первый bool указывает, есть ли у нас ответ или исключение. Если retryOnError указывает, что мы должны повторить попытку за конкретное исключение, то внутреннее наблюдаемое будет бросать, и это будет вызвано повторением. SelectMany просто разворачивает наш Tuple и снова создает результирующее значение IObservable<WebRequest>.

Посмотрите мой gist с полным источником и тестами для окончательной версии. Наличие этого оператора позволяет нам написать наш код повтора достаточно лаконично

Observable.Defer(() => SomApiMethod())
  .RetryWithBackoffStrategy(
     retryCount: 4, 
     retryOnError: e => e is ApiRetryWebException
  )

Ответ 2

Возможно, я больше упрощаю ситуацию, но если мы посмотрим на реализацию Retry, это просто Observable.Catch над бесконечным перечислимым из наблюдаемых:

private static IEnumerable<T> RepeatInfinite<T>(T value)
{
    while (true)
        yield return value;
}

public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
{
    return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source));
}

Итак, если мы возьмем этот подход, мы можем просто добавить задержку после первого урока.

private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime)
{
    // Don't delay the first time        
    yield return source;

    while (true)
        yield return source.DelaySubscription(dueTime);
    }

public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
{
    return RepeateInfinite(source, dueTime).Catch();
}

Перегрузка, которая улавливает конкретное исключение со счетчиком повторов, может быть еще более кратким:

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception
{
    return source.Catch<TSource, TException>(exception =>
    {
        if (count <= 0)
        {
            return Observable.Throw<TSource>(exception);
        }

        return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count);
    });
}

Обратите внимание, что здесь перегрузка использует рекурсию. В первых случаях кажется, что исключение StackOverflowException возможно, если счетчик был похож на Int32.MaxValue. Тем не менее, DelaySubscription использует планировщик для запуска действия подписки, поэтому переполнение стека было бы невозможным (т.е. Использование "батут" ). Я думаю, это не совсем очевидно, если посмотреть на код. Мы могли бы заставить переполнение стека, явно устанавливая планировщик в перегрузке DelaySubscription для Scheduler.Immediate и передавая TimeSpan.Zero и Int32.MaxValue. Мы могли бы передать не-немедленный планировщик, чтобы выразить наше намерение более явно, например:

return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count);

UPDATE: добавлена ​​перегрузка для конкретного планировщика.

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(
    this IObservable<TSource> source,
    TimeSpan retryDelay,
    int retryCount,
    IScheduler scheduler) where TException : Exception
{
    return source.Catch<TSource, TException>(
        ex =>
        {
            if (retryCount <= 0)
            {
                return Observable.Throw<TSource>(ex);
            }

            return
                source.DelaySubscription(retryDelay, scheduler)
                    .RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler);
        });
} 

Ответ 3

Вот тот, который я использую:

public static IObservable<T> DelayedRetry<T>(this IObservable<T> src, TimeSpan delay)
{
    Contract.Requires(src != null);
    Contract.Ensures(Contract.Result<IObservable<T>>() != null);

    if (delay == TimeSpan.Zero) return src.Retry();
    return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry());
}

Ответ 4

На основании ответа Маркуса я написал следующее:

public static class ObservableExtensions
{
    private static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError,
        int attempt)
    {
        return Observable
            .Defer(() =>
            {
                var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt);
                var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay);

                return s
                    .Catch<T, Exception>(e =>
                    {
                        if (retryOnError(attempt, e))
                        {
                            return source.BackOffAndRetry(strategy, retryOnError, attempt + 1);
                        }
                        return Observable.Throw<T>(e);
                    });
            });
    }

    public static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError)
    {
        return source.BackOffAndRetry(strategy, retryOnError, 0);
    }
}

Мне нравится больше, потому что

  • он не изменяет attempts, а использует рекурсию.
  • Он не использует retries, но передает количество попыток retryOnError

Ответ 5

Вот еще немного другая реализация, которую я придумал, изучая Rxx. Таким образом, это в значительной степени сводная версия подхода Rxx.

Подпись немного отличается от версии Markus. Вы указываете тип исключения для повторного включения, а стратегия задержки принимает исключение и счетчик повторов, поэтому у вас могут быть более длительные задержки для каждой последующей повторной попытки и т.д.

Я не могу гарантировать, что это ошибка, или лучший подход, но, похоже, это работает.

public static IObservable<TSource> RetryWithDelay<TSource, TException>(this IObservable<TSource> source, Func<TException, int, TimeSpan> delayFactory, IScheduler scheduler = null)
where TException : Exception
{
    return Observable.Create<TSource>(observer =>
    {
        scheduler = scheduler ?? Scheduler.CurrentThread;
        var disposable = new SerialDisposable();
        int retryCount = 0;

        var scheduleDisposable = scheduler.Schedule(TimeSpan.Zero,
        self =>
        {
            var subscription = source.Subscribe(
            observer.OnNext,
            ex =>
            {
                var typedException = ex as TException;
                if (typedException != null)
                {
                    var retryDelay = delayFactory(typedException, ++retryCount);
                    self(retryDelay);
                }
                else
                {
                    observer.OnError(ex);
                }
            },
            observer.OnCompleted);

            disposable.Disposable = subscription;
        });

        return new CompositeDisposable(scheduleDisposable, disposable);
    });
}

Ответ 6

Вот тот, с которым я столкнулся.

Не хотел объединять элементы отдельных попыток в одну последовательность, но испускать исходную последовательность в целом при каждой попытке - поэтому оператор возвращает IObservable<IObservable<TSource>>. Если это нежелательно, его можно просто вернуть Switch() в одну последовательность.

(Предыстория: в моем случае использование источника - это горячая горячая последовательность, в которой я GroupByUntil появляется элемент, который закрывает группу. Если этот элемент потерян между двумя попытками, группа никогда не закрывается, что приводит к утечке памяти. последовательность последовательностей позволяет группировать только внутренние последовательности (или обработку исключений или...).

/// <summary>
/// Repeats <paramref name="source"/> in individual windows, with <paramref name="interval"/> time in between.
/// </summary>
public static IObservable<IObservable<TSource>> RetryAfter<TSource>(this IObservable<TSource> source, TimeSpan interval, IScheduler scheduler = null)
{
    if (scheduler == null) scheduler = Scheduler.Default;
    return Observable.Create<IObservable<TSource>>(observer =>
    {
        return scheduler.Schedule(self =>
        {
            observer.OnNext(Observable.Create<TSource>(innerObserver =>
            {
                return source.Subscribe(
                    innerObserver.OnNext,
                    ex => { innerObserver.OnError(ex); scheduler.Schedule(interval, self); },
                    () => { innerObserver.OnCompleted(); scheduler.Schedule(interval, self); });
            }));
        });
    });
}