Как параллельные потоки Java 8 ведут себя в случае исключения в разделе потребления, например, при обработке forEach
? Например, следующий код:
final AtomicBoolean throwException = new AtomicBoolean(true);
IntStream.range(0, 1000)
.parallel()
.forEach(i -> {
// Throw only on one of the threads.
if (throwException.compareAndSet(true, false)) {
throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
});
Остановит ли обработанные элементы сразу? Остается ли ждать завершения уже начатых элементов? Дождитесь завершения всего потока? Запускает ли он обработку элементов потока после исключения?
Когда он возвращается? Сразу после исключения? Ведь часть/часть элементов обрабатывалась потребителем?
Продолжают ли элементы обрабатываться после того, как параллельный поток выбрал исключение? (Найден случай, когда это произошло).
Здесь есть общее правило?
РЕДАКТИРОВАТЬ (15-11-2016)
Попытка определить, возвращается ли параллельный поток раньше, я обнаружил, что он не определен:
@Test
public void testParallelStreamWithException() {
AtomicInteger overallCount = new AtomicInteger(0);
AtomicInteger afterExceptionCount = new AtomicInteger(0);
AtomicBoolean throwException = new AtomicBoolean(true);
try {
IntStream.range(0, 1000)
.parallel()
.forEach(i -> {
overallCount.incrementAndGet();
afterExceptionCount.incrementAndGet();
try {
System.out.println(i + " Sleeping...");
Thread.sleep(1000);
System.out.println(i + " After Sleeping.");
}
catch (InterruptedException e) {
e.printStackTrace();
}
// Throw only on one of the threads and not on main thread.
if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
System.out.println("Throwing exception - " + i);
throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
}
});
Assert.fail("Should not get here.");
}
catch (Exception e) {
System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
afterExceptionCount.set(0);
}
System.out.println("Overall count: " + overallCount.get());
System.out.println("After exception count: " + afterExceptionCount.get());
}
Поздний возврат при бросании не из основного потока. Это вызвало много элементов new, которые нужно обрабатывать после исключения. На моей машине было обработано около 200 элементов после исключения. НО, не все 1000 элементов были обработаны. Так что же такое здесь? Почему больше элементов обрабатывалось, даже если исключение было брошено?
Раннее возвращение при удалении знака not (!
), в результате чего исключение будет выбрано в основном потоке. Только обработанные элементы уже были обработаны, а новые не обработаны. Возвращение раньше было здесь. Не соответствует предыдущему поведению.
Что мне здесь не хватает?