Подтвердить что ты не робот

Запись на несколько выходов с помощью ключа Spark - одно искровое задание

Как вы можете писать на несколько выходов в зависимости от ключа, используя Spark в одном задании.

Связано: пишите на несколько выходов с помощью ключа Scalding Hadoop, один MapReduce Job

Например

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

будет гарантировать, что cat prefix/1

a
b

и cat prefix/2 будут

c

Ответ

Для точного ответа с полным импортом, сутенером и кодеком сжатия, см. qaru.site/info/112819/...

4b9b3361

Ответ 1

Это включает в себя кодек по запросу, необходимый импорт и сутенер в соответствии с запросом.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
    .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

Одно тонкое отличие от OP состоит в том, что он будет префикс <keyName>= для имен каталогов. Например.

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

Давал бы:

prefix/key=1/part-00000
prefix/key=2/part-00000

где prefix/my_number=1/part-00000 будет содержать строки a и b, а prefix/my_number=2/part-00000 будет содержать строку c.

и

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")

Давал бы:

prefix/foo=1/part-00000
prefix/foo=2/part-00000

Должно быть понятно, как отредактировать для parquet.

Наконец, ниже приведен пример для Dataset, что, возможно, лучше, чем использование Tuples.

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
    .format("text").option("codec", codec).save(prefix)
  }
}

Ответ 2

Если вы используете Spark 1.4+, это стало намного проще, благодаря API DataFrame. (DataFrames были введены в Spark 1.3, но partitionBy(), который нам нужен, был представлен в 1.4.)

Если вы начинаете с RDD, сначала нужно преобразовать его в DataFrame:

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

В Python этот же код:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

Как только у вас есть DataFrame, запись на несколько выходов на основе конкретного ключа проста. Что еще - и это красота API DataFrame - код почти одинаковый для Python, Scala, Java и R:

people_df.write.partitionBy("number").text("people")

И вы можете легко использовать другие форматы вывода, если хотите:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

В каждом из этих примеров Spark создаст подкаталог для каждого из ключей, которые мы разделили DataFrame на:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh

Ответ 3

Я бы сделал это так, как это масштабируется

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

Просто увидел аналогичный ответ выше, но на самом деле нам не нужны настроенные разделы. MultipleTextOutputFormat создаст файл для каждого ключа. Это нормально, что несколько записей с теми же ключами попадают в один раздел.

новый HashPartitioner (num), где num - номер раздела, который вы хотите. В случае, если у вас есть большое количество разных ключей, вы можете установить число в большое. В этом случае в каждом разделе не будет открыто слишком много обработчиков файлов hdfs.

Ответ 4

Если у вас потенциально много значений для данного ключа, я думаю, что масштабируемое решение состоит в том, чтобы выписать один файл за ключ для каждого раздела. К сожалению, в Spark нет встроенной поддержки, но мы можем что-то взломать.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}

(Замените PrintWriter на ваш выбор работы с распределенной файловой системой.)

Это делает один проход над RDD и не выполняет тасование. Он дает вам один каталог на ключ, с количеством файлов внутри каждого.

Ответ 5

saveAsText() и saveAsHadoop (...) реализованы на основе данных RDD, в частности по методу: PairRDD.saveAsHadoopDataset, который принимает данные из PairRdd, где он выполняется. Я вижу два возможных варианта. Если ваши данные относительно невелики по размеру, вы можете сэкономить некоторое время реализации, объединив RDD, создав новый RDD из каждой коллекции и используя этот RDD для записи данных. Что-то вроде этого:

val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}

Обратите внимание, что он не будет работать для больших наборов данных b/c, что материализация итератора в v.toSeq может не соответствовать памяти.

Другой вариант, который я вижу, и на самом деле тот, который я бы рекомендовал в этом случае, - это: сворачивать свой собственный, путем прямого вызова apoop/hdfs api.

Здесь я начал обсуждение, исследуя этот вопрос: Как создать RDD из другого RDD?

Ответ 6

У меня есть аналогичная потребность и нашел способ. Но у него есть один недостаток (что не является проблемой для моего случая): вам нужно переразделить данные с одним разделом на выходной файл.

Для разделения таким образом обычно требуется заранее знать, сколько файлов будет выводиться задание, и найти функцию, которая будет отображать каждую клавишу в каждый раздел.

Сначала создадим наш класс на основе MultipleTextOutputFormat:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
  override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
    key.toString
  }
  override protected def generateActualKey(key: T, value: V) = {
    null
  }
}

С этим классом Spark получит ключ от раздела (первый/последний, я думаю) и назову файл с помощью этого ключа, поэтому не рекомендуется смешивать несколько ключей в одном разделе.

В вашем примере вам понадобится пользовательский разделитель. Это сделает работу:

import org.apache.spark.Partitioner

class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
  def numPartitions = maxKey

  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}

Теперь давайте все вместе:

val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))

// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)

val prefix = "hdfs://.../prefix"

val partitionedRDD = rdd.partitionBy(partitioner)

partitionedRDD.saveAsHadoopFile(prefix,
    classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])

Это будет генерировать 3 файла под префиксом (с именами 1, 2 и 7), обрабатывая все за один проход.

Как вы можете видеть, вам нужно знать свои ключи, чтобы иметь возможность использовать это решение.

Для меня это было проще, потому что мне нужен был один выходной файл для каждого хэша ключа, и количество файлов находилось под моим контролем, поэтому я мог использовать запасной файл HashPartitioner, чтобы сделать трюк.

Ответ 7

У меня был аналогичный вариант использования, когда я разбил входной файл на Hadoop HDFS на несколько файлов на основе ключа (1 файл за ключ). Вот мой scala код для искры

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);

@serializable object processGroup {
    def apply(groupName:String, records:Iterable[String]): Unit = {
        val outFileStream = fs.create(new Path("/output_dir/"+groupName))
        for( line <- records ) {
                outFileStream.writeUTF(line+"\n")
            }
        outFileStream.close()
    }
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy( _.split(",")(0))
dateGrouped.foreach( (x) => processGroup(x._1, x._2))

Я сгруппировал записи на основе ключа. Значения для каждого ключа записываются в отдельный файл.

Ответ 8

Я нуждался в том же самом в Java. Отправьте мой перевод Zhang Zhan Scala ответ на пользователей Spark Java API:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;


class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
        return key.toString();
    }
}

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Split Job")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
        sc.parallelize(Arrays.asList(strings))
                // The first character of the string is the key
                .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
                .saveAsHadoopFile("output/", String.class, String.class,
                        RDDMultipleTextOutputFormat.class);
        sc.stop();
    }
}

Ответ 9

хорошие новости для пользователя python в случае, если у вас много столбцов, и вы хотите сохранить все остальные столбцы, не разбитые на разделы в формате csv, которые не удастся, если вы используете метод "текст", как предложение Ника Чаммаса.

people_df.write.partitionBy("number").text("people") 

сообщение об ошибке "AnalysisException: источник данных u'Text поддерживает только один столбец, и у вас есть 2 столбца.;"

In spark 2.0.0 (моя тестовая среда - это hdp spark 2.0.0), теперь пакет "com.databricks.spark.csv" интегрирован и позволяет нам сохранять текстовый файл, разделенный только на один столбец, см. пример удара

people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
                             (1,"2016-12-25", "alice"),
                             (1,"2016-12-25", "tom"), 
                             (1, "2016-12-25","bob"), 
                             (2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])

df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")

[[email protected] people]# tree
.
├── number=1
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS

[[email protected] people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[[email protected] people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie

В моей среде 1.6.1 среды код не выдавал никакой ошибки, однако это только один файл. он не разбивается на две папки.

Надеюсь, это поможет.

Ответ 10

У меня был аналогичный вариант использования. Я разрешил его на Java, написав два пользовательских класса, реализующих MultipleTextOutputFormat и RecordWriter.

Мой вход был JavaPairRDD<String, List<String>>, и я хотел сохранить его в файле с именем по его ключу со всеми строками, содержащимися в его значении.

Вот код для моей реализации MultipleTextOutputFormat

class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString(); //The return will be used as file name
    }

    /** The following 4 functions are only for visibility purposes                 
    (they are used in the class MyRecordWriter) **/
    protected String generateLeafFileName(String name) {
        return super.generateLeafFileName(name);
    }

    protected V generateActualValue(K key, V value) {
        return super.generateActualValue(key, value);
    }

    protected String getInputFileBasedOutputFileName(JobConf job,     String name) {
        return super.getInputFileBasedOutputFileName(job, name);
        }

    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
        return super.getBaseRecordWriter(fs, job, name, arg3);
    }

    /** Use my custom RecordWriter **/
    @Override
    RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
    final String myName = this.generateLeafFileName(name);
        return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
    }
} 

Вот код для моей реализации RecordWriter.

class MyRecordWriter<K, V> implements RecordWriter<K, V> {

    private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
    private final FileSystem fs;
    private final JobConf job;
    private final Progressable arg3;
    private String myName;

    TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();

    MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
        this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
        this.fs = fs;
        this.job = job;
        this.arg3 = arg3;
        this.myName = myName;
    }

    @Override
    void write(K key, V value) throws IOException {
        String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
        String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
        Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
        RecordWriter rw = this.recordWriters.get(finalPath);
        if(rw == null) {
            rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
            this.recordWriters.put(finalPath, rw);
        }
        List<String> lines = (List<String>) actualValue;
        for (String line : lines) {
            rw.write(null, line);
        }
    }

    @Override
    void close(Reporter reporter) throws IOException {
        Iterator keys = this.recordWriters.keySet().iterator();

        while(keys.hasNext()) {
            RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
            rw.close(reporter);
        }

        this.recordWriters.clear();
    }
}

Большая часть кода точно такая же, как в FileOutputFormat. Единственное отличие состоит в том, что несколько строк

List<String> lines = (List<String>) actualValue;
for (String line : lines) {
    rw.write(null, line);
}

Эти строки позволили мне написать каждую строку моего ввода List<String> в файле. Первый аргумент функции write установлен на null, чтобы избежать нажатия клавиши на каждой строке.

Чтобы закончить, мне нужно только сделать этот вызов, чтобы написать мои файлы

javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);