Подтвердить что ты не робот

Какие факторы определяют количество исполнителей в автономном режиме?

Учитывая приложение Spark

  • Какие факторы определяют количество исполнителей в автономном режиме? В Mesos и YARN согласно этим документам мы можем указать количество исполнителей/ядер и памяти.

  • После запуска нескольких исполнителей. Запускает ли Spark задачи в циклическом режиме или достаточно умен, чтобы убедиться, что некоторые из исполнителей не работают/заняты, а затем планируют задачи соответственно.

  • Также, как Spark принимает решение о количестве задач? Я сделал написать простую программу максимальной температуры с небольшим набором данных, а Spark породила две задачи в одном исполнителе. Это находится в автономном режиме Spark.

4b9b3361

Ответ 1

Отвечая на ваши вопросы:

  • В автономном режиме используется одна и та же переменная конфигурации, как режимы Mesos и Yarn, чтобы установить количество исполнителей. Переменная spark.cores.max определяет максимальное количество ядер, используемых в искровом контексте. Значение по умолчанию - бесконечность, поэтому Spark будет использовать все ядра в кластере. Переменная spark.task.cpus определяет, сколько процессоров Spark будет выделяться для одной задачи, значение по умолчанию равно 1. С помощью этих двух переменных вы можете определить максимальное количество параллельных задач в вашем кластере.

  • При создании подкласса RDD вы можете определить, в каких машинах запускается ваша задача. Это определено в методе getPreferredLocations. Но поскольку сигнатуры метода позволяют предположить, что это только предпочтение, поэтому, если Spark обнаруживает, что одна машина не занята, она запустит задачу на этой незанятой машине. Однако я не знаю, какой механизм Spark знает, какие машины простаивают. Чтобы достичь локальности, мы (Stratio) решили сделать каждый Partions меньше, поэтому задача занимает меньше времени и достигает локальности.

  • Количество задач каждой операции Spark определяется в соответствии с длиной разделов RDD. Этот вектор является результатом метода getPartitions, который вы должны переопределить, если вы хотите создать новый подкласс класса RDD. Этот метод возвращает способ разделения RDD, где находится информация, и разделов. Когда вы присоединяетесь к двум или более RDD, используя, например, операции объединения или объединения, количество задач результирующего RDD - это максимальное количество задач RDD, участвующих в операции. Например: если вы присоединяетесь к RDD1, у которого есть 100 задач и RDD2, у которых есть 1000 задач, следующая операция результирующего RDD будет иметь 1000 задач. Обратите внимание, что большое количество разделов не обязательно является синонимом большего количества данных.

Надеюсь, это поможет.

Ответ 2

Я согласен с @jlopezmat о том, как Spark выбирает свою конфигурацию. Что касается вашего тестового кода, вы видите две задачи из-за реализации textFile. От SparkContext.scala:

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString)
  }

и если мы проверим, что такое значение defaultMinPartitions:

  /** Default min number of partitions for Hadoop RDDs when not given by user */
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

Ответ 3

Spark выбирает количество задач на основе количества разделов в исходном наборе данных. Если вы используете HDFS в качестве источника данных, то по умолчанию количество разделов, равное количеству блоков HDFS. Вы можете изменить количество разделов несколькими способами. Две верхние: как дополнительный аргумент метода SparkContext.textFile; вызвав метод RDD.repartion.

Ответ 4

Отвечая на некоторые вопросы, которые не были рассмотрены в предыдущих ответах:

  • в автономном режиме вам нужно сыграть с --executor-cores и --max-executor-cores, чтобы установить количество исполнителей, которые будут запущены (если у вас достаточно памяти для этого номера, если вы укажете --executor-memory)

  • Spark не распределяет задачу циклически, он использует механизм, называемый Delay Scheduling", который является позволяя каждому исполнителю предлагать ему доступность мастеру, который будет решать, отправлять или не отправлять на него задачу.