Подтвердить что ты не робот

Создание наблюдаемого без использования Observable.create

Я использую RxJava в своем приложении для Android, и я хочу загрузить данные из базы данных.

Таким образом, я создаю новый Observable, используя Observable.create(), который возвращает список EventLog

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
        @Override
        public void call(Subscriber<? super List<EventLog>> subscriber) {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            subscriber.onNext(eventLogs);
        }
    });
}

Хотя он работает правильно, я читал, что использование Observable.create() на самом деле не является лучшей практикой для Rx Java (см. здесь).

Поэтому я изменил этот метод таким образом.

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.fromCallable(new Func0<List<EventLog>>() {
        @Override
        public List<EventLog> call() {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            return eventLogs;
        }
    });
}

Это лучший подход с использованием Rx Java? Зачем? На самом деле разница между этими двумя методами?

Кроме того, поскольку база данных загружает список элементов, имеет смысл сразу же издать весь список? Или я должен испускать один элемент за раз?

4b9b3361

Ответ 1

Два метода могут выглядеть похожими и вести себя одинаково, но fromCallable справляется с трудностями противодавления для вас, тогда как версия create не работает. Работа с противодавлением внутри реализации OnSubscribe варьируется от простого до прямого разума; однако, если опустить, вы можете получить MissingBackpressureException вдоль асинхронных границ (например, observeOn) или даже на границах продолжения (например, concat).

RxJava пытается обеспечить надлежащую поддержку прочного давления для как можно большего количества фабрик и операторов, однако есть немало заводов и операторов, которые не могут его поддерживать.

Вторая проблема с ручной реализацией OnSubscribe - отсутствие поддержки отмены, особенно если вы создаете много вызовов onNext. Многие из них могут быть заменены стандартными методами factory (например, from) или вспомогательными классами (такими как SyncOnSubscribe), которые справляются со всей сложностью для вас.

Вы можете найти множество примеров и примеров, которые (по-прежнему) используют create по двум причинам.

  • Гораздо проще вводить потоковые потоки данных на основе push, показывая, как толчок событий работает императивно. На мой взгляд, такие источники тратят слишком много времени на create пропорционально, вместо того, чтобы говорить о стандартных методах factory и показывая, как можно безопасно выполнять определенные общие задачи (например, ваши).
  • Во многих из этих примеров было создано время, в течение которого RxJava не требовал поддержки обратного давления или даже надлежащей поддержки синхронной отмены или просто был перенесен из примеров Rx.NET(которые на сегодняшний день не поддерживают противодавление и синхронное аннулирование как-то, любезно из С#, я думаю.) Генерация значений путем вызова onNext была беззаботной. Однако такое использование приводит к раздуванию буфера и чрезмерному использованию памяти, поэтому команда Netflix придумала способ ограничить использование памяти, потребовав от наблюдателей указать количество элементов, которые они готовы продолжить. Это стало известно как противодавление.

Во втором вопросе, а именно, если нужно создать список или последовательность значений, это зависит от вашего источника. Если ваш источник поддерживает некоторую итерацию или потоковую обработку отдельных элементов данных (например, JDBC), вы можете просто подключить их и испустить один за другим (см. SyncOnSubscribe). Если он не поддерживает его или вам все равно в форме List, то сохраните его как есть. Вы можете всегда конвертировать между двумя формами через toList и flatMapIterable, если это необходимо.

Ответ 2

Как объясняется в ответ, который вы связали, с Observable.create вам может потребоваться нарушить расширенные требования RxJava.

Например, вам нужно будет реализовать backpressure или как отказаться от подписки.

В этом случае вы хотите испустить элемент, не имея дело с противодавлением или подпиской. Итак, Observable.fromCallable - хороший вызов. RxJava будет иметь дело с остальными.