Подтвердить что ты не робот

Scala степень параллельной коллекции parallelism

Есть ли эквивалент в scala параллельных коллекциях LINQ withDegreeOfParallelism, который задает количество потоков, которые будут запускать запрос? Я хочу запустить параллельную операцию, которая должна иметь заданное количество потоков.

4b9b3361

Ответ 1

С помощью новейшей соединительной линии с использованием JVM 1.6 или новее используйте:

collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(parlevel: Int)

Это может быть предметом изменений в будущем. Для следующих выпусков запланирован более унифицированный подход к настройке всех Scala задач параллельных API.

Обратите внимание, однако, что, пока это будет определять количество процессоров, используемых в запросе, это может быть не фактическое количество потоков, участвующих в выполнении запроса. Поскольку параллельные коллекции поддерживают вложенный parallelism, фактическая реализация пула потоков может выделять больше потоков для запуска запроса, если он обнаруживает, что это необходимо.

EDIT:

Начиная с Scala 2.10, предпочтительным способом установки уровня parallelism является установка поля tasksupport для нового объекта tasksupport, как в следующем примере:

scala> import scala.collection.parallel._
import scala.collection.parallel._

scala> val pc = mutable.ParArray(1, 2, 3)
pc: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3)

scala> pc.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
pc.tasksupport: scala.collection.parallel.TaskSupport = [email protected]

scala> pc map { _ + 1 }
res0: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)

При создании объекта ForkJoinTaskSupport с пулом соединений fork уровень parallelism пула соединений fork должен быть установлен в нужное значение (2 в примере).

Ответ 2

Независимо от версии JVM с Scala 2.9+ (введенные параллельные коллекции) вы также можете использовать комбинацию функций grouped(Int) и par для выполнения параллельных заданий на небольших кусках, например:

scala> val c = 1 to 5
c: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5)

scala> c.grouped(2).seq.flatMap(_.par.map(_ * 2)).toList
res11: List[Int] = List(2, 4, 6, 8, 10)

grouped(2) создает куски длиной 2 или менее, seq гарантирует, что сбор кусков не параллелен (бесполезен в этом примере), тогда функция _ * 2 выполняется на небольших параллельных фрагментах (созданных с помощью par), таким образом гарантируя, что не более двух потоков выполняется параллельно.

Это может быть немного менее эффективным, чем установка параметра пула работников, я не уверен в этом.