Подтвердить что ты не робот

Как полностью использовать все узлы Spark в кластере?

Я запустил кластер 10 node с ec2- script в автономном режиме для Spark. Я получаю доступ к данным в ведрах s3 изнутри оболочки PySpark, но когда я выполняю трансормирования на RDD, используется только один node. Например, нижеследующее будет считывать данные из CommonCorpus:

bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/"
          "/segments/1404776400583.60/warc/CC-MAIN-20140707234000-00000-ip-10"
          "-180-212-248.ec2.internal.warc.gz")

data = sc.textFile(bucket)
data.count()

Когда я запускаю это, только один из моих 10 подчиненных обрабатывает данные. Я знаю это, потому что только один подчиненный (213) имеет любые журналы активности при просмотре с веб-консоли Spark. Когда я просматриваю активность в Ganglia, этот же node (213) является единственным подчиненным с использованием всплеска в использовании памяти, когда действие было запущено. enter image description here

Кроме того, у меня есть такая же производительность, когда я запускаю тот же script с кластером ec2 только одного подчиненного устройства. Я использую Spark 1.1.0, и любая помощь или совет приветствуются.

4b9b3361

Ответ 1

...ec2.internal.warc.gz

Я думаю, что вы столкнулись с довольно типичной проблемой с gzipped файлами, поскольку они не могут быть загружены параллельно. Более конкретно, один gzipped файл не может быть загружен параллельно несколькими задачами, поэтому Spark загрузит его с 1 задачей и, таким образом, предоставит вам RDD с 1 разделом.

(Обратите внимание, однако, что Spark может загружать 10 gzipped файлов параллельно, просто так, что каждый из этих 10 файлов может быть загружен только одной задачей. Вы все равно можете получить parallelism через файлы, просто не в пределах файл.)

Вы можете подтвердить, что у вас есть только 1 раздел, явно указав количество разделов в вашем RDD:

data.getNumPartitions()

Верхняя граница числа задач, которые могут выполняться параллельно на RDD, - это количество разделов в RDD или количество ведомых ядер в вашем кластере, в зависимости от того, что меньше.

В вашем случае это количество разделов RDD. Это можно увеличить, перераспределив ваш RDD следующим образом:

data = sc.textFile(bucket).repartition(sc.defaultParallelism * 3)

Почему sc.defaultParallelism * 3?

Руководство по настройке Spark рекомендует иметь 2-3 задания на ядро ​​, а sc.defaultParalellism - количество ядер в вашем кластере.