Подтвердить что ты не робот

Когда аккумуляторы действительно надежны?

Я хочу использовать накопитель для сбора статистики о данных, которые я манипулирую при работе Spark. В идеале я бы это сделал, пока работа вычисляет необходимые преобразования, но поскольку Spark будет перекомпилировать задачи в разных случаях, аккумуляторы не будут отражать истинные показатели. Вот как документация описывает это:

Для обновлений аккумуляторов, выполненных только внутри действий, Spark гарантирует, что каждое обновление задач на аккумуляторе будет только применяется один раз, то есть перезапущенные задачи не будут обновлять значение. В преобразования, пользователи должны знать, что каждое обновление задач может применяется несколько раз, если задачи или этапы работы перезапускаются.

Это запутанно, так как большинство действий не позволяют запускать собственный код (где могут использоваться аккумуляторы), они в основном принимают результаты предыдущих преобразований (лениво). В документации также показано следующее:

val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.

Но если мы добавим data.count() в конец, гарантированно ли это быть правильным (не иметь дубликатов) или нет? Ясно, что acc не используется "только внутри действий", так как карта является преобразованием. Поэтому это не должно быть гарантировано.

С другой стороны, обсуждение связанных билетов Jira говорит о "задачах результата", а не о "действиях". Например здесь и здесь. Это, по-видимому, указывает на то, что результат действительно будет гарантированно правильным, поскольку мы используем acc непосредственно перед и действием и поэтому должны вычисляться как один этап.

Я предполагаю, что эта концепция "задачи результата" имеет отношение к типу задействованных операций, являясь последней, которая включает в себя действие, как в этом примере, которое показывает, как несколько операций делятся на этапы ( в пурпуре, изображение взято из здесь):

A job dividing several operations into multiple purple stages

Итак, гипотетически, действие count() в конце этой цепочки будет частью одной и той же заключительной стадии, и мне будет гарантировано, что аккумуляторы, используемые на последней карте, не будут содержать никаких дубликатов?

Разъяснение вокруг этой проблемы было бы здорово! Спасибо.

4b9b3361

Ответ 1

Чтобы ответить на вопрос "Когда надежны аккумуляторы действительно?"

Ответ: Когда они присутствуют в операции Действие.

В соответствии с документацией в Action Task, даже если какие-либо перезапущенные задачи присутствуют, он будет обновлять Accumulator только один раз.

Для обновлений аккумуляторов, выполняемых только внутри действий, Spark гарантирует, что каждое обновление задач для аккумулятора будет применяться только один раз, то есть перезапущенные задачи не будут обновлять значение. В преобразованиях пользователи должны знать, что обновление каждой задачи может применяться несколько раз, если задачи или этапы работы будут повторно выполняться.

И действие разрешает запускать собственный код.

Для примера

val accNotEmpty = sc.accumulator(0)
ip.foreach(x=>{
  if(x!=""){
    accNotEmpty += 1
  }
})

Но, почему Карта + Действие, а именно. Операция Задачи результатов не надежна для операции Аккумулятора?

  • Не удалось выполнить задачу из-за некоторого исключения в коде. Spark попробует 4 раза (количество попыток по умолчанию). Если задача терпит неудачу каждый раз, когда она даст исключение. Если случайно это удастся, то Spark продолжит работу и просто обновит значение аккумулятора для успешного состояния, а значения накопившихся состояний игнорируются.
    Вердикт: правильно обработано
  • Сбой этапа: если сбой исполнителя node, отсутствие сбоя в работе пользователя, но аппаратный сбой - и если node переходит в режим тасования. Когда выходной файл в случайном порядке хранится локально, если node что выход в тасование отсутствует. Так что Spark возвращается на сцену, которая генерирует вывод в случайном порядке, смотрит, какие задачи нужно перезапустить, и выполняет их на одном из узлов, которые все еще живы. После того как мы восстановим отсутствующий вывод в случайном порядке, этап, который сгенерировал вывод карты, выполнил некоторые из его задач несколько раз. Spark подсчитывает обновления аккумулятора от всех них.
    Вердикт: не обрабатывается в Задаче результата. Аккумулятор выдаст неверный результат.
  • Если задача работает медленно, Spark может запустить спекулятивную копию этой задачи на другом node.
    Вердикт: не обрабатывается. Счетчик выдаст неверный результат.
  • RDD, который кэшируется, огромен и не может находиться в памяти. Поэтому всякий раз, когда используется RDD, он повторно запускает операцию Map, чтобы получить RDD, и снова аккумулятор будет обновляться им.
    Вердикт: не обрабатывается.Аккумулятор выдаст неверный результат.

Так может случиться, что одна и та же функция может работать несколько раз на одних и тех же данных. Так что Spark не предоставляет никаких гарантий для обновления аккумулятора из-за операции с картой.

Так что лучше использовать Accumulator in Action в Spark.

Чтобы узнать больше о Accumulator и его проблемах, обратитесь к этому сообщению в блоге - Имраном Рашидом.

Ответ 2

Обновления накопителей отправляются обратно драйверу, когда задача успешно завершена. Таким образом, результаты вашего аккумулятора гарантированы, если вы уверены, что каждая задача будет выполнена ровно один раз, и каждая задача будет выполнена так, как вы ожидали.

Я предпочитаю полагаться на reduce и aggregate вместо аккумуляторов, потому что довольно сложно перечислить все способы выполнения задач.

  • Действие запускает задачи.
  • Если действие зависит от более раннего этапа, и результаты этого этапа не будут (полностью) кэшированы, тогда будут запущены задачи с более ранней стадии.
  • Спекулятивное выполнение запускает повторяющиеся задачи при обнаружении небольшого числа медленных задач.

Тем не менее, существует множество простых случаев, когда аккумуляторы могут полностью доверять.

val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 10, 2)
val accumulating = rdd.map { x => acc += 1; x }
accumulating.count
assert(acc == 10)

Будет ли это гарантировано правильным (не имеют дубликатов)?

Да, если спекулятивное выполнение отключено. map и count будут одноступенчатыми, так как вы говорите, что задача не может быть успешно выполнена более одного раза.

Но аккумулятор обновляется как побочный эффект. Поэтому вы должны быть очень осторожны, думая о том, как будет выполняться код. Рассмотрим это вместо accumulating.count:

// Same setup as before.
accumulating.mapPartitions(p => Iterator(p.next)).collect
assert(acc == 2)

Это также создаст одну задачу для каждого раздела, и каждая задача будет выполняться ровно один раз. Но код в map не будет выполняться во всех элементах, а только в каждом из них.

Аккумулятор похож на глобальную переменную. Если вы разделяете ссылку на RDD, которая может наращивать аккумулятор, тогда другой код (другие потоки) может также привести к его увеличению.

// Same setup as before.
val x = new X(accumulating) // We don't know what X does.
                            // It may trigger the calculation
                            // any number of times.
accumulating.count
assert(acc >= 10)

Ответ 3

Я думаю, что Матей ответил на это в указанной документации:

Как обсуждалось в https://github.com/apache/spark/pull/2524, это довольно сложно обеспечить хорошую семантику в общем случае (обновления аккумуляторов внутри стадий без результата) для следующих причины:

  • RDD может быть вычислен как часть нескольких этапов. Для Например, если вы обновите аккумулятор внутри MappedRDD, а затем перетасовать его, это может быть один этап. Но если вы снова вызовете карту() на MappedRDD и перетасовать результат этого, вы получите второй где эта карта является конвейером. Вы хотите посчитать это обновление аккумулятора дважды или нет?

  • Целые этапы могут быть повторно отправлены, если файлы в случайном порядке удаляются периодическим очистителем или теряются из-за node, поэтому все, что отслеживает RDD, должно было бы сделать это для длительные периоды времени (пока RDD является ссылочным в пользователе программа), что было бы довольно сложно реализовать.

Итак, я собираюсь отметить это как "не исправить" на данный момент, за исключением части результата этапы, выполненные в SPARK-3628.