Подтвердить что ты не робот

Отображение потока с функцией, возвращающей будущее

Я иногда оказываюсь в ситуации, когда у меня есть несколько Stream[X] и function X => Future Y, которые я хотел бы объединить с Future[Stream[Y]], и я не могу найти способ сделать это, Например, у меня есть

val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)

val result : Future[Stream[String]] = ???

Я пробовал

 val result = Future.Traverse(x, toFutureString)

который дает правильный результат, но, кажется, поглощает весь поток перед возвращением Будущего, которое более или менее поражает пупс

Я пробовал

val result = x.flatMap(toFutureString)

но не компилируется с type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]

val result = x.map(toFutureString)

возвращает несколько нечетный и бесполезный Stream[Future[String]]

Что мне делать, чтобы все исправлено?

Изменить: я не застрял на Stream, я был бы в равной степени счастлив с той же операцией на Iterator, если он не будет блокировать оценку всех элементов, прежде чем приступать к обработке головы

Edit2: я не уверен на 100%, что конструкция Future.Traverse должна пересекать весь поток, прежде чем возвращать Future [Stream], но я думаю, что это так. Если это не так, это прекрасный ответ сам по себе.

Edit3: Мне тоже не нужно, чтобы результат был в порядке, я в порядке с потоком или итератором, возвращающимся в любой порядок.

4b9b3361

Ответ 1

Вы находитесь на правильном пути с traverse, но, к сожалению, похоже, что стандартное определение библиотеки немного сломано в этом случае - ему не нужно будет потреблять поток перед возвратом.

Future.traverse представляет собой конкретную версию гораздо более общей функции, которая работает на любом прикладном функторе, обернутом в "обходной" тип (см. эти papers или мой ответ здесь для получения дополнительной информации, например).

Библиотека Scalaz предоставляет эту более общую версию, и в этом случае она работает как ожидалось (обратите внимание, что я получаю экземпляр прикладного функтора для Future из scalaz-contrib, он еще не находится в стабильных версиях Scalaz, которые по-прежнему кросс-построены против Scala 2.9.2, который не имеет этого Future):

import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._

import ExecutionContext.Implicits.global

def toFutureString(value: Int) = Future(value.toString)

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString

Это немедленно возвращается к бесконечному потоку, поэтому мы точно знаем, что он не потребляет в первую очередь.


Как сноска: если вы посмотрите источник для Future.traverse, вы увидите, что он реализован в терминах foldLeft, что удобно, но не обязательно или целесообразно в случае потоков.

Ответ 2

Забывание о потоке:

import scala.concurrent.Future
import ExecutionContext.Implicits.global

val x = 1 to 10 toList
def toFutureString(value : Int) = Future {
  println("starting " + value)
  Thread.sleep(1000)
  println("completed " + value)
  value.toString
}

дает (на моем 8-ядерном ящике):

scala> Future.traverse(x)(toFutureString)
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
res12: scala.concurrent.Future[List[String]] = [email protected]

scala> completed 1
completed 2
starting 9
starting 10
completed 3
completed 4
completed 5
completed 6
completed 7
completed 8
completed 9
completed 10

Итак, 8 из них сразу же запускаются (по одному для каждого ядра, хотя это настраивается с помощью исполнителя threadpool), а затем, когда все больше завершаются. The Future [List [String]] немедленно возвращается, а затем после паузы начинает печатать эти сообщения "завершено x".

Пример использования этого может быть, когда у вас есть List [Url's] и функция типа Url = > Future [HttpResponseBody]. Вы можете вызвать Future.traverse в этом списке с помощью этой функции и параллельно запускать эти HTTP-запросы, возвращая единственное будущее, которое представляет список результатов.

Было что-то вроде того, что вы делали?

Ответ 3

Принятый ответ больше не действителен, так как современная версия Scalaz traverse() ведет себя по-другому и пытается использовать весь поток во время вызова.

Что касается вопроса, я бы сказал, что добиться этого по-настоящему неблокирующим способом невозможно.

Future[Stream[Y]] не может быть разрешено, пока не будет доступен Stream[Y]. А так как Y генерируется асинхронно функцией X => Future[Y] вы не можете получить Y не блокируя время, когда вы пересекаете Stream[Y]. Это означает, что либо все Future[Y] должны быть разрешены до разрешения Future[Stream[Y]] (для которого требуется использование всего потока), либо вы должны разрешить появление блоков при прохождении Stream[Y] (для элементов, базовое будущее которых) еще не завершены). Но если мы допустим блокировку обхода, каково будет определение итогового будущего? С этой точки зрения это может быть то же самое, что и Future.successful(BlockingStream[Y]). Это, в свою очередь, семантически равно оригинальному Stream[Future[Y]].

Другими словами, я думаю, что есть проблема в самом вопросе.