Подтвердить что ты не робот

Параллельные потоки, коллекторы и безопасность потоков

См. простой пример ниже, который подсчитывает количество вхождений каждого слова в списке:

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

В конце < <21 > {a=2, b=1, c=1}.

Но мой поток очень большой, и я хочу распараллелить работу, поэтому пишу:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

Однако я заметил, что wordsCount является простым HashMap, поэтому я задаюсь вопросом, нужно ли мне явно запрашивать параллельную карту для обеспечения безопасности потоков:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));

Можно ли безопасно использовать неконкурентные коллекторы с параллельным потоком или использовать только параллельные версии при сборе из параллельного потока?

4b9b3361

Ответ 1

Можно ли безопасно использовать неавтономные коллекторы с параллельным потоком или использовать только параллельные версии при сборе из параллельного потока?

Безопасно использовать неконкурентный коллектор в операции collect параллельного потока.

В спецификация интерфейса Collector в разделе с полдюжины пулевых точек:

Для неконкурентных коллекторов любой результат, возвращаемый функциями поставщика результата, аккумулятора или объединителя, должен быть последовательно ограниченным потоком. Это позволяет собирать коллекцию параллельно, не имея коллектора, нуждающегося в реализации любой дополнительной синхронизации. Реализация сокращения должна управлять тем, что вход правильно разбит на разделы, что разделы обрабатываются изолированно, а объединение происходит только после завершения накопления.

Это означает, что различные реализации, предоставляемые классом Collectors, могут использоваться с параллельными потоками, хотя некоторые из этих реализаций могут быть не параллельными коллекторами. Это также относится к любому из ваших собственных несовпадающих коллекционеров, которые вы можете реализовать. Они могут безопасно использоваться с параллельными потоками, если ваши коллекторы не мешают источнику потока, свободны от побочных эффектов, независимы от заказа и т.д.

Я также рекомендую прочитать раздел Mutable Reduction в документации пакета java.util.stream. В середине этого раздела приведен пример, который считается параллелизуемым, но который собирает результаты в ArrayList, который не является потокобезопасным.

Как это работает, параллельный поток, заканчивающийся в неконкурентном коллекторе, гарантирует, что разные потоки всегда работают в разных экземплярах промежуточных коллекций результатов. Поэтому сборщик имеет функцию Supplier для создания как можно большего количества промежуточных коллекций, чем потоки, поэтому каждый поток может накапливаться в свои собственные. Когда промежуточные результаты должны быть объединены, они безопасно передаются между потоками, и в любой момент времени только один поток объединяет любую пару промежуточных результатов.

Ответ 2

Все коллекторы, если они следуют правилам в спецификации, безопасны для запуска параллельно или последовательно. Параллельная готовность является ключевой частью дизайна здесь.

Различие между параллельными и неконкурентными коллекторами связано с подходом к распараллеливанию.

Обычный (неконкурентный) сборщик работает путем слияния суб-результатов. Таким образом, источник разбивается на кучу кусков, каждый фрагмент собран в контейнер результатов (например, список или карту), а затем суб-результаты объединяются в контейнер большего результата. Это безопасно и сохраняет порядок, но для некоторых видов контейнеров, особенно карт, может быть дорого, так как слияние двух карт по ключу часто бывает дорогостоящим.

Параллельный коллектор вместо этого создает один контейнер результата, операции вставки которого гарантируют поточность и взрывают элементы из нескольких потоков. С очень параллельным контейнером результата, таким как ConcurrentHashMap, этот подход может работать лучше, чем слияние обычных HashMaps.

Таким образом, параллельные коллекторы представляют собой строго оптимизацию по сравнению с обычными аналогами. И они не приходят без затрат; потому что элементы взорваны из многих потоков, параллельные коллекторы обычно не могут сохранить порядок встреч. (Но часто вам все равно - при создании гистограммы счисления слов вам все равно, какой экземпляр "foo" вы считали первым.)

Ответ 3

Безопасно использовать неконкурентные коллекции и неатомарные счетчики с параллельными потоками.

Если вы посмотрите документацию Stream:: collect, вы найдете следующий абзац:

Как и reduce(Object, BinaryOperator), операции сбора могут быть распараллелены без дополнительной синхронизации.

И для метода Stream:: уменьшить:

Хотя это может показаться более окольным способом выполнения агрегации по сравнению с простое изменение общего количества в цикле, операции сокращения распараллеливаются более грациозно, без необходимости дополнительной синхронизации и с значительно меньшим риском расследований данных.

Это может быть немного удивительно. Тем не менее, обратите внимание, что параллельные потоки основаны на модели объединения fork. Это означает, что одновременное выполнение работает следующим образом:

  • разделяет последовательность на две части с примерно одинаковым размером
  • обрабатывать каждую часть отдельно
  • собрать результаты обеих частей и объединить их в один результат

На втором этапе три этапа рекурсивно применяются к подпоследовательности.

Пример должен сделать это понятным.

IntStream.range(0, 4)
    .parallel()
    .collect(Trace::new, Trace::accumulate, Trace::combine);

Единственная цель класса Trace - регистрировать вызовы конструктора и метода. Если вы выполняете это утверждение, он печатает следующие строки:

thread:  9  /  operation: new
thread: 10  /  operation: new
thread: 10  /  operation: accumulate
thread:  1  /  operation: new
thread:  1  /  operation: accumulate
thread:  1  /  operation: combine
thread: 11  /  operation: new
thread: 11  /  operation: accumulate
thread:  9  /  operation: accumulate
thread:  9  /  operation: combine
thread:  9  /  operation: combine

Вы можете видеть, что были созданы четыре объекта Trace, накопление было вызвано один раз на каждом объекте, и объединение было использовано три раза, чтобы объединить четыре объекта в один. Доступ к каждому объекту возможен только по одному потоку за раз. Это делает код потокобезопасным, и то же самое относится к методу Collectors:: toMap.