Подтвердить что ты не робот

Задачи в очереди с помощью RxJava на Android

Я разрабатываю приложение для Android с синхронизацией фоновых данных. В настоящее время я использую RxJava для размещения некоторых данных на сервере через регулярные промежутки времени. Кроме этого, я хотел бы предоставить пользователю кнопку "принудительная синхронизация", которая немедленно вызовет синхронизацию. Я знаю, как использовать Observable.interval() для перемещения данных через регулярные промежутки времени, и я бы знал, как использовать Observalbe.just() для форматирования того, что принудительно, но я хотел бы поставить их в очередь, если так случилось, что один из них запускается во время предыдущее все еще выполняется.

Итак, давайте возьмем пример, когда 1min - это интервал автоматической синхронизации, и пусть говорят, что синхронизация длится 40 секунд (я больше преувеличиваю здесь, чтобы сделать более легкую точку). Теперь, если случайно пользователь нажимает кнопку "force", когда автомат все еще работает (или наоборот - автоматические триггеры, когда принудительный один все еще запущен), я хотел бы поставить в очередь второй запрос синхронизации, чтобы идти так же, как и первый заканчивается.

Я нарисовал это изображение, которое может показаться ему более перспективным:

введите описание изображения здесь

Как вы можете видеть, автоматический запуск (с помощью некоторых Observable.interval()), а в середине синхронизации пользователь нажимает кнопку "force". Теперь мы хотим дождаться завершения первого запроса и затем снова начать принудительный запрос. В какой-то момент, когда принудительный запрос запущен, новый автоматический запрос был снова запущен, что только добавило его в очередь. После того, как последний был завершен из очереди, все останавливается, а затем автоматический запуск запланирован немного позже.

Надеюсь, что кто-нибудь может указать мне, чтобы исправить оператора, как это сделать. Я пробовал с Observable.combineLatest(), но список очередей был отправлен с самого начала, и когда я добавил новую синхронизацию в очередь, она не продолжилась, когда предыдущая операция была завершена.

Любая помощь приветствуется, Дарко

4b9b3361

Ответ 1

Вы можете сделать это, объединив таймер с помощью кнопки Observable/Subject, используя эффект очереди onBackpressureBuffer и concatMap обработки в нее, которая гарантирует, что она запускается по одному.

PublishSubject<Long> subject = PublishSubject.create();

Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS);

periodic.mergeWith(subject)
.onBackpressureBuffer()
.concatMap(new Func1<Long, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Long v) {
        // simulates the task to run
        return Observable.just(1)
                .delay(300, TimeUnit.MILLISECONDS);
    }
}
).subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(1100);
// user clicks a button
subject.onNext(-1L);

Thread.sleep(800);

Ответ 2

Несмотря на то, что есть приемлемый ответ с хорошим решением, я хотел бы поделиться другим вариантом для этого, используя Scheduler и SingleThreadExecutor

public static void main(String[] args) throws Exception {
    System.out.println(" init ");
    Observable<Long> numberObservable =
            Observable.interval(700, TimeUnit.MILLISECONDS).take(10);

    final Subject subject = PublishSubject.create();

    Executor executor = Executors.newSingleThreadExecutor();
    Scheduler scheduler = Schedulers.from(executor);
    numberObservable.observeOn(scheduler).subscribe(subject);

    subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"),
                    onCompleteFunc("subscriber 1"));

    Thread.sleep(800);
    //simulate action
    executor.execute(new Runnable() {
        @Override
        public void run() {
            subject.onNext(333l);
        }
    });

    Thread.sleep(5000);
}

static Action1<Long> onNextFunc(final String who) {
    return new Action1<Long>() {
        public void call(Long x) {
            System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName()
                    + " -- " + System.currentTimeMillis());
            try {
                //simulate some work
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
}

static Action1<Throwable> onErrorFunc(final String who) {
    return new Action1<Throwable>() {
        public void call(Throwable t) {
            t.printStackTrace();
        }
    };
}

static Action0 onCompleteFunc(final String who) {
    return new Action0() {
        public void call() {
            System.out.println(who + " complete");
        }
    };
}