Подтвердить что ты не робот

RxJava Наблюдение за вызовом/подпиской на поток

У меня есть некоторые проблемы с пониманием того, как subscribeOn/observOn работает в RxJava. Я создал простое приложение с наблюдаемым, которое испускает имена планет солнечной системы, выполняет некоторые сопоставления, фильтрует и печатает результаты.

Как я понимаю, планирование работы в фоновый поток выполняется с помощью оператора subscribeOn (и, похоже, он работает нормально).

Наблюдение за фоновым потоком также отлично работает с оператором observeOn.

Но у меня есть проблема в понимании, как наблюдать за вызовом потока (либо это основной поток, либо любой другой). Это легко сделать на Android с оператором AndroidSchedulers.mainThread(), но я не знаю, как добиться этого в чистой java.

Здесь мой код:

public class Main {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        System.out.println("Main thread: " + getCurrentThreadInfo());

        Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                .map(in -> {
                    System.out.println("map on: " + getCurrentThreadInfo());
                    return in.toUpperCase();
                })
                .filter(in -> {
                    System.out.println("filter on: " + getCurrentThreadInfo());
                    return in.contains("A");
                })
                .subscribeOn(Schedulers.from(executor));

        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread("Thread-" + i) {
                @Override
                public void run() {
                    stringObservable
                            .buffer(5)
                            .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
                }
            };
            thread.start();
        }

    }

    private static String getCurrentThreadInfo() {
        return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
    }
}

Наблюдаемый в созданном и работа подписан на один из трех потоков от исполнителя. Это работает так, как ожидалось. Но как наблюдать результаты по тем динамически созданным потокам в цикле? Есть ли способ создания Планировщика из текущего потока?

Кроме того, я узнал, что после запуска этого кода он никогда не заканчивается, и я не знаю почему?: (

4b9b3361

Ответ 1

Чтобы ответить на ваш вопрос, позвольте мне начать с самого начала, это позволяет другим людям понять то, что вы уже знаете.

Планировщики

Планировщики играют ту же роль, что и Executors for Java. Вкратце - они решают, какие действия потока выполняются.

Обычно Observable и операторы выполняются в текущем потоке. Иногда вы можете передать Scheduler в Observable или оператор в качестве параметра (например, Observable.timer()).

Кроме того, RxJava предоставляет 2 оператора для указания планировщика:

  • subscribeOn - укажите планировщик, на котором будет работать Observable
  • наблюдать за - указать планировщик, на котором наблюдатель будет наблюдать эту наблюдаемую

Чтобы понять их быстро, я использую пример кода:

Во всех примерах я буду использовать помощник createObservable, который выдает имя потока, в котором работает Observable:

 public static Observable<String> createObservable(){
        return Observable.create((Subscriber<? super String> subscriber) -> {
                subscriber.onNext(Thread.currentThread().getName());
                subscriber.onCompleted();
            }
        );
    }

Без планировщиков:

createObservable().subscribe(message -> {
        System.out.println("Case 1 Observable thread " + message);
        System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
    });
    //will print:
    //Case 1 Observable thread main
    //Case 1 Observer thread main

SubscribeOn:

createObservable()
            .subscribeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 2 Observable thread " + message);
                System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 2 Observable thread RxNewThreadScheduler-1
            //Case 2 Observer thread RxNewThreadScheduler-1

Подписаться и Наблюдать за:

reateObservable()
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 3 Observable thread " + message);
                System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 3 Observable thread RxNewThreadScheduler-2
            //Case 3 Observer thread RxNewThreadScheduler-1

ObserveOn:

createObservable()
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 4 Observable thread " + message);
                System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 4 Observable thread main
            //Case 4 Observer thread RxNewThreadScheduler-1

Ответ:

AndroidSchedulers.mainThread() возвращает шедулер, который делегирует работу MessageQueue, связанной с основным потоком.
Для этого он использует android.os.Looper.getMainLooper() и android.os.Handler.

Другими словами, если вы хотите указать конкретный поток, вы должны предоставить средства для планирования и выполнения задач в потоке.

Под ним может использоваться любой тип MQ для хранения задач и логики, которая зацикливает очередь и выполняет задачи.

В Java у нас есть Executor, который предназначен для таких задач. RxJava может легко создать планировщик из такого исполнителя.

Ниже приведен пример, который показывает, как вы можете наблюдать в основном потоке (не особенно полезно, но показывает все необходимые части).

public class RunCurrentThread implements Executor {

    private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        RunCurrentThread sample = new RunCurrentThread();
        sample.observerOnMain();
        sample.runLoop();
    }

    private void observerOnMain() {
        createObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.from(this))
                .subscribe(message -> {
                    System.out.println("Observable thread " + message);
                    System.out.println("Observer thread " + Thread.currentThread().getName());
                });
        ;
    }

    public Observable<String> createObservable() {
        return Observable.create((Subscriber<? super String> subscriber) -> {
                    subscriber.onNext(Thread.currentThread().getName());
                    subscriber.onCompleted();
                }
        );
    }

    private void runLoop() throws InterruptedException {
        while(!Thread.interrupted()){
            tasks.take().run();
        }
    }

    @Override
    public void execute(Runnable command) {
        tasks.add(command);
    }
}

И последний вопрос, почему ваш код не заканчивается:

ThreadPoolExecutor по умолчанию использует не демоновские потоки, поэтому ваша программа не завершается, пока они не существуют. Вы должны использовать метод shutdown, чтобы закрыть потоки.

Ответ 2

Здесь приведен упрощенный пример, обновленный для RxJava 2. Это та же концепция, что и Marek: Executor, который добавляет runnables к BlockingQueue, потребляемому в потоке вызывающего.

public class ThreadTest {

    @Test
    public void test() throws InterruptedException {

        final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

        System.out.println("Caller thread: " + Thread.currentThread().getName());

        Observable.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Observable thread: " + Thread.currentThread().getName());
                return 1;
            }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.from(new Executor() {
                @Override
                public void execute(@NonNull Runnable runnable) {
                    tasks.add(runnable);
                }
            }))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    System.out.println("Observer thread: " + Thread.currentThread().getName());
                }
            });
        tasks.take().run();
    }

}

// Output: 
// Caller thread main
// Observable thread RxCachedThreadScheduler-1
// Observer thread main