Подтвердить что ты не робот

Как раздел Spark (ing) работает с файлами в HDFS?

Я работаю с Apache Spark в кластере с использованием HDFS. Насколько я понимаю, HDFS распространяет файлы на узлах данных. Поэтому, если поставить файловую систему "file.txt", она будет разделена на разделы. Теперь я звоню

rdd = SparkContext().textFile("hdfs://.../file.txt") 

от Apache Spark. Имеет ли rdd автоматически те же разделы, что и "file.txt" в файловой системе? Что происходит, когда я вызываю

rdd.repartition(x)

где x > , то разделы, используемые hdfs? Будет ли Spark физически переупорядочивать данные по hdfs для работы локально?

Пример: Я поместил 30GB Textfile в HDFS-систему, которая распределяет его на 10 узлах. Will Spark а) использовать те же 10 партионов? и б) перетасовать 30 ГБ по кластеру при вызове перераспределения (1000)?

4b9b3361

Ответ 1

Когда Spark считывает файл из HDFS, он создает один раздел для одного разделения входа. Разделение входа устанавливается с помощью Hadoop InputFormat, используемого для чтения этого файла. Например, если вы используете textFile(), то в Hadoop будет TextInputFormat, который вернет вам один раздел для одного блока HDFS (но разделение между разделами будет выполняться по расщеплению строки, а не по точной разбивке блоков), если у вас нет сжатого текстового файла. В случае с сжатым файлом вы получите один раздел для одного файла (поскольку сжатые текстовые файлы не разделяются).

Когда вы вызываете rdd.repartition(x), он будет выполнять перетасовку данных из N partititons, которые у вас есть в rdd до x разделов, которые вы хотите иметь, разбиение будет выполняться с округлой структурой.

Если у вас есть несжатый текстовый файл 30 ГБ, хранящийся на HDFS, тогда с настройкой размера блока HDFS по умолчанию (128 МБ) он будет сохранен в 235 блоках, а это значит, что RDD, который вы прочитали из этого файла, будет иметь 235 разделов. Когда вы вызываете repartition(1000), ваш RDD будет отмечен как подлежащий перераспределению, но на самом деле он будет перетасован на 1000 разделов только тогда, когда вы выполните действие поверх этого RDD (ленивая концепция выполнения)

Ответ 2

Вот снимок " Как блоки в HDFS загружаются в рабочие места Spark как разделы"

В этих изображениях 4 блока HDFS загружаются как разделы Spark внутри 3 рабочих памяти

Dataset in HDFS broken into partitions


Пример: я поместил 30GB текстовый файл в HDFS-систему, которая распределяет его на 10 узлах.

Будет ли Spark

a) использовать те же 10 разделов?

Spark загружает те же самые 10 наборов HDFS для рабочей памяти как разделы. Я предполагаю, что размер блока размером 30 ГБ должен быть 3 ГБ, чтобы получить 10 разделов/блоков (со стандартным conf)

b) перетасовать 30 ГБ по кластеру, когда я вызываю перераспределение (1000)?

Да, Spark перемещает данные между рабочими узлами, чтобы создать 1000 разделов в рабочей памяти.

Примечание:

HDFS Block -> Spark partition   : One block can represent as One partition (by default)
Spark partition -> Workers      : Many/One partitions can present in One workers 

Ответ 3

Дополнение к @0x0FFF Если взять HDFS в качестве входного файла, он будет рассчитываться как для этого rdd = SparkContext().textFile("hdfs://.../file.txt"), а когда вы сделаете rdd.getNumPatitions, это приведет к Max(2, Number of HDFS block). Я провел много экспериментов и нашел это в результате. Снова явным образом вы можете сделать rdd = SparkContext().textFile("hdfs://.../file.txt", 400), чтобы получить 400 в качестве разделов или даже сделать переразделы на rdd.repartition или уменьшить до 10 на rdd.coalesce(10)

Ответ 4

При чтении файлов HDFS без буфера (например, паркета) с помощью spark-sql количество разделов df.rdd.getNumPartitions зависит от следующих факторов:

  • spark.default.parallelism (примерно переводит в #core, доступные для приложения)
  • spark.sql.files.maxPartitionBytes (по умолчанию 128 МБ)
  • spark.sql.files.openCostInBytes (по умолчанию 4 МБ)

Примерная оценка количества перегородок:

  • Если у вас достаточно ядер для чтения всех ваших данных параллельно (то есть как минимум одно ядро на каждые 128 МБ ваших данных)

    AveragePartitionSize ≈ min(4MB, TotalDataSize/#cores) NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize

  • Если вам не хватает ядер,

    AveragePartitionSize ≈ 128MB NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize

Точные вычисления немного сложны и могут быть найдены в базе кода для FileSourceScanExec, см. Здесь.