Подтвердить что ты не робот

DataFrame partitionBy в один файл Parquet (для каждого раздела)

Я хотел бы перераспределить/объединить мои данные, чтобы они сохранялись в одном файле Parquet на раздел. Я также хотел бы использовать API Spark SQL partitionBy. Так что я мог бы сделать это так:

df.coalesce(1).write.partitionBy("entity", "year", "month", "day", "status")
  .mode(SaveMode.Append).parquet(s"$location")

Я проверил это, и это, кажется, не работает хорошо. Это связано с тем, что в наборе данных работает только один раздел, а все разбиение, сжатие и сохранение файлов должно выполняться одним ядром ЦП.

Я мог бы переписать это, чтобы выполнить разбиение вручную (например, используя фильтр с различными значениями разбиения) перед вызовом coalesce.

Но есть ли лучший способ сделать это, используя стандартный Spark SQL API?

4b9b3361

Ответ 1

У меня была точно такая же проблема, и я нашел способ сделать это с помощью DataFrame.repartition(). Проблема использования coalesce(1) состоит в том, что ваш параллелизм падает до 1, и в лучшем случае он может быть медленным, а в худшем - ошибочным. Увеличение этого числа также не поможет - если вы выполните coalesce(10) вы получите больше параллелизма, но в итоге получите 10 файлов на раздел.

Чтобы получить один файл на раздел без использования coalesce(), используйте repartition() с теми же столбцами, для которых вы хотите, чтобы вывод был разделен. Так что в вашем случае сделайте это:

import spark.implicits._
df.repartition($"entity", $"year", $"month", $"day", $"status").write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")

Как только я это делаю, я получаю один паркетный файл на выходной раздел вместо нескольких файлов.

Я проверял это в Python, но я предполагаю, что в Scala это должно быть то же самое.

Ответ 2

По определению:

coalesce (numPartitions: Int): DataFrame Возвращает новый DataFrame, который имеет ровно разделы numPartitions.

Вы можете использовать его для уменьшения количества разделов в RDD/DataFrame с параметром numPartitions. Это полезно для выполнения операций более эффективно после фильтрации большого набора данных.

Что касается вашего кода, он не работает хорошо, потому что то, что вы на самом деле делаете, это:

  • помещает все в 1 раздел, который перегружает драйвер, так как он вытаскивает все данные в 1 раздел на драйвере (а также это не очень хорошая практика).

  • coalesce фактически перетасовывает все данные в сети, что также может привести к потере производительности.

Shuffle - это механизм Sparks для перераспределения данных, так что он группируется по-разному между разделами. Обычно это связано с копированием данных между исполнителями и машинами, что делает перетасовку сложной и дорогостоящей.

Концепция shuffle очень важна для управления и понимания. Всегда предпочтительнее перетасовывать минимально возможное, потому что это дорогостоящая операция, поскольку она связана с дисковым вводом-выводом, сериализацией данных и сетевым вводом-выводом. Чтобы организовать данные для тасования, Spark генерирует множество задач - сопоставляет задачи для организации данных и набор задач сокращения для их агрегирования. Эта номенклатура происходит от MapReduce и напрямую не относится к карте Sparks и сокращает операции.

Внутренне результаты отдельных задач карты сохраняются в памяти до тех пор, пока они не подходят. Затем они сортируются на основе целевого раздела и записываются в один файл. На стороне сокращения задачи считывают соответствующие отсортированные блоки.

Что касается разбиения паркета, я предлагаю вам прочитать здесь о Spark DataFrames с разделением паркета, а также этот в Руководстве по программированию Spark для Настройка производительности.

Надеюсь, это поможет!