Подтвердить что ты не робот

Apache Spark: Какова эквивалентная реализация RDD.groupByKey() с использованием RDD.aggregateByKey()?

В документах API Apache Spark pyspark.RDD упоминается, что groupByKey() неэффективен. Вместо этого рекомендуется использовать reduceByKey(), aggregateByKey(), combineByKey() или foldByKey(). Это приведет к тому, что некоторые аггрегации у рабочих будут перемещены до тасования, что уменьшит перетасовку данных между рабочими.

Учитывая следующий набор данных и выражение groupByKey(), что эквивалентная и эффективная реализация (сокращение перетасовки данных между рабочими группами), которое не использует groupByKey(), но дает тот же результат?

dataset = [("a", 7), ("b", 3), ("a", 8)]
rdd = (sc.parallelize(dataset)
       .groupByKey())
print sorted(rdd.mapValues(list).collect())

Вывод:

[('a', [7, 8]), ('b', [3])]
4b9b3361

Ответ 1

Насколько я могу судить, нет ничего, чтобы получить * в этом конкретном случае, используя aggregateByKey или аналогичную функцию. Поскольку вы создаете список, нет "реального" сокращения, и количество данных, которое нужно перетасовать, более или менее одинаково.

Чтобы действительно наблюдать некоторый прирост производительности, вам нужны преобразования, которые фактически уменьшают количество перенесенных данных, например, подсчет, вычисление сводных статистических данных, поиск уникальных элементов.

Относительно различий преимуществ использования reduceByKey(), combineByKey() или foldByKey() существует важная концептуальная разница, которую легче увидеть, когда вы рассматриваете Scala API-функции.

Оба reduceByKey и foldByKey отображают от RDD[(K, V)] до RDD[(K, V)], а второй - дополнительный нулевой элемент.

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

combineByKey (нет aggregateByKey, но это тот же тип преобразования) преобразуется из RDD[(K, V)] в RDD[(K, C)]:

combineByKey[C](
   createCombiner: (V) ⇒ C,
   mergeValue: (C, V) ⇒ C,
   mergeCombiners: (C, C) ⇒ C): RDD[(K, C)] 

Возвращаясь к вашему примеру, только combineByKey (и в PySpark aggregateByKey) действительно применим, поскольку вы трансформируетесь из RDD[(String, Int)] в RDD[(String, List[Int])].

На динамическом языке, таком как Python, на самом деле можно выполнить такую ​​операцию с помощью foldByKey или reduceByKey, что делает семантику кода неясной и цитирует @tim-peters. "Должно быть одно - и желательно только один - простой способ сделать это" [1].

Разница между aggregateByKey и combineByKey примерно такая же, как между reduceByKey и foldByKey, поэтому для списка это в основном вопрос вкуса:

def merge_value(acc, x):
    acc.append(x)
    return acc

def merge_combiners(acc1, acc2):
    acc1.extend(acc2)
    return acc1

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
   .combineByKey(
       lambda x: [x],
       lambda u, v: u + [v],
       lambda u1,u2: u1+u2))

На практике вам следует предпочесть groupByKey. Реализация PySpark значительно более оптимизирована по сравнению с наивной реализацией, подобной приведенной выше.

1.Peters, T. PEP 20 - Дзен Питона. (2004). на https://www.python.org/dev/peps/pep-0020/


* На практике на самом деле довольно много, чтобы проиграть здесь, особенно при использовании PySpark. Реализация Python groupByKey значительно оптимизирована, чем наивная комбинация по ключу. Вы можете проверить "Умный" о groupByKey, созданный мной, и @eliasah для дополнительное обсуждение.

Ответ 2

Вот один из вариантов, который использует aggregateByKey(). Мне было бы интересно узнать, как это можно сделать с помощью reduceByKey(), combineByKey() или foldByKey(), и какая стоимость/польза есть для каждой альтернативы.

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       .aggregateByKey(list(),
                       lambda u,v: u+[v],
                       lambda u1,u2: u1+u2))
print sorted(rdd.mapValues(list).collect())

Вывод:

[('a', [7, 8]), ('b', [3])]

Ниже приведена немного более эффективная с точки зрения памяти реализация, хотя она менее читаема новичкам на основе python, которые производят один и тот же вывод:

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       .aggregateByKey(list(),
                       lambda u,v: itertools.chain(u,[v]),
                       lambda u1,u2: itertools.chain(u1,u2)))
print sorted(rdd.mapValues(list).collect())