Подтвердить что ты не робот

Параллельный поток из HashSet не запускается параллельно

У меня есть набор элементов, которые я хочу обрабатывать параллельно. Когда я использую List, parallelism работает. Однако, когда я использую Set, он не запускается параллельно.

Я написал образец кода, который показывает проблему:

public static void main(String[] args) {
    ParallelTest test = new ParallelTest();

    List<Integer> list = Arrays.asList(1,2);
    Set<Integer> set = new HashSet<>(list);

    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    System.out.println("set print");
    try {
        forkJoinPool.submit(() ->
            set.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }

    System.out.println("\n\nlist print");
    try {
        forkJoinPool.submit(() ->
            list.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }   
}

private void print(int i){
    System.out.println("start: " + i);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    }
    System.out.println("end: " + i);
}

Это результат, который я получаю на окнах 7

set print
start: 1
end: 1
start: 2
end: 2

list print
start: 2
start: 1
end: 1
end: 2

Мы видим, что первый элемент из Set должен был закончить до обработки второго элемента. Для List второй элемент начинается до завершения первого элемента.

Можете ли вы рассказать мне, что вызывает эту проблему, и как ее избежать с помощью коллекции Set?

4b9b3361

Ответ 1

Я могу воспроизвести поведение, которое вы видите, когда параллелизм не соответствует параллелизму указанного вами параллелизма пула ветвления-соединения. После установки параллельности пула разветвленного объединения на 10 и увеличения количества элементов в коллекции до 50 я вижу, что параллелизм потока на основе списка увеличивается только до 6, тогда как параллелизм потока на основе набора никогда не становится выше 2.

Обратите внимание, однако, что этот метод отправки задачи в пул fork-join для запуска параллельного потока в этом пуле является "хитростью" реализации и не гарантированно работает. Действительно, потоки или пул потоков, который используется для выполнения параллельных потоков, не определены. По умолчанию используется общий пул fork-join, но в разных средах могут использоваться разные пулы потоков. (Рассмотрим контейнер на сервере приложений.)

В классе java.util.stream.AbstractTask поле LEAF_TARGET определяет объем LEAF_TARGET, что, в свою очередь, определяет степень параллелизма, которая может быть достигнута. Значение этого поля основано на ForkJoinPool.getCommonPoolParallelism() который, конечно, использует параллелизм общего пула, а не того пула, который выполняет задачи.

Возможно, это ошибка (см. Выпуск OpenJDK JDK-8190974), однако, эта область в любом случае не указана. Тем не менее, эта область системы определенно нуждается в разработке, например, с точки зрения политики разделения, степени доступного параллелизма, решения задач блокирования и других проблем. В будущем выпуске JDK могут быть рассмотрены некоторые из этих проблем.

Между тем, можно управлять параллелизмом общего пула ветвления и соединения с помощью системных свойств. Если вы добавите эту строку в вашу программу,

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

и вы запустите потоки в общем пуле (или если вы отправите их в свой собственный пул с достаточно высоким уровнем параллелизма), вы увидите, что гораздо больше задач выполняется параллельно.

Вы также можете установить это свойство в командной строке, используя -D.

Опять же, это не гарантированное поведение, и оно может измениться в будущем. Но этот метод, вероятно, будет работать для реализаций JDK 8 в обозримом будущем.

ОБНОВЛЕНИЕ 2019-06-12: ошибка JDK-8190974 была исправлена в JDK 10, и это исправление было перенесено в предстоящий выпуск JDK 8u (8u222).