Я пытаюсь понять, как работает Spark cache.
Вот мое наивное понимание, пожалуйста, дайте мне знать, если я что-то упустил:
val rdd1 = sc.textFile("some data")
rdd1.cache() //marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
В приведенном выше примере rdd1 будет загружаться с диска (например, HDFS) только один раз. (при сохранении rdd2 я предполагаю), а затем из кеша (при условии, что имеется достаточное количество ОЗУ), когда сохраняется rdd3)
Теперь вот мой вопрос. Скажем, я хочу кэшировать rdd2 и rdd3, поскольку они оба будут использоваться позже, но мне не нужно rdd1 после их создания.
В основном существует дублирование, не так ли? Поскольку, как только rdd2 и rdd3 вычисляются, мне больше не нужен rdd1, я, вероятно, должен его отменить, не так ли? вопрос в том, когда?
Будет ли это работать? (Вариант A)
val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()
Является ли искра добавлением негерметичного вызова в DAG? или это делается немедленно? если это будет сделано немедленно, то в основном rdd1 будет не кэшироваться, когда я прочитаю из rdd2 и rdd3, правильно?
Должен ли я сделать это вместо этого (вариант B)?
val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
rdd1.unpersist()
Итак, вопрос в следующем:
Вариант А достаточно хорош? т.е. будет ли rdd1
загружать файл только один раз?
Или мне нужно пойти с вариантом B?