Подтвердить что ты не робот

Task.async в Elixir Stream

Я хочу сделать параллельную карту над большим списком. Код выглядит примерно так:

big_list
|> Stream.map(&Task.async(Module, :do_something, [&1]))
|> Stream.map(&Task.await(&1))
|> Enum.filter filter_fun

Но я проверял реализацию Stream, и насколько я понимаю, Stream.map объединяет функции и применяет комбинированную функцию к элементам в потоке, что означает, что последовательность выглядит так:

  • Возьмите первый элемент
  • Создать задачу async
  • Подождите, пока он закончит.
  • Возьмите второй элемент...

В этом случае он не делает это параллельно. Я прав, или я чего-то не хватает?

Если я прав, как насчет этого кода?

Stream.map Task.async ...
|> Enum.map Task.await ...

Это будет работать параллельно?

4b9b3361

Ответ 1

Второй также не делает то, что вы хотите. Вы можете ясно видеть это с помощью этого кода:

defmodule Test do
  def test do
    [1,2,3]
    |> Stream.map(&Task.async(Test, :job, [&1]))
    |> Enum.map(&Task.await(&1))
  end

  def job(number) do
    :timer.sleep 1000
    IO.inspect(number)
  end
end

Test.test

Вы увидите число, затем 1 секунду ожидания, другое число и т.д. Ключевым моментом здесь является то, что вы хотите как можно скорее создать задачи, поэтому не следует использовать ленивый Stream.map вообще. Вместо этого используйте нетерпение Enum.map в этой точке:

|> Enum.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))

С другой стороны, вы можете использовать Stream.map при ожидании, до тех пор, пока вы выполняете какую-либо активную операцию позже, например, filter. Таким образом, ожидания будут перемешаны с любой обработкой, которую вы могли бы делать по результатам.

Ответ 2

Elixir 1.4 предоставляет новую функцию Task.async_stream/5, которая будет возвращать поток, который одновременно запускает заданную функцию по каждому элементу в перечислим.

Существуют также опции для указания максимального числа рабочих и таймаута с использованием параметров параметров :max_concurrency и :timeout.


Это приведет к тому, что ваш пример будет запущен одновременно:

big_list
|> Task.async_stream(Module, :do_something, [&1])
|> Enum.filter(filter_fun)

Ответ 3

Вы можете попробовать Parallel Stream.

stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end)
stream |> Enum.into([])
[2,4,6,8,10,12,14,16,18,20]

UPD Или лучше использовать Flow