Iteratees в Scala, которые используют ленивую оценку или слияние? - программирование
Подтвердить что ты не робот

Iteratees в Scala, которые используют ленивую оценку или слияние?

Я слышал, что iteratees ленивы, но насколько они ленивы? Альтернативно, можно ли итерации сливаться с функцией постпроцессинга, чтобы не создавать промежуточную структуру данных?

Могу ли я, например, создать 1 миллионный элемент Stream[Option[String]] из java.io.BufferedReader, а затем отфильтровать None s по-своему, не требуя весь поток будет храниться в памяти? И в то же время гарантируйте, что я не взорву стек? Или что-то в этом роде - ему не нужно использовать Stream.

В настоящее время я использую Scalaz 6, но если другие реализации iteratee могут сделать это лучше, мне было бы интересно узнать.

Пожалуйста, предоставьте полное решение, включая закрытие BufferedReader и вызов unsafePerformIO, если применимо.

4b9b3361

Ответ 1

Здесь приведен пример быстрой итерации с использованием библиотеки Scalaz 7, которая демонстрирует интересующие вас свойства: постоянная память и использование стека.

Проблема

Сначала предположим, что у нас есть большой текстовый файл с строкой десятичных цифр на каждой строке, и мы хотим найти все строки, содержащие не менее двадцати нулей. Мы можем сгенерировать некоторые данные образца следующим образом:

val w = new java.io.PrintWriter("numbers.txt")
val r = new scala.util.Random(0)

(1 to 1000000).foreach(_ =>
  w.println((1 to 100).map(_ => r.nextInt(10)).mkString)
)

w.close()

Теперь у нас есть файл с именем numbers.txt. Пусть это откроется с помощью BufferedReader:

val reader = new java.io.BufferedReader(new java.io.FileReader("numbers.txt"))

Он не слишком большой (~ 97 мегабайт), но он достаточно велик, чтобы мы могли легко видеть, остается ли наше использование памяти постоянным, пока мы его обрабатываем.

Настройка нашего счетчика

Сначала для некоторых импорта:

import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I }

И перечислитель (обратите внимание, что я меняю IoExceptionOr на Option для удобства):

val enum = I.enumReader(reader).map(_.toOption)

Scalaz 7 в настоящее время не предоставляет хороший способ перечислить строки файлов, поэтому мы последовательно перебираем файл по одному символу. Конечно, это будет болезненно медленным, но я не буду беспокоиться об этом здесь, поскольку цель этой демонстрации заключается в том, чтобы показать, что мы можем обрабатывать этот большой файл в постоянной памяти и не дуть в стек. Последний раздел этого ответа дает подход с лучшей производительностью, но здесь мы просто разделим на разрывы строк:

val split = I.splitOn[Option[Char], List, IO](_.cata(_ != '\n', false))

И если тот факт, что splitOn берет предикат, который указывает, где не разбить, вас смущает, вы не одиноки. split - наш первый пример перечисления. Мы перейдем к нашему перечислителю:

val lines = split.run(enum).map(_.sequence.map(_.mkString))

Теперь у нас есть счетчик Option[String] в монаде IO.

Фильтрация файла с помощью enumeratee

Далее для нашего предиката - помните, что мы сказали, что нам нужны строки с не менее чем двадцатью нулями:

val pred = (_: String).count(_ == '0') >= 20

Мы можем превратить это в фильтрацию enumeratee и обернуть наш счетчик тем, что:

val filtered = I.filter[Option[String], IO](_.cata(pred, true)).run(lines)

Мы создадим простое действие, которое просто печатает все, что делает это через этот фильтр:

val printAction = (I.putStrTo[Option[String]](System.out) &= filtered).run

Конечно, мы еще ничего не читали. Для этого мы используем unsafePerformIO:

printAction.unsafePerformIO()

Теперь мы можем медленно просматривать прокрутку Some("0946943140969200621607610..."), пока наше использование памяти остается постоянным. Он медленный, и обработка ошибок и вывод немного неуклюжие, но не так уж плохо, я думаю примерно девять строк кода.

Получение вывода из итератора

Это было использование foreach -ish. Мы также можем создать итерацию, которая больше похожа на сгиб, например, на сбор элементов, которые проходят через фильтр и возвращают их в список. Просто повторите все выше до определения printAction, а затем напишите это:

val gatherAction = (I.consume[Option[String], IO, List] &= filtered).run

Убейте это действие:

val xs: Option[List[String]] = gatherAction.unsafePerformIO().sequence

Теперь иди кофе (возможно, он должен быть довольно далеко). Когда вы вернетесь, у вас будет либо None (в случае IOException где-то по пути), либо Some, содержащий список из 1,943 строк.

Завершить (быстрее) пример, который автоматически закрывает файл

Чтобы ответить на вопрос о закрытии читателя, вот полный рабочий пример, который примерно эквивалентен второй программе выше, но с перечислителем, который берет на себя ответственность за открытие и закрытие читателя. Это также намного, намного быстрее, поскольку он читает строки, а не символы. Сначала для импорта и пары вспомогательных методов:

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, Either[Throwable, B]](
  action.catchLeft.map(
    r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
  )
)

def enumBuffered(r: => BufferedReader) =
  new EnumeratorT[Either[Throwable, String], IO] {
    lazy val reader = r
    def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
      k =>
        tryIO(IO(reader.readLine())).flatMap {
          case Right(null) => s.pointI
          case Right(line) => k(I.elInput(Right(line))) >>== apply[A]
          case e => k(I.elInput(e))
        }
    )
  }

И теперь перечислитель:

def enumFile(f: File): EnumeratorT[Either[Throwable, String], IO] =
  new EnumeratorT[Either[Throwable, String], IO] {
    def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
      k =>
        tryIO(IO(new BufferedReader(new FileReader(f)))).flatMap {
          case Right(reader) => I.iterateeT(
            enumBuffered(reader).apply(s).value.ensuring(IO(reader.close()))
          )
          case Left(e) => k(I.elInput(Left(e)))
        }
      )
  }

И мы готовы пойти:

val action = (
  I.consume[Either[Throwable, String], IO, List] %=
  I.filter(_.fold(_ => true, _.count(_ == '0') >= 20)) &=
  enumFile(new File("numbers.txt"))
).run

Теперь читатель будет закрыт, когда обработка будет выполнена.

Ответ 2

Я должен был прочитать немного дальше... это именно то, для чего нужны перечисляющие. Перечисления определены в Scalaz 7 и Play 2, но не в Scalaz 6.

Перечисления предназначены для "вертикального" состава (в смысле "вертикально интегрированной промышленности" ), а обычные итерации составляют монадически "горизонтальным" способом.