Что означает заголовок "Уровень локальности" и данные состояния 5 local → local → node local → rack local → Any?
Что означает заголовок "Уровень локальности" и данные состояния 5 local → local → node local → rack local → Any?
Уровень локальности, насколько я знаю, указывает, какой тип доступа к данным был выполнен. Когда node завершает всю свою работу, и его процессор становится бездействующим, Spark может решить запустить другую ожидающую задачу, требующую получения данных из других мест. Поэтому в идеале все ваши задачи должны быть локальными, поскольку они связаны с меньшей задержкой доступа к данным.
Вы можете настроить время ожидания перед переходом на другие уровни местности, используя:
spark.locality.wait
Более подробную информацию о параметрах можно найти в Документации по конфигурации Spark
Что касается разных уровней PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL или ANY, я думаю, что методы findTask и findSpeculativeTask в org.apache.spark.scheduler. TaskSetManager иллюстрирует, как Spark выбирает задачи на основе их локальности. Сначала он будет проверять задачи PROCESS_LOCAL, которые будут запущены в том же процессе выполнения. Если нет, он будет проверять задачи NODE_LOCAL, которые могут быть у других исполнителей в одном и том же node, или их нужно извлекать из таких систем, как HDFS, кешированные и т.д. RACK_LOCAL означает, что данные находятся в другом node, и поэтому это необходимо для передачи предыдущего исполнения. И, наконец, ANY - это просто выполнить любую ожидающую задачу, которая может выполняться в текущем node.
/**
* Dequeue a pending task for a given node and return its index and locality level.
* Only search for tasks matching the given locality constraint.
*/
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
}
}
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
}
}
// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY))
}
}
// Finally, if all else has failed, find a speculative task
findSpeculativeTask(execId, host, locality)
}