Подтвердить что ты не робот

Как писать в CSV в Spark

Я пытаюсь найти эффективный способ сохранить результат работы Spark Job в виде файла csv. Я использую Spark с Hadoop, и до сих пор все мои файлы сохраняются как part-00000.

Любые идеи о том, как сделать сохранение искры для файла с указанным именем файла?

4b9b3361

Ответ 1

Так как Spark использует API файловой системы Hadoop для записи данных в файлы, это неизбежно. Если вы делаете

rdd.saveAsTextFile("foo")

Он будет сохранен как "foo/part-XXXXX" с одним part- * файлом каждого раздела в RDD, который вы пытаетесь сохранить. Причина, по которой каждый раздел в RDD написан отдельным файлом, относится к отказоустойчивости. Если задача, записывающая третий раздел (т.е. На part-00002), не работает, Spark просто перезапускает задачу и перезаписывает частично написанную/поврежденную part-00002, не влияя на другие части. Если все они писали в один файл, то для сбоев намного сложнее восстановить одну задачу.

Файлы part-XXXXX обычно не являются проблемой, если вы собираетесь использовать его снова в инфраструктуре на основе Spark/Hadoop, потому что, поскольку все они используют API HDFS, если вы попросите их прочитать "foo", они все будут читать все файлы part-XXXXX внутри foo.

Ответ 2

Я предлагаю сделать это таким образом (пример Java):

theRddToPrint.coalesce(1, true).saveAsTextFile(textFileName);
FileSystem fs = anyUtilClass.getHadoopFileSystem(rootFolder);
FileUtil.copyMerge(
    fs, new Path(textFileName),
    fs, new Path(textFileNameDestiny),
    true, fs.getConf(), null);

Ответ 3

Существует другой подход, основанный на операциях Hadoop FileSystem.

Ответ 4

У меня есть идея, но не готовый фрагмент кода. Внутренне (как следует из названия) Spark использует выходной формат Hadoop. (а также InputFormat при чтении из HDFS).

В hasoop FileOutputFormat существует защищенный член setOutputFormat, который вы можете вызывать из унаследованного класса для установки другого базового имени.

Ответ 5

Расширение Tathagata Das ответа на Spark 2.x и Scala 2.11

Используя Spark SQL, мы можем сделать это в один лайнер

//implicits for magic functions like .toDf
import spark.implicits._

val df = Seq(
  ("first", 2.0),
  ("choose", 7.0),
  ("test", 1.5)
).toDF("name", "vals")

//write DataFrame/DataSet to external storage
df.write
  .format("csv")
  .save("csv/file/location")

Затем вы можете пойти и продолжить с adoalonso ответом.

Ответ 6

Это не очень чистое решение, но внутри foreachRDD() вы можете в основном делать все, что захотите, а также создавать новый файл.

В моем решении это то, что я делаю: я сохраняю вывод на HDFS (по причинам отказоустойчивости), а внутри foreachRDD я также создаю файл TSV со статистикой в ​​локальной папке.

Я думаю, вы могли бы сделать то же самое, если это вам нужно.

http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-operations