Подтвердить что ты не робот

Как выполнить повторение RDD в искры apache (scala)

Я использую следующую команду, чтобы заполнить RDD множеством массивов, содержащих 2 строки [ "filename", "content" ].

Теперь я хочу перебирать все эти вхождения, чтобы что-то делать с каждым именем и содержимым.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

Кажется, я не могу найти документацию о том, как это сделать.

Итак, я хочу:

foreach occurrence-in-the-rdd{
   //do stuff with the array found on loccation n of the RDD
} 
4b9b3361

Ответ 1

Основные операции в Spark: map и filter.

val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }

txtRDD теперь будет содержать только файлы с расширением ".txt"

И если вы хотите, чтобы слова подсчитывали эти файлы, вы можете сказать

//split the documents into words in one long list
val words = txtRDD flatMap { case (id,text) => text.split("\\s+") }
// give each word a count of 1
val wordT = words map (x => (x,1))  
//sum up the counts for each word
val wordCount = wordsT reduceByKey((a, b) => a + b)

Вы хотите использовать mapPartitions, когда у вас есть некоторая дорогостоящая инициализация, которую вы должны выполнить, например, если вы хотите сделать Именованное распознавание объектов с помощью библиотеки, такой как инструменты coreNLP в Stanford.

Мастер map, filter, flatMap и reduce, и вы хорошо на пути к освоению Искры.

Ответ 2

Вы вызываете различные методы на RDD, которые принимают функции как параметры.

// set up an example -- an RDD of arrays
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)

// Print the RDD of arrays.
testRDD.collect().foreach(a => println(a.size))

// Use map() to create an RDD with the array sizes.
val countRDD = testRDD.map(a => a.size)

// Print the elements of this new RDD.
countRDD.collect().foreach(a => println(a))

// Use filter() to create an RDD with just the longer arrays.
val bigRDD = testRDD.filter(a => a.size > 3)

// Print each remaining array.
bigRDD.collect().foreach(a => {
    a.foreach(e => print(e + " "))
    println()
  })
}

Обратите внимание, что функции, которые вы пишете, принимают один элемент RDD в качестве входных данных и возвращают данные некоторого однородного типа, поэтому вы создаете RDD последнего типа. Например, countRDD является RDD[Int], а bigRDD по-прежнему является RDD[Array[Int]].

В какой-то момент, вероятно, возникнет соблазн написать foreach, который изменяет некоторые другие данные, но вы должны сопротивляться по причинам, описанным в этом вопросе и ответе.

Изменить: не пытайтесь печатать большие RDD s

Несколько читателей попросили об использовании collect() и println(), чтобы увидеть их результаты, как в приведенном выше примере. Конечно, это работает только в том случае, если вы работаете в интерактивном режиме, таком как Spark REPL (read-eval-print-loop.) Лучше всего называть collect() на RDD, чтобы получить последовательный массив для упорядоченной печати. Но collect() может вернуть слишком много данных, и в любом случае слишком много может быть напечатано. Вот несколько альтернативных способов получить представление о вашем RDD, если они большие:

  • RDD.take(): Это дает вам прекрасный контроль над количеством элементов, которые вы получаете, но не там, откуда они появились, - определяется как "первый", который представляет собой концепцию, рассматриваемую различными другими вопросами и ответами здесь,

    // take() returns an Array so no need to collect()
    myHugeRDD.take(20).foreach(a => println(a))
    
  • RDD.sample(): Это позволяет вам (грубо) контролировать долю полученных результатов, независимо от того, использует ли выборка замещение и даже произвольно случайное число.

    // sample() does return an RDD so you may still want to collect()
    myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
    
  • RDD.takeSample(): Это гибрид: использование случайной выборки, которую вы можете контролировать, но позволяющее указать точное количество результатов и вернуть Array.

    // takeSample() returns an Array so no need to collect() 
    myHugeRDD.takeSample(true, 20).foreach(a => println(a))
    
  • RDD.count(): Иногда лучшая проницательность исходит из того, сколько элементов вы получили - я часто делаю это сначала.

    println(myHugeRDD.count())       
    

Ответ 3

Я бы попытался использовать функцию отображения разделов. В приведенном ниже коде показано, как весь набор данных RDD можно обрабатывать в цикле, чтобы каждый вход проходил через ту же самую функцию. Я боюсь, что у меня нет знаний о Scala, поэтому все, что я могу предложить, это java-код. Однако перевести его в scala не очень сложно.

JavaRDD<String> res = file.mapPartitions(new FlatMapFunction <Iterator<String> ,String>(){ 
      @Override
      public Iterable<String> call(Iterator <String> t) throws Exception {  

          ArrayList<String[]> tmpRes = new ArrayList <>();
          String[] fillData = new String[2];

          fillData[0] = "filename";
          fillData[1] = "content";

          while(t.hasNext()){
               tmpRes.add(fillData);  
          }

          return Arrays.asList(tmpRes);
      }

}).cache();

Ответ 4

что возвращение wholeTextFiles - это пара RDD:

def wholeTextFiles (путь: String, minPartitions: Int): RDD [(String, String)]

Прочитайте каталог текстовых файлов из HDFS, локальной файловой системы (доступной на всех узлах) или любого поддерживаемого Hadoop URI файловой системы. Каждый файл считывается как одна запись и возвращается в пару ключ-значение, где ключ - это путь к каждому файлу, значение - это содержимое каждого файла.

Вот пример чтения файлов по локальному пути, затем печать каждого имени файла и содержимого.

val conf = new SparkConf().setAppName("scala-test").setMaster("local")
val sc = new SparkContext(conf)
sc.wholeTextFiles("file:///Users/leon/Documents/test/")
  .collect
  .foreach(t => println(t._1 + ":" + t._2));

результат:

file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12}

file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22}

file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18}

или преобразование пары RDD в RDD сначала

sc.wholeTextFiles("file:///Users/leon/Documents/test/")
  .map(t => t._2)
  .collect
  .foreach { x => println(x)}

результат:

{"name":"tom","age":12}

{"name":"john","age":22}

{"name":"leon","age":18}

И я думаю, что wholeTextFiles более подходит для небольших файлов.