Подтвердить что ты не робот

Коллекция Spark, отсортированная по значению

Я пытался использовать этот учебник http://spark.apache.org/docs/latest/quick-start.html Сначала я создал коллекцию из файла

textFile = sc.textFile("README.md")

Затем я попробовал команду, чтобы закодировать слова:

wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

Чтобы распечатать коллекцию:

 wordCounts.collect()

Я нашел, как сортировать его по слову, используя команду sortByKey. Мне было интересно, как можно было бы сделать то же самое для сортировки по значению, что в этом случае в количестве, которое в документе происходит в слове.

4b9b3361

Ответ 1

Сортировка обычно должна выполняться до вызова команды collect(), так как она возвращает набор данных в программу драйвера, а также то, как задание с сокращением картой hasoop будет запрограммировано в java, так что окончательный вывод, который вы хотите записать (обычно) в HDFS. Благодаря искровому API этот подход обеспечивает гибкость записи вывода в "сырой" форме, где вы хотите, например, в файл, где он может использоваться как вход для дальнейшей обработки.

Использование искры scala Сортировка API перед сборкой() может быть выполнена по предложению eliasah и с использованием Tuple2.swap() дважды, один раз перед сортировкой и один раз после этого, чтобы создать список кортежей, отсортированных по возрастанию или уменьшению порядка их второе поле (которое называется _2) и содержит количество слов в их первом поле (с именем _1). Ниже приведен пример того, как это выполняется в искровой оболочке:

// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)  // 2nd arg configures one task (same as number of partitions)
  .map(item => item.swap) // interchanges position of entries in each tuple
  .sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
  .map(item => item.swap)

Чтобы отменить порядок сортировки, используйте sortByKey (false, 1), так как его первым аргументом является булевское значение возрастания. Его вторым аргументом является количество задач (равное количеству разделов), которое установлено равным 1 для тестирования с небольшим входным файлом, где требуется только один файл выходных данных; reduceByKey также принимает этот необязательный аргумент.

После этого wordCounts RDD можно сохранить как текстовые файлы в каталог с saveAsTextFile (имя_каталога), в который будут внесены один или несколько файлов part-xxxxx (начиная с part-00000) в зависимости от количества редукторов, настроенных для задания (1 файл выходных данных на редуктор), файл _SUCCESS в зависимости от того, выполнено ли задание или нет, и .crc файлы.

Используя pyspark, питон script, очень похожий на scala script, показанный выше, дает результат, который фактически тот же. Вот версия pyspark, демонстрирующая сортировку коллекции по значению:

file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b, 1) \ # last arg configures one reducer task
    .map(lambda (a, b): (b, a)) \
    .sortByKey(1, 1) \ # 1st arg configures ascending sort, 2nd configures 1 task
    .map(lambda (a, b): (b, a))

Чтобы отсортироватьKey в порядке убывания, его первый arg должен быть 0. Поскольку python захватывает ведущее и конечное пробелы в качестве данных, strip() вставлен перед разбиением каждой строки на пробелы, но это необязательно с использованием spark-shell/ scala.

Основное различие в выводе словарной строки и версии python wordCount заключается в том, что при искровых выводах (word, 3) выходы python (u'word ', 3).

Для получения дополнительной информации об искровых методах RDD см. http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html для python и https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD для scala.

В искровой оболочке running collect() в wordCounts преобразует его из RDD в Array [(String, Int)] = Array [Tuple2 (String, Int)], который сам может быть отсортирован во втором поле каждый элемент Tuple2, используя:

Array.sortBy(_._2) 

sortBy также принимает необязательный неявный аргумент math.Ordering, такой как Ромео Киенцлер, показал в предыдущем ответе на этот вопрос. Array.sortBy(_._ 2) будет делать обратный вид элементов массива Array Tuple2 в своих _2-областях, просто определяя неявное обратное упорядочение перед запуском map-reduce script, потому что он переопределяет ранее существовавшее упорядочение Int. Обратный int Заказ, уже определенный Ромео Киенцлером:

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}

Другим распространенным способом определения этого обратного упорядочения является изменение порядка a и b и падение (-1) в правой части определения сравнения:

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = b.compare(a)
}   

Ответ 2

Выполнение этого более питоническим способом.

# In descending order
''' The first parameter tells number of elements
    to be present in output.
''' 
data.takeOrdered(10, key=lambda x: -x[1])
# In Ascending order
data.takeOrdered(10, key=lambda x: x[1])

Ответ 3

Для тех, кто хочет получить верхние N элементов, упорядоченных по значению:

theRDD.takeOrdered(N, lambda (key, value): -1 * len(value))

если вы хотите заказать длину строки.

С другой стороны, если значения уже находятся в форме, подходящей для вашего желаемого порядка, то:

theRDD.takeOrdered(N, lambda (key, value): -1 * value)

было бы достаточно.

Ответ 4

вы можете сделать это таким образом

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
    override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}

counts.collect.toSeq.sortBy(_._2)

Итак, в основном вы конвертируете свой RDD в последовательность и используете метод сортировки для сортировки.

Блок выше глобально изменяет поведение сортировки, чтобы получить убывающий порядок сортировки.

Ответ 5

Я думаю, вы можете использовать общее преобразование sortBy (а не действие, т.е. оно возвращает RDD, а не массив), документированное здесь.

Итак, в вашем случае вы могли бы сделать

wordCounts.sortBy(lambda (word, count): count)

Ответ 6

Простейший способ сортировки вывода по значениям. После reduceByKey вы можете заменить вывод как ключ как значение и значение как ключ, а затем вы можете применить метод sortByKey, где false сортирует в порядке убывания. По умолчанию он будет сортироваться в порядке возрастания.

 val test=textFile.flatMap(line=> line.split(" ")).map(word=> (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false)

Ответ 7

Решение @kef для python является точкой на...

Необходимо изменить следующее:

.map(lambda (a, b): (b, a))

к

.map(lambda a: (a[1], a[0]))

Ответ 8

Мне удалось решить эту проблему с помощью Python. Поэтому я создаю список значений пар и отсортировал его по значению:

out = wordCounts.collect()
outSort = sorted(out, key=lambda word:word[1])

Ответ 9

 wordCounts.map(lambda (a,b) : (b,a)).sortByKey(ascending=False).map(lambda (a,b) : (b,a)).collect()

Это решение работает, потому что каждая строка wordCount rdd выглядит так:

(WORD, COUNT)

первая карта производит rdd с порядком переупорядочения кортежей, то есть теперь они выглядят так:

(COUNT, WORD)

Теперь, когда мы делаем sortByKey, COUNT берется за ключ, который мы хотим. Вторая карта затем отображает теперь отсортированный второй rdd обратно в исходный формат

(WORD, COUNT)

для каждой строки, но не теперь строки сортируются по счету слов.

Неявное предположение здесь состоит в том, что отображение не изменяет порядок строк RDD, иначе вторая карта может испортить сортировку.