Остановка приложения для запуска Spark - программирование

Остановка приложения для запуска Spark

Я запускаю Spark-кластер в автономном режиме.

Я представил приложение Spark в режиме кластера с помощью опций:

--deploy-mode cluster –supervise 

Чтобы работа была отказоустойчивой.

Теперь мне нужно, чтобы кластер работал, но остановил запуск приложения.

Вещи, которые я пробовал:

  • Остановка кластера и его перезапуск. Но приложение возобновляется выполнение, когда я это делаю.
  • Используется Kill -9 из демона с именем DriverWrapper, но после этого снова возобновляется работа.
  • Я также удалил временные файлы и каталоги и перезапустил кластер, но снова возобновит работу.

Итак, запущенное приложение действительно отказоустойчиво:)

Вопрос:   Основываясь на приведенном выше сценарии, кто-то может предложить, как я могу остановить выполнение задания, или что еще я могу попытаться остановить запуск приложения, но сохранить работу кластера.

Что-то просто начислило мне, если я вызову sparkContext.stop(), который должен это сделать, но это требует некоторой работы в коде, который в порядке, но вы можете предложить любой другой способ без изменения кода.

4b9b3361

Ответ 1

Если вы хотите убить приложение, которое терпит неудачу, вы можете сделать это через:

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

Идентификатор драйвера можно найти через автономный веб-интерфейс мастера по адресу http://: 8080.

Из Spark Doc

Ответ 2

Повторить это, потому что я не смог использовать существующий ответ, не отлаживая несколько вещей.

Моя цель состояла в том, чтобы программно убить драйвер, который работает постоянно один раз в день, развертывать любые обновления для кода, а затем перезапускать его. Поэтому я не буду заранее знать, что такое идентификатор моего водителя. Мне потребовалось некоторое время, чтобы понять, что вы можете убить только драйверы, если вы отправили свой драйвер с опцией --deploy-mode cluster. Мне также потребовалось некоторое время, чтобы понять, что существует разница между идентификатором приложения и идентификатором драйвера, и, хотя вы можете легко сопоставить имя приложения с идентификатором приложения, мне еще предстоит найти способ прояснить идентификатор драйвера через их конечные точки api и сопоставить это либо с именем приложения, либо с классом, в котором вы работаете. Поэтому, пока run-class org.apache.spark.deploy.Client kill <master url> <driver ID> работает, вам нужно убедиться, что вы развертываете драйвер в режиме кластера и используете идентификатор драйвера, а не идентификатор приложения.

Кроме того, есть конечная точка представления, которая по умолчанию включает значение http://<spark master>:6066/v1/submissions, и вы можете использовать http://<spark master>:6066/v1/submissions/kill/<driver ID>, чтобы убить ваш драйвер.

Поскольку мне не удалось найти идентификатор драйвера, который коррелировал с конкретным заданием с любой конечной точки api, я написал веб-скребок python, чтобы получить информацию с базовой веб-страницы мастера искры на порту 8080, затем убейте ее, используя endpoint в порту 6066. Я бы предпочел, чтобы эти данные поддерживались, но это лучшее решение, которое я смог найти.

#!/usr/bin/python

import sys, re, requests, json
from selenium import webdriver

classes_to_kill = sys.argv
spark_master = 'masterurl'

driver = webdriver.PhantomJS()
driver.get("http://" + spark_master + ":8080/")

for running_driver in driver.find_elements_by_xpath("//*/div/h4[contains(text(), 'Running Drivers')]"):
    for driver_id in running_driver.find_elements_by_xpath("..//table/tbody/tr/td[contains(text(), 'driver-')]"):
        for class_to_kill in classes_to_kill:
            right_class = driver_id.find_elements_by_xpath("../td[text()='" + class_to_kill + "']")
            if len(right_class) > 0:
                driver_to_kill = re.search('^driver-\S+', driver_id.text).group(0)
                print "Killing " + driver_to_kill
                result = requests.post("http://" + spark_master + ":6066/v1/submissions/kill/" + driver_to_kill)
                print json.dumps(json.loads(result.text), indent=4)

driver.quit()