Подтвердить что ты не робот

Split Rx Наблюдается в несколько потоков и обрабатывается индивидуально

Вот фотография того, что я пытаюсь выполнить.

- а-б-с-а - БББ - а

разбивается на

- a ----- a ------- a → поток

---- b ------ bbb --- → b поток

------ c ---------- → c поток

Тогда, уметь

a.subscribe()
b.subscribe()
c.subscribe()

До сих пор все, что я нашел, разделило поток с помощью groupBy(), но затем свернуло все обратно в один поток и обработало их все в одной и той же функции. То, что я хочу сделать, это обрабатывать каждый производный поток по-другому.

То, как я делаю это сейчас, - это набор фильтров. Есть ли лучший способ сделать это?

4b9b3361

Ответ 1

Вам не нужно сворачивать Observables с groupBy. Вы можете подписаться на них.

Что-то вроде этого:

    String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};

    Action1<String> a = s -> System.out.print("-a-");

    Action1<String> b = s -> System.out.print("-b-");

    Action1<String> c = s -> System.out.print("-c-");

    Observable
            .from(inputs)
            .groupBy(s -> s)
            .subscribe((g) -> {
                if ("a".equals(g.getKey())) {
                    g.subscribe(a);
                }

                if ("b".equals(g.getKey())) {
                    g.subscribe(b);
                }

                if ("c".equals(g.getKey())) {
                    g.subscribe(c);
                }
            });

Если утверждения выглядят кажущимися уродливыми, но по крайней мере вы можете обрабатывать каждый поток отдельно. Возможно, есть способ избежать их.

Ответ 2

Просто как пирог, просто используйте filter

Пример в scala

import rx.lang.scala.Observable

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")

aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))

bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))

cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))

Вам просто нужно убедиться, что источник, наблюдаемый, горячий. Самый простой способ - share it.