Подтвердить что ты не робот

Как экспортировать данные из Spark SQL в CSV

Эта команда работает с HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;

Но с Spark SQL я получаю сообщение об ошибке с трассировкой стека org.apache.spark.sql.hive.HiveQl:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

Пожалуйста, направляйте меня, чтобы написать экспорт в CSV-функцию в Spark SQL.

4b9b3361

Ответ 1

Вы можете использовать инструкцию ниже для записи содержимого фрейма данных в формате CSV df.write.csv("/data/home/csv")

Если вам нужно записать весь файл данных в один файл CSV, используйте df.coalesce(1).write.csv("/data/home/sample.csv")

Для spark 1.x, вы можете использовать spark-csv для записи результатов в CSV файлы

Ниже scala фрагмент поможет

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")

Чтобы записать содержимое в один файл

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")

Ответ 2

Обратите внимание: как говорится в комментариях, он создает каталог с таким именем, а не стандартный CSV файл.


Поскольку Spark 2.X spark-csv интегрирован как собственный источник данных. Следовательно, необходимое утверждение упрощается до (windows)

df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")

или UNIX

df.write
  .option("header", "true")
  .csv("/var/out.csv")

Ответ 3

Ответ выше с spark-csv правильный, но есть проблема - библиотека создает несколько файлов на основе разбиения фреймов данных. И это не то, что нам обычно нужно. Таким образом, вы можете объединить все разделы в один:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

и переименуйте вывод lib (name "part-00000" ) в желаемое имя файла.

В этом сообщении блога содержится более подробная информация: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

Ответ 4

Простейшим способом является отображение над RDD DataFrame и использование mkString:

  df.rdd.map(x=>x.mkString(","))

Начиная с Spark 1.5 (или даже до этого) df.map(r=>r.mkString(",")) будет делать то же самое если вы хотите сбрасывать CSV, вы можете использовать для этого apache commons lang. например здесь код, который мы используем

 def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = {

    def trimColumnLength(c: String) = {
      val col = maxColumnLength match {
        case None => c
        case Some(len: Int) => c.take(len)
      }
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    }
    def rowToString(r: Row) = {
      val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    }

    def addHeader(r: RDD[String]) = {
      val rdd = for (h <- header;
                     if partitions == 1; //headers only supported for single partitions
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    }

    val rdd = df.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  }

Ответ 5

В сообщении об ошибке указано, что это не поддерживается в языке запросов. Но вы можете сохранить DataFrame в любом формате, как обычно, через интерфейс RDD (df.rdd.saveAsTextFile). Или вы можете проверить https://github.com/databricks/spark-csv.

Ответ 6

С помощью spark-csv мы можем записать в файл CSV.

val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")'

Ответ 7

введите код здесь В ДАННЫХ:

val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")