Подтвердить что ты не робот

Как объединить два отсортированных наблюдателя в одну сортировку Observable?

Дано:

Integer[] arr1 = {1, 5, 9, 17};
Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
Observable<Integer> o1 = Observable.from(arr1);
Observable<Integer> o2 = Observable.from(arr2);

Как получить Observable, содержащий 1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17?

4b9b3361

Ответ 1

Изменить: Пожалуйста, см. комментарий the_joric, если вы собираетесь использовать это. Есть краевой регистр, который не обрабатывается, я не вижу быстрого способа его исправить, и поэтому у меня нет времени исправить его прямо сейчас.

Здесь решение в С#, так как у вас есть тег system.reactive.

static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b)
{
    var source = Observable.Merge(
        a.Select(x => Tuple.Create('a', x)),
        b.Select(y => Tuple.Create('b', y)));
    return source.Publish(o =>
    {
        var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2);
        var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2);
        return Observable.Merge(
            published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)),
            published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x)));
    });
}

Идея суммируется следующим образом.

  • Когда a испускает значение x, мы задерживаем его до тех пор, пока b не испустит значение y, чтобы x <= y.

  • Когда b испускает значение y, мы задерживаем его до тех пор, пока a не испустит значение x, чтобы y <= x.

Если у вас были только горячие наблюдаемые, вы могли бы сделать следующее. Но следующее не будет работать, если в миксе присутствуют какие-либо холодные наблюдаемые. Я бы всегда советовал использовать версию, которая работает как для горячих, так и для холодных наблюдаемых.

static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b)
{
    return Observable.Merge(
        a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)),
        b.Delay(y => a.FirstOrDefaultAsync(x => y <= x)));
}

Ответ 2

Вы можете объединять, сортировать и сглаживать последовательности, но у него будут значительные накладные расходы:

o1.mergeWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...)

или

o1.concatWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...)

В противном случае вам нужно написать довольно сложный оператор.

Редактировать 04/06/2015:

Вот оператор, который выполняет эту сортировку-слияние более эффективно.

Ответ 3

Я также искал решение для сортировки слияния, которое поддерживало противодавление и не могло найти его. Поэтому я решил реализовать его самостоятельно самостоятельно на основе существующего оператора zip.

Аналогично zip, отсортированный оператор слияния собирает элемент из каждого источника, наблюдаемого первым, но затем помещает их в очередь приоритетов, из которой испускает их по одному в соответствии с их естественным порядком или указанный компаратор.

Вы можете захватить его из GitHub в качестве готовой к использованию библиотеки или просто скопировать/вставить код:

https://github.com/ybayk/rxjava-recipes

См. модульные тесты для использования.

Ответ 4

Это обсуждалось некоторое время назад в рассылке RxJava, вы найдете ссылки на возможные решения в этом потоке.

Ответ 5

Как насчет слияния и сортировки?

@Test
public void testMergeChains() {
    Observable.merge(Observable.from(Arrays.asList(1, 2, 13, 11, 5)), Observable.from(Arrays.asList(10, 4, 12, 3, 14, 15)))
              .collect(ArrayList<Integer>::new, ArrayList::add)
            .doOnNext(Collections::sort)
            .subscribe(System.out::println);

}

Здесь вы можете увидеть больше примеров.

https://github.com/politrons/reactive