Подтвердить что ты не робот

Каково значение "уровня локальности" на Spark cluster

Что означает заголовок "Уровень локальности" и данные состояния 5 local → local → node local → rack local → Any?

enter image description here

4b9b3361

Ответ 1

Уровень локальности, насколько я знаю, указывает, какой тип доступа к данным был выполнен. Когда node завершает всю свою работу, и его процессор становится бездействующим, Spark может решить запустить другую ожидающую задачу, требующую получения данных из других мест. Поэтому в идеале все ваши задачи должны быть локальными, поскольку они связаны с меньшей задержкой доступа к данным.

Вы можете настроить время ожидания перед переходом на другие уровни местности, используя:

spark.locality.wait

Более подробную информацию о параметрах можно найти в Документации по конфигурации Spark

Что касается разных уровней PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL или ANY, я думаю, что методы findTask и findSpeculativeTask в org.apache.spark.scheduler. TaskSetManager иллюстрирует, как Spark выбирает задачи на основе их локальности. Сначала он будет проверять задачи PROCESS_LOCAL, которые будут запущены в том же процессе выполнения. Если нет, он будет проверять задачи NODE_LOCAL, которые могут быть у других исполнителей в одном и том же node, или их нужно извлекать из таких систем, как HDFS, кешированные и т.д. RACK_LOCAL означает, что данные находятся в другом node, и поэтому это необходимо для передачи предыдущего исполнения. И, наконец, ANY - это просто выполнить любую ожидающую задачу, которая может выполняться в текущем node.

  /**
   * Dequeue a pending task for a given node and return its index and locality level.
   * Only search for tasks matching the given locality constraint.
   */
  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
  {
    for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
      for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL))
      }
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- findTaskFromList(execId, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL))
      }
    }

    // Look for no-pref tasks after rack-local tasks since they can run anywhere.
    for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
      for (index <- findTaskFromList(execId, allPendingTasks)) {
        return Some((index, TaskLocality.ANY))
      }
    }

    // Finally, if all else has failed, find a speculative task
    findSpeculativeTask(execId, host, locality)
  }