Подтвердить что ты не робот

Pyspark: repartition vs partitionBy

Я работаю над этими двумя концепциями прямо сейчас и хочу получить некоторую ясность. Из командной строки я пытаюсь определить различия и когда разработчик будет использовать repartition vs partitionBy.

Вот пример кода:

rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c',1), ('ef',5)])
rdd1 = rdd.repartition(4)
rdd2 = rdd.partitionBy(4)

rdd1.glom().collect()
[[('b', 1), ('ef', 5)], [], [], [('a', 1), ('a', 2), ('b', 3), ('c', 1)]]

rdd2.glom().collect()
[[('a', 1), ('a', 2)], [], [('c', 1)], [('b', 1), ('b', 3), ('ef', 5)]]

Я взглянул на реализацию обоих, и единственная разница, которую я заметил по большей части, заключается в том, что partitionBy может выполнять функцию разбиения по секциям или использовать по умолчанию портативный_хаши. Поэтому в разделе partitionBy все те же ключи должны находиться в одном разделе. В перераспределении я ожидал бы, что значения будут распределяться более равномерно по разделам, но это не так.

Учитывая это, почему кто-либо когда-либо использовал передел? Я полагаю, что единственный раз, когда я мог видеть, что он используется, - это если я не работаю с PairRDD, или у меня большой обход данных?

Есть ли что-то, что мне не хватает, или кто-то может пролить свет под другим углом для меня?

4b9b3361

Ответ 1

repartition уже существует в RDD и не обрабатывает разделение по ключу (или любым другим критерием, кроме Ordering). Теперь PairRDD добавляют понятие ключей и впоследствии добавляют другой метод, который позволяет разбить этот ключ.

Итак, да, если ваши данные введены ключом, вы должны полностью разделить этот ключ, что во многих случаях является точкой использования PairRDD в первую очередь (для объединений, reduceByKey и т.д.).

Ответ 2

repartition() используется для указания количества разделов с учетом количества ядер и объема данных, которые у вас есть.

partitionBy() используется для повышения эффективности функций перетасовки, таких как reduceByKey(), join(), cogroup() и т.д. Это полезно только в случаях, когда RDD используется несколько раз, поэтому обычно а затем persist().

Различия между ними в действии:

pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]).map(lambda x: (x, x))

pairs.partitionBy(3).glom().collect()
[[(3, 3), (6, 6), (6, 6)],
 [(1, 1), (4, 4), (4, 4), (1, 1), (7, 7), (7, 7), (4, 4)],
 [(2, 2), (2, 2), (5, 5), (5, 5), (5, 5)]]

pairs.repartition(3).glom().collect()
[[(4, 4), (2, 2), (6, 6), (7, 7), (5, 5), (5, 5)],
 [(1, 1), (4, 4), (6, 6), (4, 4)],
 [(2, 2), (3, 3), (1, 1), (5, 5), (7, 7)]]