Подтвердить что ты не робот

Как ведут себя параллельные потоки Java 8 на исключение?

Как параллельные потоки Java 8 ведут себя в случае исключения в разделе потребления, например, при обработке forEach? Например, следующий код:

final AtomicBoolean throwException = new AtomicBoolean(true);
IntStream.range(0, 1000)
    .parallel()
    .forEach(i -> {
        // Throw only on one of the threads.
        if (throwException.compareAndSet(true, false)) {
            throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
        });

Остановит ли обработанные элементы сразу? Остается ли ждать завершения уже начатых элементов? Дождитесь завершения всего потока? Запускает ли он обработку элементов потока после исключения?

Когда он возвращается? Сразу после исключения? Ведь часть/часть элементов обрабатывалась потребителем?

Продолжают ли элементы обрабатываться после того, как параллельный поток выбрал исключение? (Найден случай, когда это произошло).

Здесь есть общее правило?

РЕДАКТИРОВАТЬ (15-11-2016)

Попытка определить, возвращается ли параллельный поток раньше, я обнаружил, что он не определен:

@Test
public void testParallelStreamWithException() {
    AtomicInteger overallCount = new AtomicInteger(0);
    AtomicInteger afterExceptionCount = new AtomicInteger(0);
    AtomicBoolean throwException = new AtomicBoolean(true);

    try {
        IntStream.range(0, 1000)
            .parallel()
            .forEach(i -> {
                overallCount.incrementAndGet();
                afterExceptionCount.incrementAndGet();
                try {
                    System.out.println(i + " Sleeping...");
                    Thread.sleep(1000);
                    System.out.println(i + " After Sleeping.");
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Throw only on one of the threads and not on main thread.
                if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
                    System.out.println("Throwing exception - " + i);
                    throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
                }
            });
        Assert.fail("Should not get here.");
    }
    catch (Exception e) {
        System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }
    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());
}

Поздний возврат при бросании не из основного потока. Это вызвало много элементов new, которые нужно обрабатывать после исключения. На моей машине было обработано около 200 элементов после исключения. НО, не все 1000 элементов были обработаны. Так что же такое здесь? Почему больше элементов обрабатывалось, даже если исключение было брошено?

Раннее возвращение при удалении знака not (!), в результате чего исключение будет выбрано в основном потоке. Только обработанные элементы уже были обработаны, а новые не обработаны. Возвращение раньше было здесь. Не соответствует предыдущему поведению.

Что мне здесь не хватает?

4b9b3361

Ответ 1

Когда какое-либо исключение вызывается на одном из этапов, оно не дожидается завершения других операций, исключение повторно передается вызывающему. Вот как это делает ForkJoinPool.

В отличие от findFirst, например, при параллельном запуске, результат будет представлен вызывающему абоненту только после завершения всех операций обработки (даже если результат известен до завершения всех операций).

Другими словами: он вернется раньше, но оставит все текущие задачи законченными.

ИЗМЕНИТЬ, чтобы ответить на последний комментарий

Это очень объясняется ответом Хольгера (ссылка в комментариях), но вот некоторые подробности.

1) Когда вы убиваете все, кроме основного потока, вы также убиваете все задачи, которые должны были выполняться этими потоками. Так что число должно быть больше около 250, поскольку есть 1000 задач и 4 потока, я предполагаю, что это возвращает 3?:

int result = ForkJoinPool.getCommonPoolParallelism();

Теоретически существует 1000 задач, есть 4 потока, каждый из которых должен обрабатывать 250 задач, а затем вы убиваете 3 из них, что потеряло 750 задач. Осталось 250 задач, и ForkJoinPool будет охватывать 3 новых потока для выполнения этих 250 левых задач.

Несколько вещей, которые вы можете попробовать, измените свой поток следующим образом (сделав поток не размером):

IntStream.generate(random::nextInt).limit(1000).parallel().forEach

На этот раз закончится гораздо больше операций, потому что начальный индекс разделения неизвестен и выбран другой стратегией. Вы также можете попробовать:

 if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {

:

 if (!Thread.currentThread().getName().equals("main")) {

На этот раз вы всегда будете убивать все потоки, кроме основного, до определенного момента, когда ForkJoinPool не будет создавать новые потоки, поскольку задача слишком мала для разделения, поэтому нет необходимости в других потоках. В этом случае еще меньше задач будет завершено.

2) Второй пример, когда вы фактически убиваете основной поток, как код пути, вы не увидите фактический запуск других потоков. Измените его:

    } catch (Exception e) {
        System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }

    // give some time for other threads to finish their work. You could play commenting and de-commenting this line to see a big difference in results. 
    TimeUnit.SECONDS.sleep(60);

    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());