Подтвердить что ты не робот

Реактивное кэширование HTTP-сервиса

Я использую RsJS 5 (5.0.1) для кэширования в Angular 2. Он работает хорошо.

Мяч функции кеширования:

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .publishReplay(1, this.RECACHE_INTERVAL)
  .refCount().take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));

actualFn - это this.http.get('/some/resource').

Как я уже сказал, это отлично работает для меня. Кэш возвращается из наблюдаемого по длительности RECACHE_INTERVAL. Если после этого интервала будет выполнен запрос, вызывается actualFn().

То, что я пытаюсь понять, это когда истекает RECACHE_INTERVAL и вызывается actualFn() — как вернуть последнее значение. Существует промежуток времени между истечением RECACHE_INTERVAL, и actualFn() воспроизводится, что наблюдаемый не возвращает значение. Я хотел бы избавиться от этого разрыва во времени и всегда возвращать последнее значение.

Я мог бы использовать побочный эффект и сохранить последний вызов хорошего значения .next(lastValue), ожидая ответа HTTP-ответа, но это кажется наивным. Я хотел бы использовать способ "RxJS", решение для чистой функции — если возможно.

4b9b3361

Ответ 1

Обновленный ответ:

Если вы всегда хотите использовать предыдущее значение во время создания нового запроса, он может поместить другой объект в цепочку, который сохраняет самое последнее значение.

Затем вы можете повторить значение, чтобы можно было узнать, исходило ли оно из кеша или нет. Затем абонент может отфильтровать кешированные значения, если они не заинтересованы в них.

// Take values while they pass the predicate, then return one more
// i.e also return the first value which returned false
const takeWhileInclusive = predicate => src =>
  src
  .flatMap(v => Observable.from([v, v]))
  .takeWhile((v, index) =>
     index % 2 === 0 ? true : predicate(v, index)
  )
  .filter((v, index) => index % 2 !== 1);

// Source observable will still push its values into the subject
// even after the subscriber unsubscribes
const keepHot = subject => src =>
  Observable.create(subscriber => {
    src.subscribe(subject);

    return subject.subscribe(subscriber);
  });

const cachedRequest = request
   // Subjects below only store the most recent value
   // so make sure most recent is marked as 'fromCache'
  .flatMap(v => Observable.from([
     {fromCache: false, value: v},
     {fromCache: true, value: v}
   ]))
   // Never complete subject
  .concat(Observable.never())
   // backup cache while new request is in progress
  .let(keepHot(new ReplaySubject(1)))
   // main cache with expiry time
  .let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL)))
  .publish()
  .refCount()
  .let(takeWhileInclusive(v => v.fromCache));

  // Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL
  // Subscribers will get the most recent cached value first then an updated value

https://acutmore.jsbin.com/kekevib/8/edit?js,console

Оригинальный ответ:

Вместо того, чтобы устанавливать размер окна в replaySubject - вы можете изменить наблюдаемый источник для повторения после задержки.

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .repeatWhen(_ => _.delay(this.RECACHE_INTERVAL))
  .publishReplay(1)
  .refCount()
  .take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));

Оператору repeatWhen требуется RxJs-beta12 или выше https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09

Ответ 2

Смотрите эту демонстрацию: https://jsbin.com/todude/10/edit?js,console

Обратите внимание, что я пытаюсь получить кешированные результаты в 1200ms, когда случай недействителен, а затем в 1300ms, когда предыдущий запрос все еще ожидает (он принимает 200ms). Оба результата получены, как они должны.

Это происходит потому, что, когда вы подписываетесь и publishReplay() не содержит никакого действительного значения, он ничего не испустит и не будет немедленно завершен (благодаря take(1)), поэтому он должен подписаться на его источник, который делает HTTP-запросы (это действительно происходит в refCount()).

Тогда второй абонент ничего не получит и будет добавлен в массив наблюдателей в publishReplay(). Он не будет делать другую подписку, потому что он уже подписался на свой источник (refCount()) и ждет ответа.

Итак, ситуация, о которой вы описываете, не должна произойти, я думаю. Со временем вы получите демоверсию, демонстрирующую вашу проблему.

ИЗМЕНИТЬ:

Извлечение как недействительных элементов, так и свежих элементов

В следующем примере показана немного другая функциональность, чем связанный пример. Если кешированный отклик недействителен, он будет выходить в любом случае, а затем получит также новое значение. Это означает, что абонент получает одно или два значения:

  • 1 значение: кешированное значение
  • 2 значения: недействительное кешированное значение, а затем новое новое значение, которое будет кэшироваться с этого момента.

Код может выглядеть следующим образом:

let counter = 1;
const RECACHE_INTERVAL = 1000;

function mockDataFetch() {
  return Observable.of(counter++)
    .delay(200);
}

let source = Observable.defer(() => {
  const now = (new Date()).getTime();

  return mockDataFetch()
    .map(response => {
      return {
        'timestamp': now,
        'response': response,
      };
    });
});

let updateRequest = source
  .publishReplay(1)
  .refCount()
  .concatMap(value => {
    if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) {
      return Observable.from([value.response, null]);
    } else {
      return Observable.of(value.response);
    }
  })
  .takeWhile(value => value);


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);

Смотрите демо-версию: https://jsbin.com/ketemi/2/edit?js,console

Это выводит на консоль следующий вывод:

Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1200: 2
Response 1300: 2
Response 1500: 2
Response 3500: 2
Response 3500: 3

Обратите внимание, что 1200 и 1300 сразу получили старое кешированное значение 1, а затем другое значение со свежим значением 2.
С другой стороны, 1500 получил только новое значение, потому что 2 уже кэширован и действителен.

Самая запутанная вещь, вероятно, почему я использую concatMap().takeWhile(). Это связано с тем, что я должен убедиться, что новый ответ (а не недействительный) является последним значением перед отправкой полного уведомления, и, вероятно, для этого нет оператора (ни first(), ни takeWhile() не применимы для этого прецедента).

Испускает только текущий элемент, не дожидаясь обновления

Еще один пример использования может состоять в том, что мы хотим выпустить только кешированное значение, не ожидая нового ответа от HTTP-запроса.

let counter = 1;
const RECACHE_INTERVAL = 1000;

function mockDataFetch() {
  return Observable.of(counter++)
    .delay(200);
}

let source = Observable.defer(() => {
  const now = (new Date()).getTime();

  return mockDataFetch()
    .map(response => {
      return {
        'timestamp': now,
        'response': response,
      };
    });
});

let updateRequest = source
  .publishReplay(1)
  .refCount()
  .concatMap((value, i) => {
    if (i === 0) {
      if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { // is cached item valid?
        return Observable.from([value.response, null]);
      } else {
        return Observable.of(value.response);
      }
    }
    return Observable.of(null);
  })
  .takeWhile(value => value);


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3800:", val)), 3800);

Смотрите демо-версию: https://jsbin.com/kebapu/2/edit?js,console

Этот пример выводится на консоль:

Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1500: 2
Response 3500: 2
Response 3800: 3

Обратите внимание, что оба 1200 и 1300 получают значение 1, потому что это кешированное значение, даже если оно недействительно. Первый вызов в 1200 просто порождает новый HTTP-запрос без ожидания его ответа и испускает только кешированное значение. Тогда при 1500 новое значение кэшируется, поэтому оно просто переиздается. То же самое относится к 3500 и 3800.

Обратите внимание, что абонент в 1200 сразу получит уведомление next, но уведомление complete будет отправлено только после завершения HTTP-запроса. Нам нужно подождать, потому что, если мы отправим complete сразу после next, он сделает цепочку для размещения своих расходных материалов, которая также должна отменить HTTP-запрос (что мы определенно не хотим делать).

Ответ 3

Почти любая сложная логика быстро выходит из-под контроля, если вы используете простой rxjs. Я бы предпочел реализовать пользовательский оператор cache с нуля, вы можете использовать этот gist в качестве примера.