Подтвердить что ты не робот

Перемещение списков и потоков с функцией, возвращающей будущее

Введение

Scala Future (новое в 2.10 и теперь 2.9.3) является аппликативным функтором, что означает, что если у нас есть проходимый тип F, мы можем взять F[A] и функцию A => Future[B] и превратить их в Future[F[B]].

Эта операция доступна в стандартной библиотеке как Future.traverse. Scalaz 7 также предоставляет более общий traverse, который мы можем использовать здесь, если импортируем экземпляр аппликативного функтора для Future из библиотеки scalaz-contrib.

Эти два метода traverse ведут себя по-разному в случае потоков. Обход стандартной библиотеки потребляет поток перед возвратом, а Scalaz немедленно возвращает будущее:

import scala.concurrent._
import ExecutionContext.Implicits.global

// Hangs.
val standardRes = Future.traverse(Stream.from(1))(future(_))

// Returns immediately.
val scalazRes = Stream.from(1).traverse(future(_))

Есть и другое отличие, поскольку Лейф Уорнер наблюдает здесь. Стандартная библиотека traverse немедленно запускает все асинхронные операции, в то время как Scalaz запускает первое, ждет его завершения, запускает второе, ждет его и т.д.

Разное поведение для потоков

Это довольно легко показать это второе различие, написав функцию, которая будет спать несколько секунд для первого значения в потоке:

def howLong(i: Int) = if (i == 1) 10000 else 0

import scalaz._, Scalaz._
import scalaz.contrib.std._

def toFuture(i: Int)(implicit ec: ExecutionContext) = future {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

Теперь Future.traverse(Stream(1, 2))(toFuture) напечатает следующее:

Starting 1!
Starting 2!
Done 2!
Done 1!

И версия Scalaz (Stream(1, 2).traverse(toFuture)):

Starting 1!
Done 1!
Starting 2!
Done 2!

Что, вероятно, не то, что мы хотим здесь.

А для списков?

Как ни странно, оба обхода ведут себя одинаково в списках - Scalaз не ждет, пока одно будущее завершится, прежде чем начинать следующее.

Другое будущее

Scalaz также включает свой собственный пакет concurrent с собственной реализацией фьючерсов. Мы можем использовать тот же тип настройки, что и выше:

import scalaz.concurrent.{ Future => FutureZ, _ }

def toFutureZ(i: Int) = FutureZ {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

И затем мы получаем поведение Scalaz для потоков как для списков, так и для потоков:

Starting 1!
Done 1!
Starting 2!
Done 2!

Возможно, не удивительно, что пересечение бесконечного потока все равно возвращается немедленно.

Вопрос

На данный момент нам действительно нужна таблица для подведения итогов, но список должен сделать:

  • Потоки со стандартным обходом библиотеки: потреблять перед возвратом; не жди каждого будущего.
  • Потоки с прохождением Scalaза: немедленно возвращайтесь; дождитесь завершения каждого будущего.
  • Scalaz фьючерсы с потоками: немедленно вернуться; дождитесь завершения каждого будущего.

И:

  • Списки со стандартным обходом библиотеки: не ждите.
  • Списки с прохождением Scalaза: не ждите.
  • Scalaz со списками: ждите завершения каждого будущего.

Есть ли в этом смысл? Есть ли "правильное" поведение для этой операции в списках и потоках? Есть ли какая-то причина, по которой "наиболее асинхронное" поведение, т.е. не использовать коллекцию перед возвратом и не ждать завершения каждого будущего, прежде чем перейти к следующему, здесь не представлено?

4b9b3361

Ответ 1

Я не могу ответить на все, но я пытаюсь выполнить некоторые части:

Есть ли какая-то причина, что "самое асинхронное" поведение, т.е., не потребляйте коллекцию перед возвратом и не ждите каждого будущее для завершения, прежде чем перейти к следующему - не представлено здесь?

Если у вас есть зависимые вычисления и ограниченное количество потоков, вы можете столкнуться с взаимоблокировками. Например, у вас есть два фьючерса в зависимости от третьего (все три в списке фьючерсов) и только два потока, вы можете столкнуться с ситуацией, когда первые два фьючерса блокируют все два потока, а третий никогда не будет выполнен. (Конечно, если ваш размер пула один, т.е. Zou выполняет один расчет за другим, вы можете получить аналогичные ситуации)

Чтобы решить эту проблему, вам понадобится один поток в будущем без каких-либо ограничений. Это работает для небольших списков фьючерсов, но не для больших. Поэтому, если вы запускаете все параллельно, вы получите ситуацию, когда маленькие примеры будут выполняться во всех случаях, а больше - тупик. (Пример: тесты разработчика работают нормально, производственные блокировки).

Есть ли "правильное" поведение для этой операции в списках и потоках?

Я думаю, что это невозможно с фьючерсами. Если вы знаете что-то большее из зависимостей или когда вы точно знаете, что вычисления не будут блокироваться, может возникнуть более параллельное решение. Но выполнение списков фьючерсов выглядит для меня "сломанным дизайном". Лучшее решение похоже на то, что уже будет неудачно для небольших примеров для тупиков (т.е. Выполнить одно будущее за другим).

Фьючерсы Scalaz со списками: дождитесь завершения каждого будущего.

Я думаю, что scalaz использует для понимания внутри для обхода. С соображениями не гарантируется независимость вычислений. Поэтому я предполагаю, что Scalaz делает правильные вещи здесь для понимания: Выполнение одного вычисления за другим. В случае фьючерсов это всегда будет работать, поскольку у вас есть неограниченные потоки в вашей операционной системе.

Итак, другими словами: вы видите только артефакт о том, как работать (должно) работать.

Надеюсь, это имеет смысл.

Ответ 2

Если я правильно понимаю вопрос, я думаю, что это действительно сводится к семантике потоков против списков.

Обход списка делает то, что мы ожидаем от документов:

Преобразует TraversableOnce[A] в Future[TraversableOnce[B]], используя предоставленную функцию A => Future[B]. Это полезно для выполнения параллельной карты. Например, чтобы применить функцию ко всем элементам списка параллельно:

Что касается потоков, то разработчик должен решить, как он хочет, чтобы он работал, потому что это зависит от большего количества знаний о потоке, чем у компилятора (потоки могут быть бесконечными, но система типов не знает об этом). если мой поток читает строки из файла, я хочу сначала использовать его, поскольку цепочка фьючерсов строка за строкой фактически не распараллеливает вещи. в этом случае я бы хотел параллельный подход.

С другой стороны, если мой поток представляет собой бесконечный список, генерирующий последовательные целые числа и ищущий первое простое число, превышающее некоторое большое число, было бы невозможно использовать поток первым за один цикл (потребуется цепной подход Future, и мы, вероятно, хотели бы перебирать партии из потока).

Вместо того, чтобы пытаться найти канонический способ справиться с этим, мне интересно, есть ли пропущенные типы, которые помогли бы сделать различные случаи более явными.