Подтвердить что ты не робот

Понимание кэширования искры

Я пытаюсь понять, как работает Spark cache.

Вот мое наивное понимание, пожалуйста, дайте мне знать, если я что-то упустил:

val rdd1 = sc.textFile("some data")
rdd1.cache() //marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

В приведенном выше примере rdd1 будет загружаться с диска (например, HDFS) только один раз. (при сохранении rdd2 я предполагаю), а затем из кеша (при условии, что имеется достаточное количество ОЗУ), когда сохраняется rdd3)

Теперь вот мой вопрос. Скажем, я хочу кэшировать rdd2 и rdd3, поскольку они оба будут использоваться позже, но мне не нужно rdd1 после их создания.

В основном существует дублирование, не так ли? Поскольку, как только rdd2 и rdd3 вычисляются, мне больше не нужен rdd1, я, вероятно, должен его отменить, не так ли? вопрос в том, когда?

Будет ли это работать? (Вариант A)

val rdd1 = sc.textFile("some data")
rdd1.cache()   // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()

Является ли искра добавлением негерметичного вызова в DAG? или это делается немедленно? если это будет сделано немедленно, то в основном rdd1 будет не кэшироваться, когда я прочитаю из rdd2 и rdd3, правильно?

Должен ли я сделать это вместо этого (вариант B)?

val rdd1 = sc.textFile("some data")
rdd1.cache()   // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)

rdd2.cache()
rdd3.cache()

rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

rdd1.unpersist()

Итак, вопрос в следующем: Вариант А достаточно хорош? т.е. будет ли rdd1 загружать файл только один раз? Или мне нужно пойти с вариантом B?

4b9b3361

Ответ 1

Казалось бы, требуется вариант B. Причина связана с тем, как Spare пытается выполнить сохранение/кеш и непервис. Поскольку преобразования RDD просто создают описания DAG без выполнения, в Варианте A к моменту, когда вы вызываете unpersist, у вас все еще есть только описания заданий, а не выполнение.

Это актуально, потому что вызов cache или persist просто добавляет RDD к карте RDD, которые помечали, что они сохраняются во время выполнения задания. Тем не менее, unpersist непосредственно сообщает blockManager об исключении RDD из хранилища и удаляет ссылку на карте постоянных RDD.

сохранить функцию

функция непервистов

Итак, вам нужно будет вызвать непервист после того, как Spark выполнит и сохранит RDD с менеджером блоков.

Комментарии для метода RDD.persist указывают на это: rdd.persist

Ответ 2

В опции A вы не показывались, когда вы вызываете действие (вызов для сохранения)

val rdd1 = sc.textFile("some data")
rdd.cache() //marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

Если последовательность такая, как указано выше, в опции A должна использоваться кешированная версия rdd1 для вычисления как rdd2, так и rdd 3