Подтвердить что ты не робот

Как остановить и возобновить наблюдение.

Это выдает галочку каждые 5 секунд.

Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
            .subscribe(tick -> Log.d(TAG, "tick = "+tick));

Чтобы остановить его, вы можете использовать

Schedulers.shutdown();

Но затем все Планировщики останавливаются, и позже невозможно возобновить тикинг. Как я могу остановить и возобновить испускание "изящно"?

4b9b3361

Ответ 1

Здесь одно из возможных решений:

class TickHandler {

    private AtomicLong lastTick = new AtomicLong(0L);
    private Subscription subscription;

    void resume() {
        System.out.println("resumed");
        subscription = Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                                 .map(tick -> lastTick.getAndIncrement())
                                 .subscribe(tick -> System.out.println("tick = " + tick));
    }

    void stop() {
        if (subscription != null && !subscription.isUnsubscribed()) {
            System.out.println("stopped");
            subscription.unsubscribe();
        }
    }
}

Ответ 2

Некоторое время назад я также искал решения RX "таймер", но не они оправдали мои ожидания. Итак, вы можете найти свое решение:

AtomicLong elapsedTime = new AtomicLong();
AtomicBoolean resumed = new AtomicBoolean();
AtomicBoolean stopped = new AtomicBoolean();

public Flowable<Long> startTimer() { //Create and starts timper
    resumed.set(true);
    stopped.set(false);
    return Flowable.interval(1, TimeUnit.SECONDS)
            .takeWhile(tick -> !stopped.get())
            .filter(tick -> resumed.get())
            .map(tick -> elapsedTime.addAndGet(1000));
}

public void pauseTimer() {
    resumed.set(false);
}

public void resumeTimer() {
    resumed.set(true);
}

public void stopTimer() {
    stopped.set(true);
}

public void addToTimer(int seconds) {
    elapsedTime.addAndGet(seconds * 1000);
}

Ответ 3

val switch = new java.util.concurrent.atomic.AtomicBoolean(true)
val tick = new java.util.concurrent.atomic.AtomicLong(0L)

val suspendableObservable = 
  Observable.
    interval(5 seconds).
    takeWhile(_ => switch.get()).
    repeat.
    map(_ => tick.incrementAndGet())

Вы можете установить switch на false, чтобы приостановить тикинг и true, чтобы возобновить его.

Ответ 4

Вот еще один способ сделать это, я думаю.
Когда вы проверяете исходный код, вы найдете interval(), используя класс OnSubscribeTimerPeriodically. key code ниже.

@Override
public void call(final Subscriber<? super Long> child) {
    final Worker worker = scheduler.createWorker();
    child.add(worker);
    worker.schedulePeriodically(new Action0() {
        long counter;
        @Override
        public void call() {
            try {
                child.onNext(counter++);
            } catch (Throwable e) {
                try {
                    worker.unsubscribe();
                } finally {
                    Exceptions.throwOrReport(e, child);
                }
            }
        }

    }, initialDelay, period, unit);
}

Итак, вы увидите, если вы хотите, чтобы cannel цикл, как насчет бросания нового исключения в onNext(). Пример кода ниже.

Observable.interval(1000, TimeUnit.MILLISECONDS)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.i("abc", "onNext");
                    if (aLong == 5) throw new NullPointerException();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    Log.i("abc", "onError");
                }
            }, new Action0() {
                @Override
                public void call() {
                    Log.i("abc", "onCompleted");
                }
            });

Затем вы увидите следующее:

08-08 11:10:46.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:47.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:48.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:49.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:50.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.018 28146-28181/net.bingyan.test I/abc: onError             

Ответ 5

Извините, что это в RxJS вместо RxJava, но концепция будет такой же. Я адаптировал это из learn-rxjs.io, и здесь он находится на codepen.

Идея состоит в том, что вы начинаете с двух потоков событий click, startClick$ и stopClick$. Каждый щелчок, происходящий в потоке stopClick$, отображается на пустой видимый объект и нажимает на startClick$, каждый из которых отображается в поток interval$. Два результирующих потока получают merge -d вместе в один наблюдаемый наблюдаемый. Другими словами, новый, наблюдаемый один из двух типов, будет испускаться из merge каждый раз, когда есть щелчок. Получившийся наблюдаемый пройдет через switchMap, который начнет слушать это новое наблюдаемое и перестанет слушать то, что он слушал раньше. Switchmap также начнет сливать значения из этого нового наблюдаемого в существующий поток.

После переключения scan только когда-либо видит значение "increment", испускаемое interval$, и оно не видит никаких значений при нажатии "stop".

И пока первый клик не появится, startWith начнет выдавать значения из $interval, просто чтобы все получилось:

const start = 0;
const increment = 1;
const delay = 1000;
const stopButton = document.getElementById('stop');
const startButton = document.getElementById('start');
const startClick$ = Rx.Observable.fromEvent(startButton, 'click');
const stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
const interval$ = Rx.Observable.interval(delay).mapTo(increment);
const setCounter = newValue => document.getElementById("counter").innerHTML = newValue;
setCounter(start);

const timer$ = Rx.Observable

    // a "stop" click will emit an empty observable,
    // and a "start" click will emit the interval$ observable.  
    // These two streams are merged into one observable.
    .merge(stopClick$.mapTo(Rx.Observable.empty()), 
           startClick$.mapTo(interval$))

    // until the first click occurs, merge will emit nothing, so 
    // use the interval$ to start the counter in the meantime
    .startWith(interval$)

    // whenever a new observable starts, stop listening to the previous
    // one and start emitting values from the new one
    .switchMap(val => val)

    // add the increment emitted by the interval$ stream to the accumulator
    .scan((acc, curr) => curr + acc, start)

    // start the observable and send results to the DIV
    .subscribe((x) => setCounter(x));

И вот HTML

<html>
<body>
  <div id="counter"></div>
  <button id="start">
    Start
  </button>
  <button id="stop">
    Stop
  </button>
</body>
</html>

Ответ 6

Вы можете использовать takeWhile и цикл, пока условия не будут выполнены

Observable.interval(1, TimeUnit.SECONDS)
        .takeWhile {
            Log.i(TAG, " time " + it)
            it != 30L
        }
        .subscribe(object : Observer<Long> {
            override fun onComplete() {
                Log.i(TAG, "onComplete " + format.format(System.currentTimeMillis()))
            }

            override fun onSubscribe(d: Disposable) {
                Log.i(TAG, "onSubscribe " + format.format(System.currentTimeMillis()))
            }

            override fun onNext(t: Long) {
                Log.i(TAG, "onNext " + format.format(System.currentTimeMillis()))
            }

            override fun onError(e: Throwable) {
                Log.i(TAG, "onError")
                e.printStackTrace()
            }

        });