Подтвердить что ты не робот

Эликсир: могу ли я использовать Stream.resource для постепенного чтения большого файла данных?

Я знаю, как использовать Stream.resource(), чтобы взять первые 5 строк из файл и поместить их в список.

str = Stream.resource(fn -> File.open!("./data/fidap011.mtx") end,
                fn file ->
                  case IO.read(file, :line) do
                    data when is_binary(data) -> {[data], file}
                    _ -> {:halt, file}
                  end
                end,
                fn file -> File.close(file) end)
str |>  Enum.take(5)

Но как мне взять, скажем, следующие 5 строк из одного потока? Если я снова напечатаю:

str |>  Enum.take(5)

Я получаю только первые 5 строк.

Я пропустил что-то очевидное здесь?

В конце концов, я хочу прочитать достаточно данных из моего потока, чтобы вызвать некоторые процессы что обрабатывают данные. Когда некоторые из этих процессов завершены, я желаю читать больше из одного потока и, таким образом, обрабатывать следующий набор данных и т.д. Должен ли Stream.chunk() играть здесь? Но без примера я, похоже, не могу понять, как это сделать.

EDIT - несколько итераций дизайна позже!

В моих целях проще использовать Stream. Вместо этого я просто создаю указатель/процесс файла, используя

{: ok, fp} = File.open( "data/fidap011.mtx" )

то я фактически передаю это fp 30000 различным порожденным процессам и им не трудно читать от нее, когда захотят. Каждый из этих процессов меняет свое состояние, читая его новое состояние переменные из файла. В модуле ниже oR и vR находятся два "роутер", которые получают сообщение - код является частью редкой матричный/векторный множитель.

defmodule M_Cells do
 @moduledoc """
 Provides matrix related code
 Each cell process serves for that row & col
 """

 defp get_next_state( fp ) do
    case IO.read( fp, :line ) do
        data when is_binary(data) ->
            [rs,cs,vs] = String.split( data )
            r = String.to_integer(rs)
            c = String.to_integer(cs)
            v = String.to_float(vs)
            {r,c,v}
        _ -> 
            File.close( fp )
            :fail
    end
 end


 defp loop(fp, r,c,v, oR,vR) do
  # Maintains state of Matrix Cell, row, col, value 
  # receives msgs and responds
   receive do

    :start  ->  
        send vR, { :multiply, c, self() }  # get values for operands via router vR
        loop(fp, r,c,v, oR,vR)

    { :multiply, w } ->  # handle request to multiply by w and relay to router oR
        send oR, { :sum, r, v*w }
        case get_next_state( fp ) do # read line from file and fill in rcv
            {r1,c1,v1} ->
                send vR, { :multiply, c1, self() }
                loop(fp, r1,c1,v1, oR,vR)
            _ -> ## error or end of file etc
              ##IO.puts(":kill rcv: #{r},#{c},#{v}")
              Process.exit( self(), :kill )
        end
   end
 end

 # Launch each matrix cell using iteration by tail recursion
 def launch(_fp, _oR,_vR, result, 0) do
   result |> Enum.reverse # reverse is cosmetic, not substantive
 end

 def launch(fp, oR,vR, result, count) do
    #IO.inspect count
    case get_next_state( fp ) do
        {r,c,v} ->
            pid = spawn fn -> loop( fp, r,c,v, oR,vR) end
            launch( fp, oR,vR, [pid|result], count-1 )

        _ -> ## error or end of file etc, skip to count 0
            launch( fp, oR,vR, result, 0 )
    end
 end

end

наслаждайтесь!

4b9b3361

Ответ 1

В качестве побочной заметки основной задачей является создание потока из файла. Об этом позаботились, поэтому вы можете просто использовать File.stream! для создания потока, не нужно напрямую использовать Stream.resource.

Относительно вашего первоначального вопроса: да, вы правы, Stream.chunk - это путь сюда. Он будет лениво разделить поток на куски поставляемого размера:

File.stream!("./data/fidap011.mtx") |> Stream.chunk(5) |> Enum.each(fn chunk ->
  # do something with chunk
end)