Подтвердить что ты не робот

Как стадии разбиваются на задачи в Spark?

Предположим для следующего, что в каждый момент времени выполняется только одно задание Spark.

То, что я получаю до сих пор

Вот что я понимаю, что происходит в Spark:

  • Когда создается SparkContext, каждый рабочий node запускает исполнитель. Исполнители - это отдельные процессы (JVM), которые соединяются с программой драйвера. Каждый исполнитель имеет банку программы драйвера. Выход из драйвера завершает работу исполнителей. Каждый исполнитель может содержать некоторые разделы.
  • Когда выполняется задание, в соответствии с графиком линии создается план выполнения.
  • Задание выполнения разбивается на этапы, где этапы содержат столько смежных (в графике линий) преобразований и действий, но не перемешиваются. Таким образом, этапы разделяются путем тасования.

image 1

Я понимаю, что

  • Задача - это команда, отправленная из драйвера исполнителю путем сериализации объекта Function.
  • Исполнитель выполняет десериализацию (с помощью флага драйвера) команды (задачи) и выполняет ее на разделе.

но

Вопрос (ы)

Как мне разделить этап на эти задачи?

В частности:

  • Являются ли задачи, определяемые преобразованиями и действиями, или могут быть множественными преобразованиями/действиями в задаче?
  • Задачи, определяемые разделом (например, одна задача на каждый этап на раздел).
  • Задачи, определяемые узлами (например, одна задача на этап на node)?

Что я думаю (только частичный ответ, даже если правильный)

В https://0x0fff.com/spark-architecture-shuffle тасование объясняется изображением

введите описание изображения здесь

и создается впечатление, что правило

каждый этап разбивается на # задачи с номерами разделов, не обращая внимания на количество узлов

Для моего первого изображения я бы сказал, что у меня будет 3 задачи карты и 3 задачи сокращения.

Для изображения из 0x0fff я бы сказал, что есть 8 задач карты и 3 задачи сокращения (при условии, что есть только три оранжевых и три темно-зеленых файла).

Открытые вопросы в любом случае

Это правильно? Но даже если это правильно, на мои вопросы выше не все ответы, потому что он по-прежнему открыт, если несколько операций (например, несколько карт) находятся в одной задаче или разделены на одну задачу на операцию.

Что говорят другие

Что такое задача в Spark? Как рабочий Spark выполняет файл jar? и Как планировщик Apache Spark разделяет файлы на задачи? похожи, но я не чувствовал, что мой на вопрос ответили четко.

4b9b3361

Ответ 1

У вас здесь неплохая схема. Чтобы ответить на ваши вопросы

  • Для каждого раздела данных для каждого stage необходимо запустить отдельный task. Учтите, что каждый раздел, вероятно, будет находиться в разных физических местах - например. блоков в HDFS или каталогов/томов для локальной файловой системы.

Обратите внимание, что представление stage управляется DAG Scheduler. Это означает, что этапы, которые не являются взаимозависимыми, могут быть представлены кластеру для выполнения параллельно: это максимизирует возможности параллелизации в кластере. Поэтому, если операции в нашем потоке данных могут происходить одновременно, мы ожидаем увидеть несколько этапов.

Мы видим, что в действии в следующем примере игрушек мы выполняем следующие типы операций:

  • загрузить два источника данных
  • выполнить некоторую операцию отображения на обоих источниках данных отдельно
  • присоединиться к ним
  • выполнить некоторые операции с картами и фильтрами по результату
  • сохранить результат

Итак, сколько стадий мы получим?

  • 1 этап для загрузки двух источников данных параллельно = 2 этапа
  • Третий этап, представляющий join, который зависит от двух других этапов
  • Примечание: все последующие операции, работающие с объединенными данными, могут выполняться на одном и том же этапе, потому что они должны выполняться последовательно. Нет никакой пользы для запуска дополнительных этапов, потому что они не могут начать работу до завершения предыдущей операции.

Вот эта игрушечная программа

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

И вот DAG результата

введите описание изображения здесь

Теперь: сколько задач? Количество задач должно быть равно

Сумма (stage * #Partitions in the stage)

В моем случае #Partitions in the stage равно number of processors на моих машинах кластера.

Ответ 2

Если я правильно понимаю, есть две (связанные) вещи, которые вас путают:

1) Что определяет содержание задачи?

2) Что определяет количество выполняемых задач?

Двигатель искры "склеивает" вместе простые операции с последовательными rdds, например:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

поэтому, когда rdd3 (лениво) вычисляется, искра генерирует задачу на один раздел rdd1, и каждая задача будет выполнять как фильтр, так и карту на строку, чтобы привести к rdd3.

Количество задач определяется количеством разделов. Каждый RDD имеет определенное количество разделов. Для исходного RDD, который считывается из HDFS (например, с помощью sc.textFile(...)) количество разделов - это количество разделов, сгенерированных входным форматом. Некоторые операции с RDD (-ами) могут приводить к RDD с различным количеством разделов:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

Другим примером является объединение:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).
Операции

(Most), которые изменяют количество разделов, связаны с тасованием. Когда мы делаем, например:

rdd2 = rdd1.repartition( 1000 ) 

что на самом деле происходит, задача каждого раздела rdd1 должна генерировать конечный вывод, который может быть прочитан следующим этапом, чтобы rdd2 имел ровно 1000 разделов (как они это делают? Hash или Sort). Задачи на этой стороне иногда называются задачами "Карта (сторона)". Задача, которая позже будет работать на rdd2, будет действовать на один раздел (из rdd2!) И должна будет выяснить, как читать/комбинировать выходы на стороне карты, относящиеся к этому разделу. Задачи на этой стороне иногда называются задачами "Уменьшить (side)".

Связаны два вопроса: количество задач на этапе - это количество разделов (общее для последовательных rdds "склеенных" вместе), а количество разделов rdd может меняться между этапами (путем указания количества например, разделы на случай случайной перетасовки).

Как только начинается этап, его задачи могут занимать слоты задач. Количество параллельных слотов задач - numExecutors * ExecutorCores. В общем, они могут быть заняты задачами из разных, независящих этапов.

Ответ 3

Это может помочь вам лучше понять разные части:

  • Этап: представляет собой набор задач. Тот же процесс работает против различные подмножества данных (разделов).
  • Задача: представляет собой единицу работа над разделом распределенного набора данных. Поэтому на каждом этапе, количество заданий = количество разделов или, как вы сказали, "одна задача за этап на раздел ".
  • Каждый исполнитель запускается на одном контейнере пряжи и каждый контейнер находится на одном node.
  • Каждый этап использует несколько исполнителей, каждому исполнителю выделяется несколько приложений.
  • Каждый vcore может выполнять ровно одну задачу за раз
  • Таким образом, на любом этапе несколько задач могут выполняться параллельно. число выполняемых задач = число-в-vcores.