Подтвердить что ты не робот

Параллельный итератор в Scala

Возможно ли, используя Scala параллельные коллекции, чтобы распараллелить Iterator без предварительной оценки?

Здесь я говорю о параллелизации функциональных преобразований на Iterator, а именно map и flatMap. Я думаю, что это требует предварительной оценки некоторых элементов Iterator, а затем вычисления большего количества, как только некоторые из них будут потребляться через next.

Все, что я мог найти, потребует, чтобы итератор был преобразован в Iterable или Stream в лучшем случае. Stream затем получает полную оценку, когда я вызываю .par на нем.

Я также приветствую предложения по реализации, если это не доступно. Реализации должны поддерживать параллельные map и flatMap.

4b9b3361

Ответ 1

Я понимаю, что это старый вопрос, но реализация ParIterator в iterata библиотека делает то, что вы искали?

scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads

Ответ 2

Лучше всего использовать стандартную библиотеку, возможно, не используя параллельные коллекции, но concurrent.Future.traverse:

import concurrent._
import ExecutionContext.Implicits.global
Future.traverse(Iterator(1,2,3))(i => Future{ i*i })

хотя я думаю, что это выполнит все, начиная с самого начала.

Ответ 3

От ML, перемещая элементы итератора параллельно:

https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ

Я отошел по Future.traverse по той же причине. В моем случае использования, поддерживая N рабочих мест, я получил код для дросселя, подающего контекст выполнения из очереди заданий.

Моя первая попытка заключалась в блокировании потока фидера, но это рисковало также блокировать задачи, которые хотели вызвать задачи в контексте выполнения. Что вы знаете, блокирование - это зло.

Ответ 4

Немного сложно точно следить за тем, что вы после, но, возможно, это примерно так:

val f = (x: Int) => x + 1
val s = (0 to 9).toStream map f splitAt(6) match { 
  case (left, right) => left.par; right 
}

Это будет определять f на первых 6 элементах параллельно, а затем возвращать поток поверх остальных.