Если у меня есть RDD, который мне больше не нужен, как его удалить из памяти? Было бы достаточно, чтобы сделать это:
del thisRDD
Спасибо!
Если у меня есть RDD, который мне больше не нужен, как его удалить из памяти? Было бы достаточно, чтобы сделать это:
del thisRDD
Спасибо!
Нет, del thisRDD
недостаточно, он просто удалит указатель на RDD. Вы должны вызвать thisRDD.unpersist()
, чтобы удалить кешированные данные.
Для информации Spark использует модель ленивых вычислений, а это означает, что при запуске этого кода:
>>> thisRDD = sc.parallelize(xrange(10),2).cache()
у вас не будет кэшированных данных на самом деле, он будет помечен как "кэшированный" в плане выполнения RDD. Вы можете проверить это так:
>>> print thisRDD.toDebugString()
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
| ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]
Но когда вы вызываете действие поверх этого RDD хотя бы один раз, он будет кэшироваться:
>>> thisRDD.count()
10
>>> print thisRDD.toDebugString()
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
| CachedPartitions: 2; MemorySize: 174.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]
Вы можете легко проверить сохраненные данные и уровень персистентности в пользовательском интерфейсе Spark, используя адрес http://<driver_node>:4040/storage
. Вы увидите, что del thisRDD
не изменит персистентность этого RDD, но thisRDD.unpersist()
отменит его, пока вы все равно сможете использовать этот RDD в своем коде (пока он больше не будет сохраняться в памяти и будет пересчитываться каждый раз, когда он запрашивается)
Короткий ответ: это зависит.
Согласно pyspark v.1.3.0 исходный код, del thisRDD
должно быть достаточно для PipelinedRDD
, которое является RDD, сгенерированным Преобразователь/редуктор Python:
class PipelinedRDD(RDD):
# ...
def __del__(self):
if self._broadcast:
self._broadcast.unpersist()
self._broadcast = None
RDD
class, с другой стороны, не имеет метода __del__
(хотя, вероятно, это и должно быть), поэтому вы должны вызвать метод unpersist
самостоятельно.
Изменить: __del__
метод был удален в this commit.
Короткий ответ: Следующий код должен сделать трюк:
import gc
del thisRDD
gc.collect()
Объяснение:
Даже если вы используете PySpark, ваши данные RDD управляются на стороне Java, поэтому сначала задайте один и тот же вопрос, но для Java вместо Python:
Если я использую Java, и я просто освобождаю все ссылки на мой RDD, достаточно ли этого, чтобы автоматически отменить его?
Для Java ответ "ДА", RDD будет автоматически отключен, когда будет собран мусор, в соответствии с этим ответом. (По-видимому, эта функциональность была добавлена в Spark в этот PR.)
Хорошо, что происходит в Python? Если я удалю все ссылки на мой RDD в Python, это приведет к их удалению со стороны Java?
PySpark использует Py4J для отправки объектов с Python на Java и наоборот. Согласно Py4J Memory Model Docs:
Когда объект представляет собой мусор, собранный на виртуальной машине Python (счетчик ссылок == 0), ссылка удаляется на виртуальной машине Java
Но обратите внимание: удаление ссылок Python на ваш RDD не приведет к немедленному удалению его. Вам нужно подождать, пока сборщик мусора Python очистит ссылки. Вы можете прочитать описание Py4J, где они рекомендуют следующее:
Обычно также работает вызов
gc.collect()
.
ОК, вернемся к исходному вопросу:
Было бы достаточно, чтобы сделать это:
del thisRDD
Почти. Вы должны удалить последнюю ссылку на него (т.е. del thisRDD
), а затем, если вам действительно нужно, чтобы RDD был немедленно отключен **, вызовите gc.collect()
.
** Ну, технически, это немедленно удалит ссылку на стороне Java, но будет небольшая задержка до тех пор, пока сборщик мусора Java фактически не выполнит окончательный фидзавод RDD и тем самым не будет распечатывать данные.
Просто FYI, Я бы порекомендовал "gc.collect()" после "del" (если rdd занимает много памяти).