Подтвердить что ты не робот

Запись одного CSV файла с использованием spark-csv

Я использую https://github.com/databricks/spark-csv, я пытаюсь написать один CSV, но не могу, он создает папку.

Нужна функция Scala, которая примет параметр, такой как путь и имя файла, и напишет этот CSV файл.

4b9b3361

Ответ 1

Он создает папку с несколькими файлами, потому что каждый раздел сохраняется отдельно. Если вам нужен один выходной файл (все еще в папке), вы можете repartition (предпочтительнее, если данные выше по потоку большие, но требуется перетасовка):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

или coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

кадр данных перед сохранением:

Все данные будут записаны в mydata.csv/part-00000. Прежде чем использовать этот параметр , убедитесь, что вы понимаете, что происходит, и какова стоимость передачи всех данных одному работнику. Если вы используете распределенную файловую систему с репликацией, данные будут передаваться несколько раз - сначала выбраны одному работнику и затем распределены по узлам хранения.

В качестве альтернативы вы можете оставить свой код так, как есть, и использовать инструменты общего назначения, такие как cat или HDFS getmerge чтобы просто объединить все части позже.

Ответ 2

Если вы используете Spark с HDFS, я решаю проблему, обычно записывая файлы csv и используя HDFS для слияния. Я делаю это прямо в Spark (1.6):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

Не могу вспомнить, где я узнал этот трюк, но он может сработать для вас.

Ответ 3

Я мог бы немного опоздать на игру здесь, но использование coalesce(1) или repartition(1) может работать для небольших наборов данных, но большие наборы данных будут выбрасываться в один раздел на одном узле. Вероятно, это приведет к ошибкам OOM или, в лучшем случае, медленно.

Я настоятельно рекомендую использовать FileUtil.copyMerge() из API Hadoop. Это объединит выходы в один файл.

EDIT - это эффективно приводит данные к драйверу, а не к узлу-исполнителю. Coalesce() будет хорошо, если один исполнитель имеет больше оперативной памяти для использования, чем драйвер.

EDIT 2: copyMerge() удаляется в Hadoop 3.0. Дополнительную информацию о работе с новейшей версией см. В следующей статье: Hadoop, как сделать CopyMerge в Hadoop 3.0

Ответ 4

Если вы используете блоки данных и можете разместить все данные в ОЗУ на одном работнике (и, следовательно, можете использовать .coalesce(1)), вы можете использовать dbfs, чтобы найти и переместить полученный CSV файл:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

Если ваш файл не помещается в ОЗУ на рабочем месте, вы можете рассмотреть предложение chaotic3quilibrium использовать FileUtils.copyMerge(). Я этого не делал и пока не знаю, возможно ли это или нет, например, на S3.

Этот ответ основан на предыдущих ответах на этот вопрос, а также на моих собственных тестах предоставленного фрагмента кода. Я первоначально отправил это в Databricks и переиздаю это здесь.

Лучшая документация по рекурсивной опции dbfs rm, которую я нашел, находится на форуме Databricks.

Ответ 5

перераспределить/объединить до 1 раздела перед сохранением (вы все равно получите папку, но в ней будет один файл детали)

Ответ 6

вы можете использовать rdd.coalesce(1, true).saveAsTextFile(path)

он будет хранить данные в виде файла-исполнителя в пути/part-00000

Ответ 7

Решение, которое работает для S3, модифицированное от Minkymorgan.

Просто передайте путь к временному разделенному каталогу (с другим именем, отличным от окончательного) в качестве srcPath а единственный конечный csv/txt в качестве destPath Укажите также deleteSource если вы хотите удалить исходный каталог.

/**
* Merges multiple partitions of spark text file output into single file. 
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
  import org.apache.hadoop.fs.FileUtil
  import java.net.URI
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}

Ответ 8

Вы можете попробовать сделать это

df.coalesce(1).
    write.option("header","true").
    csv("/path/new_folder")

Файл csv будет создан внутри new_folder

Ответ 9

API-интерфейс spark df.write() создаст несколько файлов df.write() внутри заданного пути..., чтобы заставить spark записывать только один файл df.coalesce(1).write.csv(...) используйте df.coalesce(1).write.csv(...) вместо df.repartition(1).write.csv(...) поскольку coalesce является узким преобразованием, тогда как перераспределение является широким преобразованием, см. Spark - repartition() vs coalesce()

df.coalesce(1).write.csv(filepath,header=True) 

создаст папку в заданном пути к файлу с одним использованием part-0001-...-c000.csv

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

иметь понятное имя пользователя

Ответ 10

Существует еще один способ использования Java

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  {
     val p = new java.io.PrintWriter(f);  
     try { op(p) } 
     finally { p.close() }
  } 

printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}