Подтвердить что ты не робот

Spark Streaming: перекрывающиеся окна без сохранения состояния и сохранение состояния

Каковы будут некоторые соображения для выбора операций со скользящими окнами без сохранения состояния (например, reduceByKeyAndWindow) и выбора сохранения состояния (например, через updateStateByKey или нового mapStateByKey) при обработке потока последовательных сеансов конечных событий с помощью Spark Streaming?

Например, рассмотрим следующий сценарий:

Носимое устройство отслеживает физические упражнения, выполняемые владельца. Устройство автоматически обнаруживает, когда начинается упражнение, и испускает сообщение; испускает дополнительные сообщения, пока упражнение (например, частота сердечных сокращений); и, наконец, испускает сообщение, когда упражнение выполнено.

Желаемый результат - поток агрегированных записей за сеанс тренировки. то есть все события одного и того же сеанса должны быть объединены вместе (например, чтобы каждый сеанс мог быть сохранен в одной строке БД). Обратите внимание, что каждый сеанс имеет конечную длину, но весь поток из нескольких устройств является непрерывным. Для удобства предположим, что устройство генерирует GUID для каждого сеанса тренировки.

Я вижу два подхода для обработки этого прецедента с помощью Spark Streaming:

  • Использование неперекрывающихся окон и сохранение состояния. Состояние сохраняется на каждый GUID со всеми соответствующими ему событиями. Когда приходит новое событие, состояние обновляется (например, с помощью mapWithState), и в случае, если событие является "завершением сеанса тренировки", будет выведена агрегированная запись на основе состояния, а ключ удален.

  • Использование перекрывающих скользящих окон и сохранение только первых сеансов. Предположим, что скользящее окно длины 2 и интервал 1 (см. Диаграмму ниже). Также предположим, что длина окна 2 X (максимальное возможное время тренировки). В каждом окне события агрегируются с помощью GUID, например. используя reduceByKeyAndWindow. Затем все сеансы, которые начинаются во второй половине окна, сбрасываются, а остальные сеансы испускаются. Это позволяет использовать каждое событие ровно один раз и гарантирует, что все события, принадлежащие одному сеансу, будут объединены вместе.

Диаграмма для подхода № 2:

Only sessions starting in the areas marked with \\\ will be emitted. 
-----------
|window 1 |
|\\\\|    |
-----------
     ----------
     |window 2 |
     |\\\\|    |  
     -----------
          ----------
          |window 3 |
          |\\\\|    |
          -----------

Плюсы и минусы я вижу:

Подход №1 менее дорогостоящий, но требует сохранения и управления состоянием (например, если число одновременных сеансов увеличивается, состояние может увеличиться, чем память). Однако, если максимальное количество одновременных сеансов ограничено, это может не быть проблемой.

Подход №2 в два раза дороже (каждое событие обрабатывается дважды) и с более высокой задержкой (максимальное время выполнения 2 X), но более простым и легко управляемым, поскольку состояние не сохраняется.

Что было бы лучшим способом справиться с этим вариантом использования - есть ли какой-либо из этих подходов "правильный", или есть ли лучшие способы?

Какие другие плюсы и минусы следует принимать во внимание?

4b9b3361

Ответ 1

Обычно нет подхода справа, у каждого есть компромиссы. Поэтому я добавлю дополнительный подход к миксу и расскажу о своих преимуществах и недостатках. Поэтому вы можете решить, какой из них вам больше подходит.

Подход к внешнему состоянию (подход № 3)

Вы можете накапливать состояние событий во внешнем хранилище. Кассандра часто используется для этого. Вы можете обрабатывать финальные и текущие события отдельно, например, например:

val stream = ...

val ongoingEventsStream = stream.filter(!isFinalEvent)
val finalEventsStream = stream.filter(isFinalEvent)

ongoingEventsStream.foreachRDD { /*accumulate state in casssandra*/ }
finalEventsStream.foreachRDD { /*finalize state in casssandra, move to final destination if needed*/ }

подход trackStateByKey (подход # 1.1)

Это может быть потенциально оптимальное решение для вас, поскольку оно устраняет недостатки updateStateByKey, но, учитывая, что он только что выпущен как часть выпуска Spark 1.6, это может быть рискованным (так как по какой-то причине он не очень рекламируется). Вы можете использовать ссылку в качестве отправной точки, если хотите узнать больше

Плюсы/минусы

Подход №1 (updateStateByKey)

Pros

  • Легко понять или объяснить (остальной состав команды, новички и т.д.) (субъективный)
  • Хранение. Лучшее использование памяти сохраняет только последнее состояние упражнений.
  • Хранение. Сохраняет только текущие упражнения и отбрасывает их, как только они заканчивают.
  • Задержка ограничена только производительностью каждой микро-пакетной обработки

Против

  • Хранение. Если количество ключей (параллельных упражнений) велико, оно может не вписаться в память вашего кластера.
  • Обработка. Он будет запускать функцию updateState для каждой клавиши на карте состояния, поэтому, если количество параллельных упражнений велико, производительность будет страдать.

Подход №2 (окно)

В то время как вы можете добиться того, что вам нужно с окнами, оно выглядит значительно менее естественным в вашем сценарии.

Pros

  • Обработка в некоторых случаях (в зависимости от данных) может быть более эффективной, чем updateStateByKey, из-за тенденции updateStateByKey запуска обновления на каждом ключе, даже если фактических обновлений нет.

Против

  • "Максимальное возможное время тренировки" - это звучит как огромный риск - это может быть довольно произвольная продолжительность, основанная на человеческом поведении. Некоторые люди могут забыть "закончить упражнение". Также зависит от видов упражнений, но может варьироваться от нескольких секунд до нескольких часов, когда требуется более низкая латентность для быстрых упражнений, в то время как при этом должна сохраняться латентность дольше, чем могут существовать самые длинные упражнения.
  • Чувствуется сложнее объяснить другим, как это будет работать (субъективно)
  • Хранение. Необходимо будет хранить все данные в рамке окна, а не только последние. Также освободит память только тогда, когда окно будет отходить от этого временного интервала, а не когда упражнение действительно закончено. Хотя это может быть не огромная разница, если вы сохраните только последние два временных интервала - это будет увеличиваться, если вы попытаетесь добиться большей гибкости, скользясь в окне чаще.

Подход № 3 (внешнее состояние)

Pros

  • Легко объяснить и т.д. (субъективный)
  • Чистый подход к потоковой обработке, означающий, что искра отвечает за каждое отдельное событие, но не пытается сохранить состояние и т.д. (субъективное).
  • Хранение: не ограничено памятью кластера для хранения состояния - может обрабатывать огромное количество одновременных упражнений
  • Обработка. Состояние обновляется только тогда, когда есть фактические обновления (в отличие от updateStateByKey)
  • Задержка похожа на updateStateByKey и ограничена временем, необходимым для обработки каждой микропакеты

Против

  • Дополнительный компонент в вашей архитектуре (если вы уже не используете Cassandra для окончательного вывода)
  • Обработка: по умолчанию медленнее обработки только искры, а не в памяти + вам необходимо передать данные через сеть.
  • вам нужно будет реализовать ровно один раз семантический вывод данных в cassandra (для случая сбоя рабочего процесса во время foreachRDD)

Предлагаемый подход

Я бы попробовал следующее:

  • test updateStateByKey подход к вашим данным и кластеру
  • Посмотрите, приемлемо ли потребление и обработка памяти даже при большом количестве параллельных упражнений (ожидается в часы пик).
  • вернуться к Cassandra, если не

Ответ 2

Я думаю, что один из других недостатков третьего подхода заключается в том, что RDD не принимаются хронологически. Рассмотрение их работы на кластере.

ongoingEventsStream.foreachRDD { /*accumulate state in casssandra*/ }

также о прокрутке кода и ошибке драйвера node. В этом случае вы снова читаете все данные? любопытно узнать, как вы хотите справиться с этим?

Я думаю, возможно, mapwithstate - лучший подход, почему вы рассматриваете все эти сценарии.