У меня есть некоторые проблемы с пониманием того, как subscribeOn/observOn работает в RxJava. Я создал простое приложение с наблюдаемым, которое испускает имена планет солнечной системы, выполняет некоторые сопоставления, фильтрует и печатает результаты.
Как я понимаю, планирование работы в фоновый поток выполняется с помощью оператора subscribeOn
(и, похоже, он работает нормально).
Наблюдение за фоновым потоком также отлично работает с оператором observeOn
.
Но у меня есть проблема в понимании, как наблюдать за вызовом потока (либо это основной поток, либо любой другой). Это легко сделать на Android с оператором AndroidSchedulers.mainThread()
, но я не знаю, как добиться этого в чистой java.
Здесь мой код:
public class Main {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
System.out.println("Main thread: " + getCurrentThreadInfo());
Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
.map(in -> {
System.out.println("map on: " + getCurrentThreadInfo());
return in.toUpperCase();
})
.filter(in -> {
System.out.println("filter on: " + getCurrentThreadInfo());
return in.contains("A");
})
.subscribeOn(Schedulers.from(executor));
for (int i = 0; i < 5; i++) {
Thread thread = new Thread("Thread-" + i) {
@Override
public void run() {
stringObservable
.buffer(5)
.subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
}
};
thread.start();
}
}
private static String getCurrentThreadInfo() {
return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
}
}
Наблюдаемый в созданном и работа подписан на один из трех потоков от исполнителя. Это работает так, как ожидалось. Но как наблюдать результаты по тем динамически созданным потокам в цикле? Есть ли способ создания Планировщика из текущего потока?
Кроме того, я узнал, что после запуска этого кода он никогда не заканчивается, и я не знаю почему?: (