Подтвердить что ты не робот

Функциональная обработка потоков Scala без ошибок OutOfMemory

Можно ли применить функциональное программирование к потокам Scala, так что поток обрабатывается последовательно, но уже обработанная часть потока может быть собрана мусором?

Например, я определяю Stream, который содержит числа от start до end:

def fromToStream(start: Int, end: Int) : Stream[Int] = {
  if (end < start) Stream.empty
  else start #:: fromToStream(start+1, end)
}

Если подытожить значения в функциональном стиле:

println(fromToStream(1,10000000).reduceLeft(_+_))

Я получаю OutOfMemoryError - возможно, так как стековый кадр вызова reduceLeft содержит ссылку на заголовок потока. Но если я делаю это в итеративном стиле, он работает:

var sum = 0
for (i <- fromToStream(1,10000000)) {
  sum += i
}

Есть ли способ сделать это в функциональном стиле, не получив OutOfMemory?

UPDATE. Это была ошибка в scala, которая исправлена. Так что теперь это более или менее устарело.

4b9b3361

Ответ 1

Да, вы можете. Хитрость заключается в использовании рекурсивных методов хвоста, так что локальный стек стека содержит единственную ссылку на экземпляр Stream. Поскольку метод является хвостовым рекурсивным, локальная ссылка на предыдущую голову Stream будет удалена, как только она рекурсивно вызовет себя, тем самым позволяя GC собирать начало Stream по мере продвижения.

Welcome to Scala version 2.9.0.r23459-b20101108091606 (Java HotSpot(TM) Server VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import collection.immutable.Stream
import collection.immutable.Stream

scala> import annotation.tailrec
import annotation.tailrec

scala> @tailrec def last(s: Stream[Int]): Int = if (s.tail.isEmpty) s.head else last(s.tail)
last: (s: scala.collection.immutable.Stream[Int])Int

scala> last(Stream.range(0, 100000000))                                                                             
res2: Int = 99999999

Кроме того, вы должны убедиться, что вещь, которую вы передаете методу last выше, имеет только одну ссылку в стеке. Если вы храните Stream в локальной переменной или значении, это не будет собирать мусор, когда вы вызываете метод last, так как его аргумент не является единственной ссылкой, оставшейся до Stream. В приведенном ниже коде не хватает памяти.

scala> val s = Stream.range(0, 100000000)                                                                           
s: scala.collection.immutable.Stream[Int] = Stream(0, ?)                                                            

scala> last(s)                                                                                                      
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space                                              
        at sun.net.www.ParseUtil.encodePath(ParseUtil.java:84)                                                      
        at sun.misc.URLClassPath$JarLoader.checkResource(URLClassPath.java:674)                                     
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:759)                                       
        at sun.misc.URLClassPath.getResource(URLClassPath.java:169)                                                 
        at java.net.URLClassLoader$1.run(URLClassLoader.java:194)                                                   
        at java.security.AccessController.doPrivileged(Native Method)                                               
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)                                               
        at java.lang.ClassLoader.loadClass(ClassLoader.java:307)                                                    
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)                                            
        at java.lang.ClassLoader.loadClass(ClassLoader.java:248)                                                    
        at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:978)                      
        at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:976)                      
        at scala.util.control.Exception$Catch.apply(Exception.scala:80)
        at scala.tools.nsc.Interpreter$Request.loadAndRun(Interpreter.scala:984)                                    
        at scala.tools.nsc.Interpreter.loadAndRunReq$1(Interpreter.scala:579)                                       
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:599)                                             
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:576)
        at scala.tools.nsc.InterpreterLoop.reallyInterpret$1(InterpreterLoop.scala:472)                             
        at scala.tools.nsc.InterpreterLoop.interpretStartingWith(InterpreterLoop.scala:515)                         
        at scala.tools.nsc.InterpreterLoop.command(InterpreterLoop.scala:362)
        at scala.tools.nsc.InterpreterLoop.processLine$1(InterpreterLoop.scala:243)
        at scala.tools.nsc.InterpreterLoop.repl(InterpreterLoop.scala:249)
        at scala.tools.nsc.InterpreterLoop.main(InterpreterLoop.scala:559)
        at scala.tools.nsc.MainGenericRunner$.process(MainGenericRunner.scala:75)
        at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:31)
        at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)

Подводя итог:

  • Использовать хвостовые рекурсивные методы
  • Аннотировать их как хвост-рекурсивный
  • Когда вы вызываете их, убедитесь, что их аргумент является единственной ссылкой на Stream

EDIT:

Обратите внимание, что это также работает и не приводит к ошибке из памяти:

scala> def s = Stream.range(0, 100000000)                                                   
s: scala.collection.immutable.Stream[Int]

scala> last(s)                                                                              
res1: Int = 99999999

EDIT2:

И в случае reduceLeft, который вам нужен, вам нужно будет определить вспомогательный метод с аргументом аккумулятора для результата.

Для reduceLeft вам нужен аргумент аккумулятора, который вы можете установить для определенного значения, используя аргументы по умолчанию. Упрощенный пример:

scala> @tailrec def rcl(s: Stream[Int], acc: Int = 0): Int = if (s.isEmpty) acc else rcl(s.tail, acc + s.head)
rcl: (s: scala.collection.immutable.Stream[Int],acc: Int)Int

scala> rcl(Stream.range(0, 10000000))
res6: Int = -2014260032

Ответ 2

Когда я начал изучать Stream, я подумал, что это круто. Тогда я понял, что Iterator - это то, что я хочу использовать почти все время.

Если вам понадобится Stream, но вы хотите сделать работу reduceLeft:

fromToStream(1,10000000).toIterator.reduceLeft(_ + _)

Если вы попробуете линию выше, она будет собирать мусор просто отлично. Я обнаружил, что использование Stream сложно, так как легко держать его в голове, не осознавая этого. Иногда стандартная библиотека будет держаться за нее для вас - очень тонко.

Ответ 3

Возможно, вам захочется взглянуть на эскалаторские потоки Scalaz .

Ответ 4

Как оказалось, это ошибка в текущей реализации reduceLeft. Проблема заключается в том, что reduceLeft вызывает foldLeft, и поэтому stackframe of reduceLeft содержит ссылку на головку потока во время всего вызова. foldLeft использует хвостовую рекурсию, чтобы избежать этой проблемы. Для сравнения:

(1 to 10000000).toStream.foldLeft(0)(_+_)
(1 to 10000000).toStream.reduceLeft(_+_)

Они семантически эквивалентны. В версии Scala версии 2.8.0 вызов foldLeft работает, но вызов reduceLeft вызывает OutOfMemory. Если reduceLeft выполнит свою работу, эта проблема не возникнет.