Подтвердить что ты не робот

Добавление новых данных в файлы секционированных паркетов

Я пишу процесс ETL, где мне нужно будет читать часовые файлы журналов, разбивать данные и сохранять их. Я использую Spark (в Databricks). Файлы журналов - это CSV, поэтому я читаю их и применяю схему, а затем выполняю мои преобразования.

Моя проблема в том, как я могу сохранить данные часа в виде формата паркета, но добавить к существующему набору данных? При сохранении мне нужно разделить на 4 столбца, присутствующих в dataframe.

Вот моя строка сохранения:

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)

Проблема заключается в том, что если папка-получатель существует, файл save генерирует ошибку. Если место назначения не существует, я не добавляю свои файлы.

Я пробовал использовать .mode("append"), но обнаружил, что Spark иногда терпит неудачу на полпути, поэтому я теряю часть моих данных и сколько мне еще нужно писать.

Я использую паркет, потому что разбиение существенно увеличивает мои запросы в будущем. Кроме того, я должен записать данные как некоторый формат файла на диске и не может использовать базу данных, такую ​​как Друид или Кассандра.

Приветствуются любые предложения о том, как разделить мой фреймворк и сохранить файлы (либо придерживаться паркета, либо другого формата).

4b9b3361

Ответ 1

Если вам нужно добавить файлы, вам обязательно нужно использовать режим добавления. Я не знаю, сколько разделов вы ожидаете от его генерации, но я обнаружил, что если у вас много разделов, partitionBy вызовет ряд проблем (одинаковые как с памятью, так и с IO).

Если вы считаете, что ваша проблема вызвана слишком большими операциями записи, я рекомендую вам попробовать следующие две вещи:

1) Используйте snappy, добавив в конфигурацию:

conf.set("spark.sql.parquet.compression.codec", "snappy")

2) Отключите генерацию файлов метаданных в hadoopConfiguration в SparkContext следующим образом:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

Файлы метаданных будут отнимать много времени для генерации (см. этот пост в блоге), но согласно this они на самом деле не важны. Лично я всегда выключаю их и не имею проблем.

Если вы создаете много разделов ( > 500), я боюсь, что лучшее, что я могу сделать, это предложить вам, что вы смотрите на решение, не использующее append-mode - мне просто не удалось получить partitionBy для работы с что много разделов.

Ответ 2

Если вы используете несортированное разбиение на разделы, ваши данные будут разделены на все ваши разделы. Это означает, что каждая задача будет генерировать и записывать данные в каждый из ваших выходных файлов.

Рассмотрим перераспределение ваших данных в соответствии с вашими столбцами раздела перед записью, чтобы иметь все данные на выходной файл на тех же разделах:

data
 .filter(validPartnerIds($"partnerID"))
 .repartition([optional integer,] "partnerID","year","month","day")
 .write
 .partitionBy("partnerID","year","month","day")
 .parquet(saveDestination)

Смотрите: DataFrame.repartition