Подтвердить что ты не робот

Наблюдаемый против Flowable rxJava2

Я смотрел новый rx java 2, и я не совсем уверен, что я больше понимаю идею backpressure...

Я знаю, что у нас есть Observable, у которого нет поддержки backpressure и Flowable, которая имеет его.

Итак, на примере, скажем, у меня Flowable с interval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

Это приведет к сбою после 128 значений, и это довольно очевидно, что я потребляю медленнее, чем получение предметов.

Но тогда мы имеем то же самое с Observable

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

Это не сработает вообще, даже если я позабочусь о его потреблении, он все равно работает. Чтобы сделать работу Flowable, скажем, я положил оператор onBackpressureDrop, сбой исчез, но не все значения также испускаются.

Итак, базовый вопрос, который я не могу найти в моей голове, почему я должен заботиться о backpressure, когда я могу использовать plain Observable, все равно получать все значения без управления buffer? Или, может быть, с другой стороны, какие преимущества делают backpressure в пользу управления и обработки потребления?

4b9b3361

Ответ 1

Какое противодавление проявляется на практике - это ограниченные буферы, Flowable.observeOn имеет буфер из 128 элементов, который сливается так быстро, как может его принять. Вы можете увеличить размер этого буфера индивидуально, чтобы обрабатывать пакетный источник, и все методы управления противодавлением по-прежнему применяются с 1.x. Observable.observeOn имеет неограниченный буфер, который продолжает собирать элементы, и ваше приложение может закончиться без памяти.

Вы можете использовать Observable, например:

  • обработка графических интерфейсов
  • работа с короткими последовательностями (всего менее 1000 элементов)

Вы можете использовать Flowable, например:

  • холодные и несрочные источники
  • генераторы, подобные источникам
  • устройства доступа к сети и базы данных

Ответ 2

Обратное давление - это когда ваш наблюдаемый (издатель) создает больше событий, чем может выдержать ваш подписчик. Таким образом, вы можете получить подписчиков пропущенных событий, или вы можете получить огромную очередь событий, которая в конечном итоге просто приводит к нехватке памяти. Flowable принимает противодавление. Observable нет. Это оно.

это напоминает мне о воронке, которая, когда у нее слишком много жидкости, переливается. Текучий может помочь не допустить этого:

с огромным противодавлением:

enter image description here

но при использовании текучего противодавления намного меньше:

enter image description here

Rxjava2 имеет несколько стратегий противодавления, которые вы можете использовать в зависимости от вашего варианта использования. под стратегией я подразумеваю, что Rxjava2 предоставляет способ обработки объектов, которые не могут быть обработаны из-за переполнения (противодавления).

вот стратегии. Я не буду проходить их все, но, например, если вы не хотите беспокоиться о переполненных элементах, вы можете использовать такую стратегию отбрасывания:

observable.toFlowable(BackpressureStrategy.DROP)

Насколько я знаю, в очереди должно быть ограничение в 128 элементов, после чего может быть переполнение (обратное давление). Даже если его не 128, это близко к этому числу. Надеюсь, это кому-нибудь поможет.

если вам нужно изменить размер буфера с 128, похоже, это можно сделать так (но следите за любыми ограничениями памяти:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  

В разработке программного обеспечения обычно стратегия противодействия означает, что вы говорите эмитенту немного замедлиться, так как потребитель не может справиться со скоростью ваших излучающих событий.

Ответ 3

Тот факт, что ваш Flowable разбился после испускания 128 значений без обработки обратного давления, не означает, что он всегда будет терпеть крах после 128 значений: иногда он вылетает после 10, и иногда он вообще не падает. Я считаю, что это произошло, когда вы попробовали пример с Observable - там не было противодавления, поэтому ваш код работал нормально, в следующий раз он может и не быть. Разница в RxJava 2 заключается в том, что больше нет понятия противодавления в Observable и нет способа справиться с этим. Если вы разрабатываете реактивную последовательность, которая, вероятно, потребует явной обработки противодавления, тогда Flowable - ваш лучший выбор.