Подтвердить что ты не робот

Сохраняет ли groupByKey в Spark первоначальный заказ?

В Spark функция groupByKey преобразует пару RDD (K,V) в пару RDD (K,Iterable<V>).

Тем не менее, эта функция стабильна? т.е. порядок в итерабельности сохраняется в исходном порядке?

Например, если я изначально прочитал файл формы:

K1;V11
K2;V21
K1;V12

Может ли моя итерабельность для K1 выглядеть как (V12, V11) (таким образом не сохраняя исходный порядок) или может быть только (V11, V12) (таким образом сохранение оригинала порядок)?

4b9b3361

Ответ 1

Нет, порядок не сохраняется. Пример в spark-shell:

scala> sc.parallelize(Seq(0->1, 0->2), 2).groupByKey.collect
res0: Array[(Int, Iterable[Int])] = Array((0,ArrayBuffer(2, 1)))

Порядок зависит от времени, поэтому он может варьироваться между прогонами. (В следующий раз я получил противоположный порядок.)

Что здесь происходит? groupByKey работает, перераспределяя RDD с помощью HashPartitioner, так что все значения для ключа заканчиваются в одном разделе. Затем он выполняет агрегацию локально на каждом разделе.

Перераспределение также называется "перетасовкой", поскольку строки RDD перераспределяются между узлами. Файлы тасования вытягиваются из других узлов параллельно. Новый раздел построен из этих частей в том порядке, в котором они поступают. Данные из самого медленного источника будут в конце нового раздела и в конце списка в groupByKey.

(Данные, извлеченные из самого рабочего, конечно, быстрее всего. Поскольку здесь нет сетевой передачи, эти данные вытягиваются синхронно и, следовательно, поступают в порядке. (Кажется, по крайней мере.) Таким образом, чтобы воспроизвести мой эксперимент вам нужно как минимум 2 работника искры.)

Источник: http://apache-spark-user-list.1001560.n3.nabble.com/Is-shuffle-quot-stable-quot-td7628.html

Ответ 2

Spark (и другие карты сокращают рамки) сортируют данные путем разбиения на разделы, а затем слияния. Поскольку сортировка слияния является стабильной операцией, я бы предположил, что результат стабилен. После того, как я посмотрел больше на источник, я обнаружил, что если spark.shuffle.spill является истинным, он использует внешний сортировку, сортировку слияния в этом случае, которая является стабильной. Я не уверен на 100%, что он делает, если это позволяет разливать на диск.

Из источника:

private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

Разделение также является стабильной операцией, поскольку она не выполняет переупорядочения