Я пишу процесс ETL, где мне нужно будет читать часовые файлы журналов, разбивать данные и сохранять их. Я использую Spark (в Databricks). Файлы журналов - это CSV, поэтому я читаю их и применяю схему, а затем выполняю мои преобразования.
Моя проблема в том, как я могу сохранить данные часа в виде формата паркета, но добавить к существующему набору данных? При сохранении мне нужно разделить на 4 столбца, присутствующих в dataframe.
Вот моя строка сохранения:
data
.filter(validPartnerIds($"partnerID"))
.write
.partitionBy("partnerID","year","month","day")
.parquet(saveDestination)
Проблема заключается в том, что если папка-получатель существует, файл save генерирует ошибку. Если место назначения не существует, я не добавляю свои файлы.
Я пробовал использовать .mode("append")
, но обнаружил, что Spark иногда терпит неудачу на полпути, поэтому я теряю часть моих данных и сколько мне еще нужно писать.
Я использую паркет, потому что разбиение существенно увеличивает мои запросы в будущем. Кроме того, я должен записать данные как некоторый формат файла на диске и не может использовать базу данных, такую как Друид или Кассандра.
Приветствуются любые предложения о том, как разделить мой фреймворк и сохранить файлы (либо придерживаться паркета, либо другого формата).