Подтвердить что ты не робот

RxJava и кэшированные данные

Я все еще довольно новичок в RxJava, и я использую его в приложении для Android. Я прочитал метрическую тонну по этому вопросу, но все еще чувствую, что что-то не хватает.

У меня есть следующий сценарий:

У меня есть данные, хранящиеся в системе, к которой обращаются через различные служебные соединения (AIDL), и мне нужно получить данные из этой системы (может произойти 1-й номер асинхронных вызовов). Rx помог мне упростить этот код. Однако весь этот процесс имеет тенденцию занимать несколько секунд (более 5 секунд +), поэтому мне нужно кэшировать эти данные, чтобы ускорить собственное приложение.

Требования на данный момент:

  • Первоначальная подписка, кеш будет пустым, поэтому нам нужно подождать необходимое время для загрузки. Ничего страшного. После этого данные должны быть кэшированы.

  • Последующие нагрузки должны вытащить данные из кеша, но затем данные должны быть перезагружены, а кеш диска должен быть за кулисами.

Проблема: у меня две наблюдаемые - A и B. A содержит вложенные Observables, которые извлекают данные из локальных сервисов (т.е. здесь). B намного проще. B просто содержит код для извлечения данных из дискового кэша.

Необходимо решить: a) Верните кешированный элемент (если он кэширован) и продолжайте повторно загружать кеш диска. б) Кэш пуст, загружает данные из системы, кэширует его и возвращает. Последующие вызовы возвращаются к "a".

У меня было несколько человек, которые рекомендуют несколько операций, таких как flatmap, merge и даже темы, но по какой-то причине у меня возникают проблемы с подключением точек.

Как я могу это сделать?

4b9b3361

Ответ 1

Вот несколько вариантов, как это сделать. Я попытаюсь объяснить их как можно лучше, когда я пойду. Это код салфетки, и я использую синтаксис лямбда в стиле Java8, потому что я ленив и красивее.:)

  • Тема, например AsyncSubject, была бы идеальной, если бы вы могли сохранить их в качестве состояний экземпляра в памяти, хотя кажется, что вам нужно хранить их на диске. Тем не менее, я думаю, что этот подход стоит упомянуть на всякий случай, если вы в состоянии. Кроме того, это просто отличный способ узнать. AsyncSubject является наблюдаемым, который испускает только опубликованное значение LAST (A Subject является обозревателем и наблюдаемым) и будет запускать только после того, как был вызван onCompleted. Таким образом, все, что подписывается после этого завершения, получит следующее значение.

    В этом случае вы могли бы (в классе приложения или другом одиночном экземпляре на уровне приложения):

    public class MyApplication extends Application {    
        private final AsyncSubject<Foo> foo = AsyncSubject.create();
    
        /** Asynchronously gets foo and stores it in the subject. */
        public void fetchFooAsync() {
            // Gets the observable that does all the heavy lifting.
            // It should emit one item and then complete.
            FooHelper.getTheFooObservable().subscribe(foo);
        }
    
        /** Provides the foo for any consumers who need a foo. */
        public Observable<Foo> getFoo() {
            return foo;
        }
    
    }
    
  • Отложить наблюдение. Observable.defer позволяет подождать, чтобы создать Observable, пока он не будет подписан. Вы можете использовать это, чтобы разрешить выборку кеша диска в фоновом режиме, а затем вернуть кэшированную версию или, если не в кеш, сделать реальную сделку.

    В этой версии предполагается, что ваш код получателя, как выборка кэша, так и нелока, блокируют вызовы, а не наблюдаемые, а отсрочка работает в фоновом режиме. Например:

    public Observable<Foo> getFoo() {
        Observable.defer(() -> {
            if (FooHelper.isFooCached()) {
                return Observable.just(FooHelper.getFooFromCacheBlocking());
            }
            return Observable.just(FooHelper.createNewFooBlocking());
        }).subscribeOn(Schedulers.io());
    }
    
  • Используйте concatWith и take. Здесь мы предполагаем, что наш метод, чтобы получить Foo из кеша диска, либо испускает один элемент, либо завершается, либо просто завершается без испускания, если он пуст.

    public Observable<Foo> getFoo() {
        return FooHelper.getCachedFooObservable()
                .concatWith(FooHelper.getRealFooObservable())
                .take(1);
    }
    

    Этот метод должен только пытаться получить реальную сделку, если кэшированный наблюдаемый закончен пустым.

  • Используйте amb или ambWith. Это, наверное, один из самых сумасшедших решений, но интересно отметить. amb в основном берет пару (или больше с перегрузками) наблюдаемых и ждет, пока один из них не испустит элемент, тогда он полностью отбрасывает другое наблюдаемое и просто берет тот, который выиграл гонку. Единственный способ, которым это было бы полезно, - это сделать возможным шаг вычисления нового Foo быстрее, чем выборка с диска. В этом случае вы можете сделать что-то вроде этого:

    public Observable<Foo> getFoo() {
        return Observable.amb(
                FooHelper.getCachedFooObservable(),
                FooHelper.getRealFooObservable());
    }
    

Я предпочитаю вариант 3. Что касается собственно кэширования, вы можете иметь что-то подобное в одной из точек входа (желательно, прежде чем нам понадобится Foo, поскольку, как вы сказали, это длительная операция ) Позже потребители должны получить кешированную версию, пока она закончила писать. Использование AsyncSubject здесь также может помочь, чтобы убедиться, что мы не запускаем работу несколько раз, ожидая ее записи. Потребители получат только завершенный результат, но опять же, это работает только в том случае, если его можно разумно хранить в памяти.

if (!FooHelper.isFooCached()) {
    getFoo()
        .subscribeOn(Schedulers.io())
        .subscribe((foo) -> FooHelper.cacheTheFoo(foo));
}

Обратите внимание, что вы должны либо поддерживать один планировщик потоков, предназначенный для записи на диск (и чтения), и использовать .observeOn(foo) после .subscribeOn(...), либо иным образом синхронизировать доступ к кешу диска, чтобы предотвратить проблемы concurrency.

Ответ 2

Недавно я опубликовал библиотеку Github для Android и Java под названием RxCache, которая соответствует вашим потребностям в кэшировании данных с помощью наблюдаемых.

RxCache реализует два слоя кэширования - память и диск, и он подсчитывает несколько аннотаций, чтобы настроить поведение каждого провайдера.

Настоятельно рекомендуется использовать Retrofit для данных, полученных из HTTP-вызовов. Используя лямбда-выражение, вы можете сформулировать выражение следующим образом:

    rxCache.getUser(retrofit.getUser(id), () -> true).flatmap(user -> user);

Надеюсь, вам это будет интересно:)

Ответ 3

Взгляните на проект ниже. Это мое личное занятие, и я использовал этот шаблон в ряде приложений.

https://github.com/zsiegel/rxandroid-architecture-sample

Взгляните на PersistenceService. Вместо того, чтобы удалять базу данных (или MockService в примере проекта), вы можете просто иметь локальный список пользователей, которые обновляются с помощью метода save() и просто возвращать это значение в get().

Сообщите мне, если у вас есть какие-либо вопросы.