Подтвердить что ты не робот

Как выполнить последовательное выполнение фьючерсов в scala

У меня есть этот сценарий, где мне нужно использовать итератор, для каждого из элементов вызывается функция f (item) и возвращает Future[Unit].

Однако мне нужно сделать так, чтобы каждый вызов f(item) выполнялся последовательно, они не могут выполняться параллельно.

for(item <- it)
  f(item)

не будет работать, потому что это запустит все вызовы параллельно.

Как это сделать, чтобы они выполнялись последовательно?

4b9b3361

Ответ 1

Если вы не против очень локализованного var, вы можете сериализовать асинхронную обработку (каждый f(item)) следующим образом (flatMap выполняет сериализацию):

val fSerialized = {
  var fAccum = Future{()}
  for(item <- it) {
    println(s"Processing ${item}")
    fAccum = fAccum flatMap { _ => f(item) }
  }
  fAccum
}

fSerialized.onComplete{case resTry => println("All Done.")}

В общем, избегайте операций Await - они блокируют (вид поражений точки асинхронизации, потребляет ресурсы и для неаккуратных конструкций, может быть заблокирован)


Прохладный трюк 1:

Вы можете объединить Futures через этого обычного подозреваемого, flatMap - он сериализует асинхронные операции. Есть ли что-то, что он не может сделать?; -)

def f1 = Future { // some background running logic here...}
def f2 = Future { // other background running logic here...}

val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)  

fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}

Ни один из вышеперечисленных блоков - основная нить проходит через несколько десятков наносекунд. Фьючерсы используются во всех случаях для выполнения параллельных потоков и отслеживания асинхронного состояния/результатов и цепной логики.

fSerialized представляет собой совокупность двух различных асинхронных операций, соединенных вместе. Как только значение val оценивается, он немедленно запускает f1 (работает несинхронно). f1 работает как любой Future - когда он заканчивается, он называет его onComplete блоком обратного вызова. Здесь классный бит - flatMap устанавливает этот аргумент как блок ответа f1 onComplete - поэтому f2 запускается, как только f1 завершается, без блокировки, опроса или расточительного использования ресурсов. Когда f2 завершено, то fSerialized завершено - поэтому он запускает блок обратного вызова fSerialized.onComplete - печать "Both Done".

Не только это, но вы можете объединять плоские карты столько, сколько хотите, с аккуратным кодом без спагетти

 f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...

Если вы должны были сделать это через Future.onComplete, вам нужно было бы встроить последовательные операции как вложенные onComplete слои:

f1.onComplete{case res1Try => 
  f2
  f2.onComplete{case res2Try =>
    f3
    f3.onComplete{case res3Try =>
      f4
      f4.onComplete{ ...
      }
    }
  }
}

Не так приятно.

Тест, чтобы доказать:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))

fSerial.onComplete{case resTry => println("!!!! That a wrap !!!! Success=" + resTry.isSuccess)}

Прохладный трюк 2:

для -понимания типа:

for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr

- это не что иное, как синтаксический сахар для этого:

aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }

что цепочка flatMaps, за которой следует финальная карта.

Это означает, что

f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")

совпадает с

for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"

Тест для проверки (следующий из предыдущего теста):

val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That a wrap !!!! Success=" + resTry.isSuccess)}

Не очень-то трюк 3:

К сожалению, вы не можете смешивать итераторы и фьючерсы в одном и том же понимании. Ошибка компиляции:

val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last

И вложенность fors создает проблему. Следующие не сериализуют, но параллельно выполняют асинхронные блоки (вложенные методы не привязывают последующие фьючерсы с помощью flatMap/Map, а вместо этого цепочки как Iterable.flatMap {item = > f (item)} - не то же самое!)

val fSerial = {for {nextItem <- itemIterable} yield
                 for {nextRes <- f(nextItem)} yield "Did It"}.last

Также использование foldLeft/foldRight plus flatMap не работает так, как вы ожидали, - кажется ошибкой/ограничением; все асинхронные блоки обрабатываются параллельно (поэтому Iterator.foldLeft/Right не работает с Future.flatMap):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)

//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}

fSerialized.onComplete{case resTry => println("!!!! That a wrap !!!! Success=" + resTry.isSuccess)}

Но это работает (используется var):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)

var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem)) 

Ответ 2

def seqFutures[T, U](items: TraversableOnce[T])(yourfunction: T => Future[U]): Future[List[U]] = {
  items.foldLeft(Future.successful[List[U]](Nil)) {
    (f, item) => f.flatMap {
      x => yourfunction(item).map(_ :: x)
    }
  } map (_.reverse)
}

Если вы работаете последовательно, потому что ограничения ресурсов предотвращают запуск более чем одного Future за раз, может быть проще создать и использовать пользовательский ExecutionContext только с одним потоком.

Ответ 3

Другой вариант - использование потоков Akka:

val doneFuture = Source
  .fromIterator(() => it)
  .mapAsync(parallelism = 1)(f)
  .runForeach{identity}

Ответ 4

Этот код показывает вам, как запускать фьючерсы последовательно, используя простой обещание, чтобы выполнить его.

Код содержит два секвенсора, один из которых выполняет работу один за другим, а другой позволяет указать, сколько из них будет выполняться в одно и то же время.

Исключения не могут быть упрощены.

import scala.concurrent.{Await, Future, Promise}
import scala.util.Try
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

/**
  * Simple class to encapsulate work, the important element here is the future
  * you can ignore the rest
  */
case class Work(id:String, workTime:Long = 100) {
  def doWork(): Future[String] = Future {
    println(s"Starting $id")
    Thread.sleep(workTime)
    println(s"End $id")
    s"$id ready"
  }
}

/**
  * SimpleSequencer is the one by one execution, the promise is the element
  * who allow to the sequencer to work, pay attention to it.
  *
  * Exceptions are ignore, this is not production code
  */
object SimpleSequencer {
  private def sequence(works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
    works match {
      case Nil => p.tryComplete(Try(results))
      case work::tail => work.doWork() map {
        result => sequence(tail, results :+ result, p)
      }
    }
  }

  def sequence(works:Seq[Work]) : Future[Seq[String]] = {
    val p = Promise[Seq[String]]()
    sequence(works, Seq.empty, p)
    p.future
  }
}

/**
  * MultiSequencer fire N works at the same time
  */
object MultiSequencer {
  private def sequence(parallel:Int, works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
    works match {
      case Nil => p.tryComplete(Try(results))
      case work =>
        val parallelWorks: Seq[Future[String]] = works.take(parallel).map(_.doWork())
        Future.sequence(parallelWorks) map {
          result => sequence(parallel, works.drop(parallel), results ++ result, p)
        }
    }
  }

  def sequence(parallel:Int, works:Seq[Work]) : Future[Seq[String]] = {
    val p = Promise[Seq[String]]()
    sequence(parallel, works, Seq.empty, p)
    p.future
  }

}


object Sequencer {

  def main(args: Array[String]): Unit = {
    val works = Seq.range(1, 10).map(id => Work(s"w$id"))
    val p = Promise[Unit]()

    val f = MultiSequencer.sequence(4, works) map {
      resultFromMulti =>
        println(s"MultiSequencer Results: $resultFromMulti")
        SimpleSequencer.sequence(works) map {
          resultsFromSimple =>
            println(s"MultiSequencer Results: $resultsFromSimple")
            p.complete(Try[Unit]())
        }
    }

    Await.ready(p.future, Duration.Inf)
  }
}

Ответ 5

Возможно, более элегантным решением будет использование рекурсии, как описано ниже.

Это можно использовать в качестве примера для длительной операции, возвращающей будущее:

def longOperation(strToReturn: String): Future[String] = Future {
  Thread.sleep(5000)
  strToReturn
}

Ниже приведена рекурсивная функция, которая проходит через обрабатываемые элементы и обрабатывает их последовательно:

def processItems(strToReturn: Seq[String]): Unit = strToReturn match {
  case x :: xs => longOperation(x).onComplete {
    case Success(str) =>
      println("Got: " + str)
      processItems(xs)
    case Failure(_) =>
      println("Something went wrong")
      processItems(xs)
  }
  case Nil => println("Done")
}

Это делается с помощью функции, которая рекурсивно вызывает себя с остальными элементами для обработки, как только "Будущее" завершилось или завершилось неудачей.

Чтобы начать эту операцию, вы вызываете функцию processItems с несколькими элементами для обработки, например:

processItems(Seq("item1", "item2", "item3"))

Ответ 6

вы можете использовать Await.result: (код не проверен)

"Ожидание: объект singleton, используемый для блокировки в будущем (передача его результата в текущий поток).

val result  = item map {it => Await.result(f(it), Duration.Inf) }