Подтвердить что ты не робот

Реактивное наблюдение за подпиской

Если у меня есть доступ к IObservable, который, как мне известно, будет когда-либо возвращать один элемент, будет ли это работать и является ли он лучшим шаблоном использования?

IDisposable disposable = null;
disposable = myObservable.Subscribe(x =>
  {
     DoThingWithItem(x);
     if (disposable != null)
     {
       disposable.Dispose();
     }
  });
4b9b3361

Ответ 1

Отказ от ответственности: я все еще изучаю Rx. Поэтому я не эксперт, но считаю, что одноразовый, возвращенный Subscribe, отменит подписку на подписку. Также, если источник завершен, как и в вашем случае, отмена подписки выполняется автоматически. Поэтому я думаю, что Dispose имеет избыточность и может быть безопасно удален.

Для получения дополнительной информации см. ответ на этот question.

Ответ 2

Одноразовое средство, возвращаемое методами расширения Subscribe, возвращается только для того, чтобы вы могли вручную отписаться от наблюдаемого до наблюдаемого естественного конца.

Если наблюдаемое завершено - с помощью OnCompleted или OnError - тогда подписка уже настроена для вас.

Попробуйте этот код:

var xs = Observable.Create<int>(o =>
{
    var d = Observable.Return(1).Subscribe(o);
    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

var subscription = xs.Subscribe(x => Console.WriteLine(x));

Если вы запустите выше, вы увидите, что "Disposed!" записывается на консоль, когда наблюдаемое завершается без необходимости вызова .Dispose() в подписке.

Важно отметить: сборщик мусора никогда не вызывает .Dispose() по наблюдаемым подпискам, поэтому вы должны распоряжаться своими подписками, если они не имеют (или могут отсутствовать), естественно, законченные до вашей подписки выходит за рамки.

Возьмите это, например:

var wc = new WebClient();

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h);

var subscription =
    ds.Subscribe(d =>
        Console.WriteLine(d.EventArgs.Result));

Наблюдаемый ds будет прикрепляться только к обработчику событий, когда он имеет подписку, и будет отделяться только тогда, когда наблюдаемое завершено или подписка будет удалена. Поскольку это обработчик событий, наблюдаемый никогда не будет завершен, потому что он ждет больше событий, и, следовательно, удаление является единственным способом отсоединиться от события (для приведенного выше примера).

Если у вас есть наблюдаемый FromEventPattern, который, как вы знаете, только когда-либо вернет одно значение, целесообразно добавить метод расширения .Take(1) перед подпиской, чтобы позволить обработчику событий автоматически отсоединяться, а затем вам не нужно вручную отмените подписку.

Так же:

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h)
    .Take(1);

Надеюсь, это поможет.

Ответ 3

Функция Take будет выполнять именно то, что вы ищете. В этом случае Take(1).

Ответ 4

В отличие от некоторых комментариев, это вовсе не редкость распоряжаться подпиской изнутри OnNext.

Хотя верно, что удаление в OnCompleted и OnError выполняется для вас завернутой подпиской, созданной методом расширения Subscribe, вы можете отказаться от подписки на основе значения, которое вы наблюдаете (например, в вашем случае: 1-й). У вас не всегда есть наблюдаемое, которое, как известно, производит только одно значение.

Проблема в том, что вы получаете IDisposable только после того, как подписались. Наблюдаемый может перезвонить вам на OnNext еще до того, как он вернет вам IDisposable для отмены подписки (в зависимости от таких вещей, как IScheduler, который он использует).

System.Reactive.Disposables.SingleAssignmentDisposable пригодится в этом случае. Он обертывает IDisposable, который вы можете назначить опозданием, и немедленно удалит его при назначении, если к нему уже был установлен SingleAssignmentDisposable. Также он несет свойство IsDisposed, которое изначально является false и устанавливается на true, когда вызывается Dispose().

Итак:

IObservable<string> source = ...;

var subscription = new SingleAssignmentDisposable();
subscription.Disposable = source.Subscribe(x =>
{
    if (subscription.IsDisposed) // getting notified though I've told it to stop
        return;
    DoThingsWithItem(x);
    if (x == "the last item I'm interested in")
        subscription.Dispose();
});