Подтвердить что ты не робот

Как сделать saveAsTextFile НЕ разделить вывод на несколько файлов?

При использовании Scala в Spark всякий раз, когда я выгружаю результаты с помощью saveAsTextFile, он, похоже, разбивает вывод на несколько частей. Я просто передаю ему параметр (путь).

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
  • Соответствует ли количество выходов количеству используемых им редукторов?
  • Означает ли это, что результат сжат?
  • Я знаю, что я могу объединить вывод вместе с помощью bash, но есть ли возможность сохранить вывод в одном текстовом файле без разделения? Я просмотрел документы API, но в нем мало говорится об этом.
4b9b3361

Ответ 1

Причина, по которой он сохраняет это, поскольку несколько файлов - это потому, что вычисление распределено. Если выход достаточно мал, чтобы вы считали, что можете поместить его на одну машину, вы можете завершить свою программу с помощью

val arr = year.collect()

И затем сохраните результирующий массив в виде файла. Другим способом было бы использовать пользовательский разделитель, partitionBy и сделать так, чтобы все переходило в один раздел хотя это нецелесообразно, потому что вы не получите никакой распараллеливания.

Если вам требуется сохранить файл с помощью saveAsTextFile, вы можете использовать coalesce(1,true).saveAsTextFile(). Это в основном означает, что вычисление затем объединяется в 1 раздел. Вы также можете использовать repartition(1), который является всего лишь оболочкой для coalesce с аргументом shuffle, установленным в true. Просматривая источник RDD.scala, как я понял, что из этого вышло, вы должны взглянуть.

Ответ 2

Вы можете вызвать coalesce(1), а затем saveAsTextFile() - но это может быть плохой идеей, если у вас много данных. Отдельные файлы на расщепление генерируются так же, как и в Hadoop, чтобы отдельные отпечатки и редукторы записывались в разные файлы. Наличие одного выходного файла является хорошей идеей, если у вас очень мало данных, и в этом случае вы могли бы собрать() также, как сказал @aaronman.

Ответ 3

Для тех, кто работает с большим набором данных:

  • rdd.collect() случае не следует использовать rdd.collect() поскольку он будет собирать все данные в виде Array в драйвере, что является самым простым способом выхода из памяти.

  • rdd.coalesce(1).saveAsTextFile() также не следует использовать, поскольку параллелизм восходящих этапов будет потерян для выполнения на одном узле, откуда будут храниться данные.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() - лучший простой вариант, поскольку он будет поддерживать параллельную обработку задач в восходящем направлении, а затем выполнять только перемешивание для одного узла (rdd.repartition(1).saveAsTextFile() точный синоним).

  • rdd.saveAsSingleTextFile() как указано ниже, дополнительно позволяет хранить rdd в одном файле с определенным именем, сохраняя свойства параллелизма rdd.coalesce(1, shuffle = true).saveAsTextFile().


Что-то, что может быть неудобно с rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") - то, что это фактически производит файл, путь которого является path/to/file.txt/part-00000 а не path/to/file.txt.

Следующее решение rdd.saveAsSingleTextFile("path/to/file.txt") фактически создаст файл, путь которого - path/to/file.txt:

package com.whatever.package

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec

object SparkHelper {

  // This is an implicit class so that saveAsSingleTextFile can be attached to
  // SparkContext and be called like this: sc.saveAsSingleTextFile
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
        path: String, codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with hdfs:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx into one file:
      hdfs.delete(new Path(path), true) // to make sure it not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )
      // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144

      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}

который можно использовать таким образом:

import com.whatever.package.SparkHelper.RDDExtensions

rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])

Этот фрагмент:

  • Сначала сохраняет rdd с помощью rdd.saveAsTextFile("path/to/file.txt") во временной папке path/to/file.txt.tmp как если бы мы не хотели хранить данные в одном файле (который сохраняет обработку параллельных заданий)

  • И только затем, используя api файловой системы hadoop, мы продолжаем слияние (FileUtil.copyMerge()) различных выходных файлов, чтобы создать наш окончательный выходной path/to/file.txt одному файлу path/to/file.txt.

Ответ 4

Как уже упоминалось, вы можете собрать или объединить набор данных, чтобы заставить Spark создать один файл. Но это также ограничивает количество задач Spark, которые могут работать на вашем наборе данных параллельно. Я предпочитаю, чтобы он создавал сто файлов в выходном каталоге HDFS, затем используйте hadoop fs -getmerge /hdfs/dir /local/file.txt для извлечения результатов в один файл в локальной файловой системе. Это имеет наибольший смысл, когда ваш вывод является относительно небольшим отчетом, конечно.

Ответ 5

Вы можете позвонить repartition() и следовать этому пути:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)

var repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")

введите описание изображения здесь

Ответ 6

Вы сможете сделать это в следующей версии Spark, в текущей версии 1.0.0 это невозможно, если вы не сделаете это вручную так или иначе, например, как вы упомянули, с вызовом bash script,

Ответ 7

Я также хочу упомянуть, что в документации четко указано, что пользователи должны быть осторожны при вызове coalesce с реальным небольшим количеством разделов. это может привести к тому, что восходящие разделы наследуют это количество разделов.

Я бы не рекомендовал использовать coalesce (1), если это действительно не требуется.

Ответ 8

В Spark 1.6.1 формат показан ниже. Он создает один выходной файл. Лучше всего использовать его, если выход достаточно мал, чтобы обрабатывать. В основном, что он делает, так это то, что он возвращает новое RDD, которое сводится к разделам numPartitions. Если вы делаете радикальное объединение, например to numPartitions = 1, это может привести к тому, что ваши вычисления будут выполняться на меньшем количестве узлов, чем вам нравится (например, один node в случае numPartitions = 1)

pair_result.coalesce(1).saveAsTextFile("/app/data/")

Ответ 9

Здесь мой ответ для вывода одного файла. Я просто добавил coalesce(1)

val year = sc.textFile("apat63_99.txt")
              .map(_.split(",")(1))
              .flatMap(_.split(","))
              .map((_,1))
              .reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")

код:

year.coalesce(1).saveAsTextFile("year")