Подтвердить что ты не робот

RxJs Наблюдаемая разбивка на страницы

Во-первых: Это первый проект, в котором я использую RxJs. Я думал, что я буду лучше учиться, используя его.

Я нашел этот ответ: Превращение разбитых на страницы запросов в поток Observable с помощью RxJs Но в комментариях говорится:

Вы превысили максимальный стек вызовов. Пришло около 430 страниц. Я думаю, что рекурсия не может быть лучшим решением здесь.

Я хочу запросить API данных Youtube, результаты возвращаются на страницы, и мне нужно их прокручивать. Я думал, что такой рабочий процесс может работать: 1) Инициировать вызов 2) Проверьте, имеет ли ответ "nextPageToken", 3) Если у него есть, выполните другой запрос API Youtube 4) Если нет, завершите

So to do this I could Imagine the following Observables / streams:
FirstRequestStream -A-X--------------->
ResponseStream     -A-A-A-A--X-------->
RequestStream      -I-A-I-A----------->
A = Action
I = Info from upper stream
X = Termination

(Не уверен, что эта диаграмма правильна, как я ее сделал)

Таким образом, ResponseStream зависит от FirstRequestStream и RequestStream (используя функцию слияния). RequestStream зависит от ResponseStream (это называется циркулирующим наблюдаемым?)

-Это правильный подход?

-Are 'циркулирующие наблюдаемые' хорошо, они даже возможны? (У меня возникли проблемы с созданием одного).

-Но другой путь я должен попробовать в первую очередь?

- Можно ли создавать взаимозависимые наблюдаемые потоки?

Благодарим вас за помощь.

4b9b3361

Ответ 1

Вы слишком усложняете эту проблему, ее можно решить намного проще, используя оператор defer.

Идея состоит в том, что вы создаете отложенную наблюдаемую (поэтому она будет создана и начнет извлекать данные только после подписки) и объедините ее с той же наблюдаемой, но для следующей страницы, которая также будет объединена со следующей страницей и т.д.... И все это можно сделать без рекурсии.

Вот как выглядит код:

const { defer, from, concat, EMPTY, timer } = rxjs; // = require("rxjs")
const { mergeMap, take, mapTo, tap } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
function fetchPage(page=0) {
  return timer(100).pipe(
    tap(() => console.log('-> fetched page ${page}')),
    mapTo({
      items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
      nextPage: page + 1,
    })
  );
}

const getItems = page => defer(() => fetchPage(page)).pipe(
  mergeMap(({ items, nextPage }) => {
    const items$ = from(items);
    const next$ = nextPage ? getItems(nextPage) : EMPTY;
    return concat(items$, next$);
  })
);

// process only first 30 items, without fetching all of the data
getItems()
 .pipe(take(30))
 .subscribe(e => console.log(e));
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>

Ответ 2

Я бессовестно повторно использую фрагмент кода от Олеса Савлука с его хорошей функцией fetchPage и применяю идеи, изложенные в статье блога, на которую ссылается Picci (в комментариях), используя expand.

Статья о экспансии Николаса Джеймисона

Это дает немного более простой код с рекурсией, скрытой в вызове expand (и комментарии к статье показывают, как линеаризовать его, если это необходимо).

const { timer, EMPTY } = rxjs; // = require("rxjs")
const { concatMap, expand, mapTo, tap, toArray } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
const pageNumber = 3;
function fetchPage(page = 0) {
  return timer(1000).pipe(
    tap(() => console.log('-> fetched page ${page}')),
    mapTo({
      items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
      nextPage: ++page === pageNumber ? undefined : page,
    }),
  );
}

const list = fetchPage().pipe(
  expand(({ nextPage }) => nextPage ? fetchPage(nextPage) : EMPTY),
  concatMap(({ items }) => items),
  // Transforms the stream of numbers (Observable<number>)
  // to a stream with only an array of numbers (Observable<number[]>).
  // Remove if you want a stream of numbers, not waiting for all requests to complete.
  toArray(),
);

list.subscribe(console.log);
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>

Ответ 3

Вот мое решение, использующее операторы rxjs expand, reduce и empty в контексте класса Angular с использованием модуля HttpClient:

Предположим, что ваш ответ API - это объект, содержащий свойства "данные" (массив элементов результатов) и "следующий" (строка, содержащая либо URL-адрес следующей страницы, либо ноль, если больше нет следующих элементов).

getAllResults(url) {
  return this.http.get(url).pipe(
    expand((res) => res.next ? this.http.get(next) : EMPTY),
    reduce((acc, res) => acc.concat(res.data), [])
  );
}