Я использую Spark 1.0.1 для обработки большого количества данных. Каждая строка содержит идентификационный номер, некоторые с дублирующимися идентификаторами. Я хочу сохранить все строки с тем же идентификационным номером в одном месте, но у меня возникают проблемы с его эффективностью. Я создаю RDD [(String, String)] из (пары ID number, data row):
val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)}
Способ, который работает, но не работает, заключается в сборе идентификационных номеров, фильтрации RDD для каждого идентификатора и сохранении RDD значений с тем же идентификатором, что и текстовый файл.
val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
val dataRows = mapRdd.filter(_._1 == id).values
dataRows.saveAsTextFile(id)
})
Я также попробовал groupByKey или reduceByKey, чтобы каждый кортеж в RDD содержал уникальный идентификационный номер в качестве ключа и строку комбинированных строк данных, разделенных новыми строками для этого идентификационного номера. Я хочу выполнять итерацию через RDD только один раз, используя foreach для сохранения данных, но он не может давать значения как RDD
groupedRdd.foreach({ tup =>
val data = sc.parallelize(List(tup._2)) //nested RDD does not work
data.saveAsTextFile(tup._1)
})
По сути, я хочу разделить RDD на несколько RDD с помощью идентификационного номера и сохранить значения для этого идентификационного номера в своем собственном местоположении.