Подтвердить что ты не робот

С Rx, как я могу игнорировать все, кроме самого последнего значения, когда работает метод Subscribe

Используя Reactive Extensions, я хочу игнорировать сообщения, исходящие из потока событий, которые происходят, когда мой метод Subscribe запущен. То есть иногда мне требуется больше времени для обработки сообщения, чем время между сообщением, поэтому я хочу отказаться от сообщений, которые у меня нет времени для обработки.

Однако, когда мой метод Subscribe завершается, если какие-либо сообщения поступают, я хочу обработать последний. Поэтому я всегда обрабатываю последнее сообщение.

Итак, если у меня есть код, который делает:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

и если мы предположим, что "100" требует много времени для обработки. Затем я хочу, чтобы "2" обрабатывался, когда "100" завершается. "1" следует игнорировать, поскольку он был заменен "2", а "100" все еще обрабатывался.

Вот пример результата, который я хочу использовать в фоновом задании, и Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

Однако, последний() является блокирующим вызовом, и я бы предпочел не иметь нить, ожидающую следующего значения, подобного этому (иногда могут возникать очень большие промежутки между сообщениями).

Я также могу получить результат, который я хочу, используя BroadcastBlock из TPL Dataflow следующим образом:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

но похоже, что это должно быть возможно непосредственно в Rx. Какой лучший способ сделать это?

4b9b3361

Ответ 1

Благодаря Ли Кэмпбеллу (Intro To Rx известность), теперь у меня есть рабочее решение, использующее этот метод расширения:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

Ответ 2

Вот метод, похожий на Dave, но вместо него использует Sample (что более удобно, чем буфер). Я включил аналогичный метод расширения в тот, который был добавлен в ответ Дэйва.

Расширение:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();

    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });

    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));

    return sub;
}

Обратите внимание, что это проще, и нет пустого буфера, который был запущен. Первый элемент, который отправляется в действие, фактически поступает из самого потока.

Использование прост:

messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});

messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing

И результаты:

source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10

Ответ 3

Вот попытка использовать "просто" Rx. Таймер и подписчик остаются независимыми, наблюдая за потоковым пулом, и я использовал тему, чтобы предоставить отзыв о завершении задачи.

Я не думаю, что это простое решение, но я надеюсь, что это может дать вам идеи для улучшения.

messages.
    Buffer(() => feedback).
    Select(l => l.LastOrDefault()).
    ObserveOn(Scheduler.ThreadPool).
    Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
        feedback.OnNext(Unit.Default);
    });

feedback.OnNext(Unit.Default);

Есть одна небольшая проблема: буфер сначала закрывается, когда он пуст, поэтому он генерирует значение по умолчанию. Вероятно, вы могли бы решить эту проблему, выполнив обратную связь после первого сообщения.


Здесь это как функция расширения:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var feedback = new Subject<Unit>();

    var sub = source.
        Buffer(() => feedback).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l.LastOrDefault());
            feedback.OnNext(Unit.Default);
        });

    feedback.OnNext(Unit.Default);

    return sub;
}

И использование:

    messages.SubscribeWithoutOverlap(n =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(n);
    });

Ответ 4

Я написал сообщение в блоге об этом с помощью решения, которое использует CAS вместо блокировок и избегает рекурсии. Код ниже, но здесь вы можете найти полное объяснение: http://www.zerobugbuild.com/?p=192

public static IObservable<TSource> ObserveLatestOn<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler)
{
    return Observable.Create<TSource>(observer =>
    {
        Notification<TSource> pendingNotification = null;
        var cancelable = new MultipleAssignmentDisposable();

        var sourceSubscription = source.Materialize()
            .Subscribe(notification =>
            {
                var previousNotification = Interlocked.Exchange(
                    ref pendingNotification, notification);

                if (previousNotification != null) return;

                cancelable.Disposable = scheduler.Schedule(() =>
                    {
                        var notificationToSend = Interlocked.Exchange(
                            ref pendingNotification, null);
                        notificationToSend.Accept(observer);
                    });
            });
            return new CompositeDisposable(sourceSubscription, cancelable);
    });
}

Ответ 5

Пример использования Observable.Switch. Он также обрабатывает случай, когда вы выполняете задачу, но нет ничего в очереди.

using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive
{
    public static class RXX
    {
        public static IDisposable SubscribeWithoutOverlap<T>
        ( this IObservable<T> source
        , Action<T> action
        , IScheduler scheduler = null)
        {
            var sampler = new Subject<Unit>();
            scheduler = scheduler ?? Scheduler.Default;
            var p = source.Publish();
            var connection = p.Connect();

            var subscription = sampler.Select(x=>p.Take(1))
                .Switch()
                .ObserveOn(scheduler)
                .Subscribe(l =>
                {
                    action(l);
                    sampler.OnNext(Unit.Default);
                });

            sampler.OnNext(Unit.Default);

            return new CompositeDisposable(connection, subscription);
        }
    }
}

Ответ 6

Только что закончил (и уже полностью переработал) свое собственное решение проблемы, которое я планирую использовать в производстве.

Если планировщик не использует текущий поток, вызовы OnNext, OnCompleted, OnError из источника должны немедленно возвращаться; если наблюдатель занят предыдущими уведомлениями, они входят в очередь с заданным максимальным размером, откуда они будут уведомлены всякий раз, когда предыдущее уведомление было обработано. Если очередь заполняется, то последние элементы отбрасываются. Таким образом, максимальный размер очереди 0 игнорирует все входящие элементы, пока наблюдатель занят; размер 1 всегда позволяет наблюдать за последним предметом; размер до int.MaxValue удерживает потребителя занятым до тех пор, пока он не догонит производителя.

Если планировщик поддерживает длительную работу (т.е. дает вам собственный поток), я планирую цикл для уведомления наблюдателя; в противном случае я использую рекурсивное планирование.

Вот код. Любые комментарии приветствуются.

partial class MoreObservables
{
    /// <summary>
    /// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
    /// </summary>
    /// <param name="source">The source sequence.</param>
    /// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
    /// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
    /// <remarks>
    /// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
    /// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
    /// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
    /// </remarks>
    public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
        if (scheduler == null) scheduler = Scheduler.Default;

        return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer));
    }

    private static class LatestImpl<TSource>
    {
        public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));

            var longrunningScheduler = scheduler.AsLongRunning();
            if (longrunningScheduler != null)
                return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer);

            return new RecursiveSubscription(source, maxQueueSize, scheduler, observer);
        }

        #region Subscriptions

        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop.
        /// </summary>
        private sealed class LoopSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Head, // next notification is in _head
                Queue, // next notifications are in _queue, followed by _completion
                Disposed, // disposed
            }

            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly IObserver<TSource> _observer;
            private State _state;
            private TSource _head; // item in front of the queue
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification

            public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer)
            {
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                scheduler.ScheduleLongRunning(_ => Loop());
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }

            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _head = value;
                            _state = State.Head;
                            Monitor.Pulse(_subscription);
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }

            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _state = State.Queue;
                            Monitor.Pulse(_subscription);
                            _subscription.Dispose();
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }

            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;

                    _head = default(TSource);
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    Monitor.Pulse(_subscription);
                    _subscription.Dispose();
                }
            }

            private void Loop()
            {
                try
                {
                    while (true) // overall loop for all notifications
                    {
                        // next notification to emit
                        Notification<TSource> completion;
                        TSource next; // iff completion == null

                        lock (_subscription)
                        {
                            while (true)
                            {
                                while (_state == State.Idle)
                                    Monitor.Wait(_subscription);

                                if (_state == State.Head)
                                {
                                    completion = null;
                                    next = _head;
                                    _head = default(TSource);
                                    _state = State.Queue;
                                    break;
                                }
                                if (_state == State.Queue)
                                {
                                    if (!_queue.IsEmpty)
                                    {
                                        completion = null;
                                        next = _queue.Dequeue(); // assumption: this never throws
                                        break;
                                    }
                                    if (_completion != null)
                                    {
                                        completion = _completion;
                                        next = default(TSource);
                                        break;
                                    }
                                    _state = State.Idle;
                                    continue;
                                }
                                Debug.Assert(_state == State.Disposed);
                                return;
                            }
                        }

                        if (completion != null)
                        {
                            completion.Accept(_observer);
                            return;
                        }
                        _observer.OnNext(next);
                    }
                }
                finally { Dispose(); }
            }
        }

        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively.
        /// </summary>
        private sealed class RecursiveSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Scheduled, // emitter scheduled or executing
                Disposed, // disposed
            }

            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action
            private readonly IScheduler _scheduler;
            private readonly IObserver<TSource> _observer;
            private State _state;
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification

            public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
            {
                _scheduler = scheduler;
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }

            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _emitter.Disposable = _scheduler.Schedule(value, EmitNext);
                            _state = State.Scheduled;
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }

            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion));
                            _state = State.Scheduled;
                            _subscription.Dispose();
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }

            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;

                    _emitter.Dispose();
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    _subscription.Dispose();
                }
            }

            private void EmitNext(TSource value, Action<TSource> self)
            {
                try { _observer.OnNext(value); }
                catch { Dispose(); return; }

                lock (_subscription)
                {
                    if (_state == State.Disposed) return;
                    Debug.Assert(_state == State.Scheduled);
                    if (!_queue.IsEmpty)
                        self(_queue.Dequeue());
                    else if (_completion != null)
                        _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion));
                    else
                        _state = State.Idle;
                }
            }

            private void EmitCompletion(Notification<TSource> completion)
            {
                try { completion.Accept(_observer); }
                finally { Dispose(); }
            }
        }

        #endregion

        #region IQueue

        /// <summary>
        /// FIFO queue that discards least recent items if size limit is reached.
        /// </summary>
        private interface IQueue
        {
            bool IsEmpty { get; }
            void Enqueue(TSource item);
            TSource Dequeue();
        }

        /// <summary>
        /// <see cref="IQueue"/> implementations.
        /// </summary>
        private static class Queue
        {
            public static IQueue Create(int maxSize)
            {
                switch (maxSize)
                {
                    case 0: return Zero.Instance;
                    case 1: return new One();
                    default: return new Many(maxSize);
                }
            }

            private sealed class Zero : IQueue
            {
                // ReSharper disable once StaticMemberInGenericType
                public static Zero Instance { get; } = new Zero();
                private Zero() { }

                public bool IsEmpty => true;
                public void Enqueue(TSource item) { }
                public TSource Dequeue() { throw new InvalidOperationException(); }
            }

            private sealed class One : IQueue
            {
                private TSource _item;

                public bool IsEmpty { get; private set; } = true;

                public void Enqueue(TSource item)
                {
                    _item = item;
                    IsEmpty = false;
                }

                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();

                    var item = _item;
                    _item = default(TSource);
                    IsEmpty = true;
                    return item;
                }
            }

            private sealed class Many : IQueue
            {
                private readonly int _maxSize, _initialSize;
                private int _deq, _enq; // indices of deque and enqueu positions
                private TSource[] _buffer;

                public Many(int maxSize)
                {
                    if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize));

                    _maxSize = maxSize;
                    if (maxSize == int.MaxValue)
                        _initialSize = 4;
                    else
                    {
                        // choose an initial size that won't get us too close to maxSize when doubling
                        _initialSize = maxSize;
                        while (_initialSize >= 7)
                            _initialSize = (_initialSize + 1) / 2;
                    }
                }

                public bool IsEmpty { get; private set; } = true;

                public void Enqueue(TSource item)
                {
                    if (IsEmpty)
                    {
                        if (_buffer == null) _buffer = new TSource[_initialSize];
                        _buffer[0] = item;
                        _deq = 0;
                        _enq = 1;
                        IsEmpty = false;
                        return;
                    }
                    if (_deq == _enq) // full
                    {
                        if (_buffer.Length == _maxSize) // overwrite least recent
                        {
                            _buffer[_enq] = item;
                            if (++_enq == _buffer.Length) _enq = 0;
                            _deq = _enq;
                            return;
                        }

                        // increse buffer size
                        var newSize = _buffer.Length >= _maxSize / 2 ? _maxSize : 2 * _buffer.Length;
                        var newBuffer = new TSource[newSize];
                        var count = _buffer.Length - _deq;
                        Array.Copy(_buffer, _deq, newBuffer, 0, count);
                        Array.Copy(_buffer, 0, newBuffer, count, _deq);
                        _deq = 0;
                        _enq = _buffer.Length;
                        _buffer = newBuffer;
                    }
                    _buffer[_enq] = item;
                    if (++_enq == _buffer.Length) _enq = 0;
                }

                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();

                    var result = ReadAndClear(ref _buffer[_deq]);
                    if (++_deq == _buffer.Length) _deq = 0;
                    if (_deq == _enq)
                    {
                        IsEmpty = true;
                        if (_buffer.Length > _initialSize) _buffer = null;
                    }
                    return result;
                }

                private static TSource ReadAndClear(ref TSource item)
                {
                    var result = item;
                    item = default(TSource);
                    return result;
                }
            }
        }

        #endregion
    }
}

Ответ 7

Здесь реализована реализация Task с семантикой отмены, которая не использует объект. Вызов dispose позволяет, если это необходимо, активировать подписанное действие.

    public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action)
    {
        var cancellation = new CancellationDisposable();
        var token = cancellation.Token;
        Task task = null;

        return new CompositeDisposable(
            cancellation,
            observable.Subscribe(value =>
            {
                if (task == null || task.IsCompleted)
                    task = Task.Factory.StartNew(() => action(value, token), token);
            })
        );
    }

Вот простой тест:

Observable.Interval(TimeSpan.FromMilliseconds(150))
                      .SampleSubscribe((v, ct) =>
                      {   
                          //cbeck for cancellation, do work
                          for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++)
                              Thread.Sleep(100);

                          Console.WriteLine(v);
                      });

Выход:

0
7
14
21
28
35

Ответ 8

С Rx 2.0 RC вы можете использовать Chunkify, чтобы получить IEnumerable из списков, каждый из которых содержит то, что наблюдалось со времени последнего MoveNext.

Затем вы можете использовать ToObservable для преобразования обратно в IObservable и обращать внимание только на последнюю запись в каждом непустом списке.

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

messages.Chunkify()
        .ToObservable(Scheduler.TaskPool)
        .Where(list => list.Any())
        .Select(list => list.Last())
        .Subscribe(n =>
        {
          Thread.Sleep(TimeSpan.FromMilliseconds(250));
          Console.WriteLine(n);
        });