Подтвердить что ты не робот

Apache Spark: разделение пары RDD на несколько RDD с помощью ключа для сохранения значений

Я использую Spark 1.0.1 для обработки большого количества данных. Каждая строка содержит идентификационный номер, некоторые с дублирующимися идентификаторами. Я хочу сохранить все строки с тем же идентификационным номером в одном месте, но у меня возникают проблемы с его эффективностью. Я создаю RDD [(String, String)] из (пары ID number, data row):

val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)} 

Способ, который работает, но не работает, заключается в сборе идентификационных номеров, фильтрации RDD для каждого идентификатора и сохранении RDD значений с тем же идентификатором, что и текстовый файл.

val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
    val dataRows = mapRdd.filter(_._1 == id).values
    dataRows.saveAsTextFile(id)
})

Я также попробовал groupByKey или reduceByKey, чтобы каждый кортеж в RDD содержал уникальный идентификационный номер в качестве ключа и строку комбинированных строк данных, разделенных новыми строками для этого идентификационного номера. Я хочу выполнять итерацию через RDD только один раз, используя foreach для сохранения данных, но он не может давать значения как RDD

groupedRdd.foreach({ tup =>
  val data = sc.parallelize(List(tup._2)) //nested RDD does not work
  data.saveAsTextFile(tup._1)
})

По сути, я хочу разделить RDD на несколько RDD с помощью идентификационного номера и сохранить значения для этого идентификационного номера в своем собственном местоположении.

4b9b3361

Ответ 1

Я думаю, что эта проблема аналогична Запись на несколько выходов с помощью ключа Spark - одно задание Spark

Пожалуйста, ответьте там.

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

Просто увидел аналогичный ответ выше, но на самом деле нам не нужны настроенные разделы. MultipleTextOutputFormat создаст файл для каждого ключа. Это нормально, что несколько записей с теми же ключами попадают в один раздел.

новый HashPartitioner (num), где num - номер раздела, который вы хотите. В случае, если у вас есть большое количество разных ключей, вы можете установить число в большое. В этом случае в каждом разделе не будет открыто слишком много обработчиков файлов hdfs.

Ответ 2

Это сохранит данные для каждого идентификатора пользователя

val mapRdd = rdd.map{ x=> (x.split("\\t+")(1),
x)}.groupByKey(numPartitions).saveAsObjectFile("file")

Если вам нужно снова получить данные на основе идентификатора пользователя, вы можете сделать что-то вроде

val userIdLookupTable = sc.objectFile("file").cache() //could use persist() if data is to big for memory  
val data = userIdLookupTable.lookup(id) //note this returns a sequence, in this case you can just get the first one  

Обратите внимание, что нет особых причин для сохранения файла в этом случае, я просто сделал это с тех пор, как OP запросил его, так как сохранение сохранения в файл позволяет загружать RDD в любое время после того, как начальная группировка было сделано.

Последнее, lookup быстрее, чем подход фильтра к доступу к идентификаторам, но если вы готовы отказаться от запроса на тягу от искры, вы можете проверить этот ответ для более быстрого подхода

Ответ 3

вы можете напрямую вызвать saveAsTextFile в сгруппированном RDD, здесь он будет сохранять данные на основе разделов, я имею в виду, если у вас есть 4 разных идентификатора, и вы указали число разделенных групп сгруппированных RDD как 4, затем искроберируйте каждый раздел данных в один файл (так что вы можете иметь только один идентификатор файла), вы можете даже видеть данные как iterables eachId в файловой системе.