Подтвердить что ты не робот

Немедленно поставьте первый элемент, "отмените" следующие элементы

Рассмотрим следующий прецедент:

  • необходимо как можно скорее доставить первый элемент
  • необходимо отменить следующие события с 1 секундой тайм-аута

В результате я создал пользовательский оператор на основе OperatorDebounceWithTime, а затем использовал его как

.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))

CustomOperatorDebounceWithTime немедленно отправляет первый элемент, затем использует логику оператора OperatorDebounceWithTime для дебюта более поздних элементов.

Существует ли более простой способ достичь описанного поведения? Пусть пропустить оператор compose, он не решает проблему. Я ищу способ достичь этого без внедрения пользовательских операторов.

4b9b3361

Ответ 1

Обновление:
Из комментариев @lopar лучший способ:

Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))

Что-то вроде этой работы:

String[] items = {"one", "two", "three", "four", "five", "six", "seven", "eight"};
Observable<String> myObservable = Observable.from(items);
Observable.concat(myObservable.first(), myObservable.skip(1).debounce(1, TimeUnit.SECONDS))
    .subscribe(s -> System.out.println(s));

Ответ 2

Ответы @LortRaydenMK и @lopar лучше всего, но я хотел предложить что-то еще, если бы вам удавалось работать лучше для вас или для кого-то в подобной ситуации.

Есть вариант debounce(), который принимает функцию, которая решает, сколько времени нужно отменить для этого конкретного элемента. Он указывает это, возвращая наблюдаемое, которое завершается через некоторое время. Ваша функция может вернуть empty() для первого элемента и timer() для остальных. Что-то вроде (непроверено):

String[] items = {"one", "two", "three", "four", "five", "six"};
Observable.from(items)
    .debounce(item -> item.equals("one")
            ? Observable.empty()
            : Observable.timer(1, TimeUnit.SECONDS));

Фокус в том, что эта функция должна знать, какой элемент является первым. Ваша последовательность может это знать. Если это не так, вам может потребоваться zip() с помощью range() или что-то еще. Лучше в этом случае использовать решение в другом ответе.

Ответ 3

Простое решение с использованием RxJava 2.0, переведенное из ответа на тот же вопрос для RxJS, которое объединяет throttleFirst и debounce, а затем удаляет дубликаты.

private <T> ObservableTransformer<T, T> debounceImmediate() {
    return observable  -> observable.publish(p -> 
        Observable.merge(p.throttleFirst(1, TimeUnit.SECONDS), 
            p.debounce(1, TimeUnit.SECONDS)).distinctUntilChanged());
} 

@Test
public void testDebounceImmediate() {
    Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
        .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
        .doOnNext(v -> System.out.println(LocalDateTime.now() + " T=" + v))
            .compose(debounceImmediate())
            .blockingSubscribe(v -> System.out.println(LocalDateTime.now() + " Debounced: " + v));
}

Подход использования limit() или take(), по-видимому, не обрабатывает долгоживущие потоки данных, где я мог бы хотеть постоянно наблюдать, но все же действовать немедленно для первого события, замеченного какое-то время.

Ответ 4

Используйте версию debounce, которая выполняет функцию и реализует функцию следующим образом:

    .debounce(new Func1<String, Observable<String>>() {
        private AtomicBoolean isFirstEmission = new AtomicBoolean(true);
        @Override
        public Observable<String> call(String s) {
             // note: standard debounce causes the first item to be
             // delayed by 1 second unnecessarily, this is a workaround
             if (isFirstEmission.getAndSet(false)) {
                 return Observable.just(s);
             } else {
                 return Observable.just(s).delay(1, TimeUnit.SECONDS);
             }
        }
    })

Первый элемент немедленно испускается. Последующие элементы задерживаются на секунду. Если задержанный наблюдаемый не заканчивается до поступления следующего пункта, он отменяется, поэтому ожидаемое поведение отладки выполняется.

Ответ 5

У ответа LordRaydenMK и lopar есть проблема: вы всегда теряете второй предмет. Я полагаю, что никто не публиковал это раньше, потому что, если у вас есть отладка, у вас обычно много событий, а вторая в любом случае отбрасывается с отладкой. Правильный способ никогда не потерять событие:

observable
    .publish(published ->
        published
            .limit(1)
            .concatWith(published.debounce(1, TimeUnit.SECONDS)));

И не волнуйтесь, вы не получите ни одного дублированного события. Если вы не уверены в этом, вы можете запустить этот код и проверить его самостоятельно:

Observable.just(1, 2, 3, 4)
    .publish(published ->
        published
            .limit(1)
            .concatWith(published))
    .subscribe(System.out::println);

Ответ 6

Решение Ngrx - rxjs, разделить трубу на два

onMyAction$ = this.actions$
    .pipe(ofType<any>(ActionTypes.MY_ACTION);

lastTime = new Date();

@Effect()
onMyActionWithAbort$ = this.onMyAction$
    .pipe(
        filter((data) => { 
          const result = new Date() - this.lastTime > 200; 
          this.lastTime = new Date(); 
          return result; 
        }),
        switchMap(this.DoTheJob.bind(this))
    );

@Effect()
onMyActionWithDebounce$ = this.onMyAction$
    .pipe(
        debounceTime(200),
        filter(this.preventDuplicateFilter.bind(this)),
        switchMap(this.DoTheJob.bind(this))
    );

Ответ 7

Функции расширения Kotlin, основанные на комментарии @lopar:

fun <T> Flowable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Flowable<T> {
    return publish {
        it.take(1).concatWith(it.debounce(timeout, unit))
    }
}

fun <T> Observable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Observable<T> {
    return publish {
        it.take(1).concatWith(it.debounce(timeout, unit))
    }
}