Подтвердить что ты не робот

Сложить/уменьшить по списку фьючерсов с ассоциативно-коммутативным оператором

Рассмотрим следующее:

import scala.concurrent._
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.ExecutionContext.Implicits.global

def slowInt(i: Int) = { Thread.sleep(200); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(100); x + y }
def futures = (1 to 20).map(i => future(slowInt(i)))

def timeFuture(fn: => Future[_]) = {
  val t0 = System.currentTimeMillis
  Await.result(fn, Inf)
  println((System.currentTimeMillis - t0) / 1000.0 + "s")
}

обе следующие печати ~ 2.5s:

// Use Future.reduce directly (Future.traverse is no different)
timeFuture { Future.reduce(futures)(slowAdd) }

// First wait for all results to come in, convert to Future[List], and then map the List[Int]
timeFuture { Future.sequence(futures).map(_.reduce(slowAdd)) }

Насколько я понимаю, причиной этого является то, что Future.reduce/traverse является общим и, следовательно, не работает быстрее с ассоциативным оператором, однако есть ли простой способ определить вычисление, в котором начнется свертывание/восстановление как только будет доступно хотя бы 2 значения (или 1 в случае fold), так что, пока некоторые элементы в списке все еще сгенерированы, уже сгенерированные уже вычисляются на?

4b9b3361

Ответ 1

Scalaz имеет реализацию фьючерсов, которая включает chooseAny, который берет коллекцию фьючерсов и возвращает будущее кортежа первого завершенного элемента и остальной части фьючерса:

def chooseAny[A](h: Future[A], t: Seq[Future[A]]): Future[(A, Seq[Future[A]])]

Twitter реализации фьючерсов называет это select. Стандартная библиотека не включает его (но см. Viktor Klang реализация, о которой говорил Сом Снытт выше). Я буду использовать версию Scalaz здесь, но перевод должен быть простым.

Один из подходов к выполнению операций, по вашему желанию, состоит в том, чтобы вытащить два завершенных элемента из списка, направить будущее их суммы обратно в список и перезаписать (см. this gist для полного рабочего примера):

def collapse[A](fs: Seq[Future[A]])(implicit M: Monoid[A]): Future[A] =
  Nondeterminism[Future].chooseAny(fs).fold(Future.now(M.zero))(
    _.flatMap {
      case (hv, tf) =>
        Nondeterminism[Future].chooseAny(tf).fold(Future.now(hv))(
          _.flatMap {
            case (hv2, tf2) => collapse(Future(hv |+| hv2) +: tf2)
          }
        )
    }
  )

В вашем случае вы вызываете что-то вроде этого:

timeFuture(
  collapse(futures)(
    Monoid.instance[Int]((a, b) => slowAdd(a, b), 0)
  )
)

Это работает всего лишь на 1,6 секунды на моем двухъядерном ноутбуке, поэтому он работает как ожидается (и будет продолжать делать то, что вы хотите, даже если время, затраченное на slowInt, меняется).

Ответ 2

Чтобы получить похожие тайминги, мне пришлось использовать локальный ExecutionContext (здесь):

implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

После этого я получил лучшую производительность, разделив список и запустив работу над каждым списком, назначив их в vals (на основе запоминания того, что фьючерсы в for-comprehenion обрабатываются по порядку, если они не назначены vals перед для-comprehenion). Из-за ассоциативной природы списков я мог бы повторно объединить их с еще одним вызовом той же функции. Я изменил функцию timeFuture, чтобы взять описание и распечатать результат добавления:

def timeFuture(desc: String, fn: => Future[_]) = {
  val t0 = System.currentTimeMillis
  val res = Await.result(fn, Inf)
  println(desc + " = " + res + " in " + (System.currentTimeMillis - t0) / 1000.0 + "s")
}

Я новичок в Scala, поэтому я все еще работаю над повторным использованием той же функции на последнем этапе (я думаю, это должно быть возможно), поэтому я обманул и создал вспомогательную функцию:

def futureSlowAdd(x: Int, y: Int) = future(slowAdd(x, y))

Тогда я мог бы сделать следующее:

timeFuture( "reduce", { Future.reduce(futures)(slowAdd) } )

val right = Future.reduce(futures.take(10))(slowAdd)
val left = Future.reduce(futures.takeRight(10))(slowAdd)
timeFuture( "split futures", (right zip left) flatMap (futureSlowAdd _).tupled)

С этим последним zip и т.д. здесь.

Я думаю, что это параллелизирует работу и рекомбинирует результаты. Когда я запускаю те, я получаю:

reduce = 210 in 2.111s
split futures = 210 in 1.201s

Я использовал жестко закодированную пару взяток, но моя идея состоит в том, что полное расщепление списков может быть введено в функцию и фактически повторно использовать ассоциативную функцию, переданную как в правую, так и в левую ветки (с разрешенными слегка несбалансированными деревьями из-за остатков) в конце.


Когда я рандомизирую функции slowInt() и slowAdd(), такие как:

def rand(): Int = Random.nextInt(3)+1
def slowInt(i: Int) = { Thread.sleep(rand()*100); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(rand()*100); x + y }

Я все еще вижу, что "раскол фьючерсов" заканчивается раньше, чем "уменьшить". Кажется, есть некоторые накладные расходы для запуска, что влияет на первый вызов timeFuture. Вот несколько примеров их запуска со стартовым штрафом за "раздельные фьючерсы":

split futures = 210 in 2.299s
reduce = 210 in 4.7s

split futures = 210 in 2.594s
reduce = 210 in 3.5s

split futures = 210 in 2.399s
reduce = 210 in 4.401s

На более быстром компьютере, чем мой ноутбук, и используя тот же ExecutionContext в вопросе, я не вижу таких больших различий (без рандомизации в медленных * функциях):

split futures = 210 in 2.196s
reduce = 210 in 2.5s

Здесь, похоже, "раздельные фьючерсы" ведут лишь немного.


Один последний раз. Здесь функция (ака мерзости), которая расширяет идею, которую я имел выше:

def splitList[A <: Any]( f: List[Future[A]], assocFn: (A, A) => A): Future[A] = {
    def applyAssocFn( x: Future[A], y: Future[A]): Future[A] = {
      (x zip y) flatMap( { case (a,b) => future(assocFn(a, b)) } )
    }
    def divideAndConquer( right: List[Future[A]], left: List[Future[A]]): Future[A] = {
      (right, left) match {
        case(r::Nil, Nil) => r
        case(Nil, l::Nil) => l
        case(r::Nil, l::Nil) => applyAssocFn( r, l )
        case(r::Nil, l::ls) => {
          val (l_right, l_left) = ls.splitAt(ls.size/2)
          val lret = applyAssocFn( l, divideAndConquer( l_right, l_left ) )
          applyAssocFn( r, lret )
        }
        case(r::rs, l::Nil) => {
          val (r_right, r_left) = rs.splitAt(rs.size/2)
          val rret = applyAssocFn( r, divideAndConquer( r_right, r_left ) )
          applyAssocFn( rret, l )
        }
        case (r::rs, l::ls) => {
          val (r_right, r_left) = rs.splitAt(rs.size/2)
          val (l_right, l_left) = ls.splitAt(ls.size/2)
          val tails = applyAssocFn(divideAndConquer( r_right, r_left ), divideAndConquer( l_right, l_left ))
          val heads = applyAssocFn(r, l)
          applyAssocFn( heads, tails )
        }
      }
    }
    val( right, left ) = f.splitAt(f.size/2)
    divideAndConquer( right, left )
  }

Для разбиения списка вверх по хвосту рекурсивно требуется все, что угодно: Scala и быстро присваивать фьючерсы значениям (для их запуска).

Когда я тестирую его так:

timeFuture( "splitList", splitList( futures.toList, slowAdd) )

Я получаю следующие тайминги на своем ноутбуке с помощью newCachedThreadPool():

splitList = 210 in 0.805s
split futures = 210 in 1.202s
reduce = 210 in 2.105s

Я заметил, что тайминги "split futures" могут быть недействительными, поскольку фьючерсы запускаются за пределами блока timeFutures. Однако функцию splitList следует корректно вызывать внутри функции timeFutures. Для меня важна важность выбора ExecutionContext, который лучше всего подходит для аппаратного обеспечения.

Ответ 3

Ответ ниже будет выполняться в течение 700 мс на 20-ядерном компьютере, который дает то, что нужно выполнить в последовательности, а также можно делать на любой машине с любой реализацией (20 параллельных вызовов 200 мс slowInt, за которыми следуют 5 вложенных 100 мс slowAdd). Он работает в 1600 мс на моем 4-ядерном компьютере, который также можно делать на этой машине.

Когда вызовы slowAdd расширяются, f представляет slowAdd:

f(f(f(f(f(x1, x2), f(x3, x4)), f(f(x5, x6), f(x7, x8))), f(f(f(x9, x10), f(x11, x12)), f(f(x13, x14), f(x15, x16)))), f(f(x17, x18), f(x19, x20)))

Пример, который вы указали при использовании Future.sequence, будет выполняться в 2100 мс на 20-ядерном компьютере (20 параллельных вызовов 200 мс slowInt, за которыми следуют 19 вложенных вызовов 100 мс slowAdd). Он работает в 2900 мс на моем 4-ядерном компьютере.

Когда вызовы slowAdd расширяются, f представляет slowAdd:

f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(x1, x2), x3), x4), x5), x6), x7), x8), x9), x10), x11), x12), x13), x14), x15) x16) x17) x18) x19) x20)

Метод Future.reduce вызывает Future.sequence(futures).map(_ reduceLeft op), поэтому два приведенных вами примера эквивалентны.

В моем ответе используется функция combine, которая берет список фьючерсов и op, функцию, которая объединяет два фьючерса в один как параметры. Функция возвращает op применительно ко всем парам фьючерсов и парам пар и так далее до тех пор, пока не останется одно будущее:

def combine[T](list: List[Future[T]], op: (Future[T], Future[T]) => Future[T]): Future[T] =
  if (list.size == 1) list.head
  else if(list.size == 2) list.reduce(op)
  else list.grouped(2).map(combine(_, op)).reduce(op)

Примечание. Я немного изменил код в соответствии с моими предпочтениями стиля.

def slowInt(i: Int): Future[Int] = Future { Thread.sleep(200); i }
def slowAdd(fx: Future[Int], fy: Future[Int]): Future[Int] = fx.flatMap(x => fy.map { y => Thread.sleep(100); x + y })
var futures: List[Future[Int]] = List.range(1, 21).map(slowInt)

В приведенном ниже коде используется функция combine для вашего случая:

timeFuture(combine(futures, slowAdd))

Код ниже обновляет ваш пример Future.sequence для моих модификаций:

timeFuture(Future.sequence(futures).map(_.reduce{(x, y) => Thread.sleep(100); x + y }))