Подтвердить что ты не робот

Последовательность RxJS эквивалентна обещанию .then()?

Раньше я много развивался с обещанием, и теперь я перехожу к RxJS. Документ RxJS не дает очень четкого примера того, как перейти от последовательности обещаний к последовательности наблюдателей.

Например, я обычно пишу цепочку обещаний с несколькими шагами, например

// a function that returns a promise
getPromise()
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.catch(function(err) {
    // handle error
});

Как мне переписать эту цепочку обещаний в стиле RxJS?

4b9b3361

Ответ 1

Для потока данных (эквивалентно then):

Rx.Observable.fromPromise(...)
  .flatMap(function(result) {
   // do something
  })
  .flatMap(function(result) {
   // do something
  })
  .subscribe(function onNext(result) {
    // end of chain
  }, function onError(error) {
    // process the error
  });

Обещание может быть преобразовано в наблюдаемое с Rx.Observable.fromPromise.

Некоторые операторы обещаний имеют прямой перевод. Например, RSVP.all или jQuery.when можно заменить на Rx.Observable.forkJoin.

Имейте в виду, что у вас есть группа операторов, которая позволяет асинхронно преобразовывать данные и выполнять задачи, которые вы не можете или будет очень трудно сделать с помощью promises. Rxjs показывает все свои возможности с асинхронными последовательностями данных (последовательность, то есть более 1 асинхронного значения).

Для управления ошибками объект немного сложнее.

  • есть catch и finally операторы тоже
  • retryWhen также может помочь повторить последовательность в случае ошибки
  • вы также можете иметь дело с ошибками в самом подписчике с помощью функции onError.

Для точной семантики более подробно рассмотрите документацию и примеры, которые вы можете найти в Интернете, или задайте здесь конкретные вопросы.

Это определенно станет хорошей отправной точкой для более глубокого управления ошибками с помощью Rxjs: https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html

Ответ 2

Более современная альтернатива:

import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';

fromPromise(...).pipe(
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   catchError(error => {
       // handle error
   })
)

Также обратите внимание, что для того, чтобы все это работало, вам нужно где-то subscribe на этот канал Observable, но я предполагаю, что он обрабатывается в какой-то другой части приложения.

Ответ 3

Обновление мая 2019 года с использованием RxJs 6

Согласитесь с приведенными выше ответами, хотел бы добавить конкретный пример с некоторыми игрушечными данными и простыми обещаниями (с setTimeout), используя RxJs v6 для большей ясности.

Просто обновите переданный идентификатор (в настоящее время жестко запрограммированный как 1) на что-то, что не существует, чтобы выполнить логику обработки ошибок тоже. Важно отметить, что также отметить использование of с catchError сообщения.

import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";

const posts = [
  { title: "I love JavaScript", author: "Wes Bos", id: 1 },
  { title: "CSS!", author: "Chris Coyier", id: 2 },
  { title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];

const authors = [
  { name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
  {
    name: "Chris Coyier",
    twitter: "@chriscoyier",
    bio: "CSS Tricks and CodePen"
  },
  { name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];

function getPostById(id) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const post = posts.find(post => post.id === id);
      if (post) {
        console.log("ok, post found!");
        resolve(post);
      } else {
        reject(Error("Post not found!"));
      }
    }, 200);
  });
}

function hydrateAuthor(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const authorDetails = authors.find(person => person.name === post.author);
      if (authorDetails) {
        post.author = authorDetails;
        console.log("ok, post hydrated with author info");
        resolve(post);
      } else {
        reject(Error("Author not Found!"));
      }
    }, 200);
  });
}

function dehydratePostTitle(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      delete post.title;
      console.log("ok, applied transformation to remove title");
      resolve(post);
    }, 200);
  });
}

// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
  flatMap(post => {
    return hydrateAuthor(post);
  }),
  flatMap(post => {
    return dehydratePostTitle(post);
  }),
  catchError(error => of('Caught error: ${error}'))
);

source$.subscribe(console.log);

Выходные данные:

ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
   { name: 'Wes Bos',
     twitter: '@wesbos',
     bio: 'Canadian Developer' },
  id: 1 }

Ключевая часть, эквивалентна следующей, использующей простой поток управления обещаниями:

getPostById(1)
  .then(post => {
    return hydrateAuthor(post);
  })
  .then(post => {
    return dehydratePostTitle(post);
  })
  .then(author => {
    console.log(author);
  })
  .catch(err => {
    console.error(err);
  });

Ответ 4

если функция getPromise находится в середине потокового канала, вы должны просто обернуть ее в одну из функций mergeMap, switchMap или concatMap (обычно mergeMap):

stream$.pipe(
   mergeMap(data => getPromise(data)),
   filter(...),
   map(...)
 ).subscribe(...);

если вы хотите начать свой поток с getPromise() то оберните его from функции:

import {from} from 'rxjs';

from(getPromise()).pipe(
   filter(...)
   map(...)
).subscribe(...);

Ответ 5

Насколько я только что узнал, если вы возвращаете результат в flatMap, он преобразует его в массив, даже если вы вернули строку.

Но если вы возвращаете Observable, эта Observable может вернуть строку;

Ответ 6

Если я правильно понял, вы имеете в виду использование значений, и в этом случае вы используете sbuscribe, т.е.

const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );

Кроме того, вы можете просто превратить наблюдаемое в обещание, используя toPromise(), как показано ниже:

arrObservable.toPromise().then()

Ответ 7

Вот как я это сделал.

предварительно

  public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) {
    const request = gapi.client.people.people.connections.list({
      resourceName: 'people/me',
      pageSize: 100,
      personFields: 'phoneNumbers,organizations,emailAddresses,names'
    }).then(response => {
      onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
    });
  }

// caller:

  this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      // handle rsp;
  });

После того, как (ют?)

public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> {
    return from(
      new Promise((resolve, reject) => {
        gapi.client.people.people.connections.list({
          resourceName: 'people/me',
          pageSize: 100,
          personFields: 'phoneNumbers,organizations,emailAddresses,names'
        }).then(result => {
          resolve(result);
        });
      })
    ).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      return result; //map is not really required if you not changing anything in the response. you can just return the from() and caller would subscribe to it.
    }));
  }

// caller

this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
  // handle rsp
}), (error) => {
  // handle error
});