Подтвердить что ты не робот

В чем разница между подпиской Rx.Observable и forEach

После создания наблюдаемого, подобного

var source = Rx.Observable.create(function(observer) {...});

В чем разница между подпиской

source.subscribe(function(x) {});

и для каждого

source.forEach(function(x) {});
4b9b3361

Ответ 1

В спецификации ES7, который следует RxJS 5.0 (но RxJS 4.0 не работает), два НЕ являются одинаковыми.

подписаться

public subscribe(observerOrNext: Observer | Function, error: Function, complete: Function): Subscription

Observable.subscribe - это то, где вы будете выполнять большую часть своей истинной обработки Observable. Он возвращает токен подписки, который вы можете использовать для отмены подписки. Это важно, когда вы не знаете, сколько времени вы подписали, или вам может потребоваться прекратить прослушивание до известной продолжительности.

ForEach

public forEach(next: Function, PromiseCtor?: PromiseConstructor): Promise

Observable.forEach возвращает обещание, которое либо будет разрешено, либо отклонено при завершении Observable или ошибках. Он предназначен для выяснения ситуаций, когда вы обрабатываете наблюдаемую последовательность ограниченной/конечной продолжительности более "синхронно", например, сопоставляя все входящие значения и затем представляя один раз, обрабатывая обещание.

Эффективно, вы можете действовать по каждому значению, а также по событиям с ошибкой и завершением в любом случае. Таким образом, самым значительным функциональным различием является невозможность отменить обещание.

Ответ 2

Я просто просматриваю последний доступный код, технически код foreach на самом деле вызывает подписку в RxScala, RxJS и RxJava. Это не кажется большим отличием. Теперь у них есть тип возврата, позволяющий пользователю иметь возможность остановить подписку или подобное.

Когда я работаю над ранней версией RxJava, у подписки есть возврат подписки, а forEach - просто пустота. Из-за изменений вы можете увидеть какой-то другой ответ.

/**
 * Subscribes to the [[Observable]] and receives notifications for each element.
 *
 * Alias to `subscribe(T => Unit)`.
 *
 * $noDefaultScheduler
 *  
 * @param onNext function to execute for each item.
 * @throws java.lang.IllegalArgumentException if `onNext` is null
 * @throws rx.exceptions.OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
 * @since 0.19
 * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
 */
def foreach(onNext: T => Unit): Unit = {
    asJavaObservable.subscribe(onNext)
 }

def subscribe(onNext: T => Unit): Subscription = {
    asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext))
}

/**
 *  Subscribes an o to the observable sequence.
 *  @param {Mixed} [oOrOnNext] The object that is to receive notifications or an action to invoke for each element in the observable sequence.
 *  @param {Function} [onError] Action to invoke upon exceptional termination of the observable sequence.
 *  @param {Function} [onCompleted] Action to invoke upon graceful termination of the observable sequence.
 *  @returns {Disposable} A disposable handling the subscriptions and unsubscriptions.
 */
observableProto.subscribe = observableProto.forEach = function (oOrOnNext, onError, onCompleted) {
  return this._subscribe(typeof oOrOnNext === 'object' ?
    oOrOnNext :
    observerCreate(oOrOnNext, onError, onCompleted));
};

/**
 * Subscribes to the {@link Observable} and receives notifications for each element.
 * <p>
 * Alias to {@link #subscribe(Action1)}
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code forEach} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * 
 * @param onNext
 *            {@link Action1} to execute for each item.
 * @throws IllegalArgumentException
 *             if {@code onNext} is null
 * @throws OnErrorNotImplementedException
 *             if the Observable calls {@code onError}
 * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
 */
public final void forEach(final Action1<? super T> onNext) {
    subscribe(onNext);
}

public final Disposable forEach(Consumer<? super T> onNext) {
    return subscribe(onNext);
}

Ответ 3

observable.subscribe(...) и observable.forEach(...) точно такие же.
 Так же, как user3743222 упоминается в его ответе

Из документации Rx:

Подписывает наблюдателя наблюдаемой последовательности.

Rx.Observable.prototype.subscribe([observer] | [onNext], [onError], [onCompleted])

Rx.Observable.prototype.forEach([observer] | [onNext], [onError], [onCompleted])

Подробнее см. полную документацию.