Подтвердить что ты не робот

Как работает агрегат Spark - aggregateByKey?

Скажем, что у меня есть распределенная система на 3 узлах, и мои данные распределяются между этими узлами. например, у меня есть файл test.csv, который существует на всех трех узлах и содержит 2 столбца:

**row   | id,  c.**
---------------
row1  | k1 , c1  
row2  | k1 , c2  
row3  | k1 , c3  
row4  | k2 , c4  
row5  | k2 , c5  
row6  | k2 , c6  
row7  | k3 , c7  
row8  | k3 , c8  
row9  | k3 , c9  
row10 | k4 , c10   
row11 | k4 , c11  
row12 | k4 , c12 

Затем я использую SparkContext.textFile для чтения файла как rdd и так далее. Насколько я понимаю, каждый искровой рабочий node будет читать часть из файла. Поэтому прямо сейчас скажем, что каждый node будет хранить:

  • node 1: строка 1 ~ 4
  • node 2: строка 5 ~ 8
  • node 3: строка 9 ~ 12

Мой вопрос в том, что, допустим, я хочу делать вычисления по этим данным, и есть один шаг, который мне нужно сгруппировать вместе, поэтому пара значений ключа будет [k1 [{k1 c1} {k1 c2} {k1 c3}]].. и т.д.

Существует функция groupByKey(), которая очень дорога в использовании, и рекомендуется использовать aggregateByKey(). Поэтому мне интересно, как работают groupByKey() и aggregateByKey() под капотом? Может кто-нибудь, используя приведенный выше пример, объяснить, пожалуйста? После перетасовки, где строки находятся на каждом node?

4b9b3361

Ответ 1

aggregateByKey() почти идентичен reduceByKey() (оба вызывающие combineByKey() за кулисами), за исключением того, что вы даете начальное значение для aggregateByKey(). Большинство людей знакомы с reduceByKey(), поэтому я буду использовать это в объяснении.

Причина reduceByKey() намного лучше, потому что она использует функцию MapReduce, называемую объединителем. Любую функцию типа + или * можно использовать таким образом, потому что порядок элементов, на которые он вызван, не имеет значения. Это позволяет Spark начинать "уменьшать" значения одним и тем же ключом, даже если они еще не все в одном разделе.

На оборотной стороне groupByKey() вы получаете больше универсальности, так как вы пишете функцию, которая принимает Iterable, что означает, что вы можете даже вывести все элементы в массив. Однако он неэффективен, потому что для его работы полный набор пар (K,V,) должен находиться в одном разделе.

Шаг, который перемещает данные вокруг операции уменьшения типа, обычно называется shuffle, на самом простейшем уровне данные разбиваются на каждый node (часто с помощью хэш-разделителя) а затем отсортированы на каждом node.

Ответ 2

aggregateByKey() сильно отличается от reduceByKey. Что происходит, так это то, что reduceByKey - это особый случай aggregateByKey.

aggregateByKey() будет комбинировать значения для определенного ключа, а результатом такой комбинации может быть любой объект, который вы указали. Вы должны указать, как значения объединяются ( "добавляются" ) внутри одного раздела (который выполняется в том же node) и как вы комбинируете результат из разных разделов (которые могут быть в разных узлах). reduceByKey - частный случай в том смысле, что результат комбинации (например, сумма) имеет тот же тип, что и значения, и что операция при объединении из разных разделов также совпадает с операцией при объединении значений внутри раздел.

Пример: Представьте, что у вас есть список пар. Вы распараллеливаете его:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

Теперь вы хотите "объединить" их с помощью ключа, производящего сумму. В этом случае reduceByKey и aggregateByKey одинаковы:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))

Теперь представьте, что вы хотите, чтобы агрегация была набором значений, это другой тип, который представляет собой целые числа (сумма целых чисел также является целыми):

import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the  _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))