Подтвердить что ты не робот

Воздушный поток: как удалить DAG?

Я запустил веб-сервер Airflow и заплатил несколько промахов. Я могу видеть dags в веб-графическом интерфейсе.

Как я могу удалить определенную группу DAG для запуска и отобразить в веб-графическом интерфейсе? Есть ли команда CLI Airflow для этого?

Я огляделся, но не смог найти ответ на простой способ удаления DAG после его загрузки и планирования.

4b9b3361

Ответ 1

Редактировать 27/8/18 - Airflow 1.10 теперь выпущен на PyPI!

https://pypi.org/project/apache-airflow/1.10.0/


Как полностью удалить DAG

У нас есть эта функция сейчас в Airflow ≥ 1.10!

PR № 2199 (Jira: AIRFLOW-1002), добавляющий удаление DAG в Airflow, теперь объединен, что позволяет полностью удалить записи DAG из всех связанных таблиц.

Базовый код delete_dag (...) теперь является частью экспериментального API, и есть точки входа, доступные через CLI, а также через REST API.

CLI:

airflow delete_dag my_dag_id

REST API (локально работающий веб-сервер):

curl -X "DELETE" http://127.0.0.1:8080/api/experimental/dags/my_dag_id

Предупреждение относительно REST API: убедитесь, что ваш кластер Airflow использует аутентификацию на производстве.

Установка/обновление до Airflow 1.10 (актуально)

Для обновления запустите:

export SLUGIFY_USES_TEXT_UNIDECODE=yes

или же:

export AIRFLOW_GPL_UNIDECODE=yes

Затем:

pip install -U apache-airflow

Не забудьте сначала проверить UPDATING.md на полную информацию!

Ответ 2

Это мой адаптированный код, используя PostgresHook с параметром connection_id по умолчанию.

import sys
from airflow.hooks.postgres_hook import PostgresHook

dag_input = sys.argv[1]
hook=PostgresHook( postgres_conn_id= "airflow_db")

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    sql="delete from {} where dag_id='{}'".format(t, dag_input)
    hook.run(sql, True)

Ответ 4

Я просто написал script, который удаляет все, что связано с определенным dag, но это только для MySQL. Вы можете написать другой метод соединения, если используете PostgreSQL. Первоначально команды, которые размещены Лэнсом на https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 Я просто положил его в script. Надеюсь это поможет. Формат: python script.py dag_id

import sys
import MySQLdb

dag_input = sys.argv[1]

query = {'delete from xcom where dag_id = "' + dag_input + '"',
        'delete from task_instance where dag_id = "' + dag_input + '"',
        'delete from sla_miss where dag_id = "' + dag_input + '"',
        'delete from log where dag_id = "' + dag_input + '"',
        'delete from job where dag_id = "' + dag_input + '"',
        'delete from dag_run where dag_id = "' + dag_input + '"',
        'delete from dag where dag_id = "' + dag_input + '"' }

def connect(query):
        db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
        cur = db.cursor()
        cur.execute(query)
        db.commit()
        db.close()
        return

for value in query:
        print value
        connect(value)

Ответ 5

DAG-ы могут быть удалены в Airflow 1.10, но процесс и последовательность действий должны быть правильными. Возникает "проблема с яйцом и курицей" - если вы удалите DAG из внешнего интерфейса, пока файл еще там, DAG будет перезагружен (так как файл не удален). Если вы сначала удалите файл и обновите страницу, DAG больше не удалится из веб-интерфейса. Итак, последовательность действий, которая позволила мне удалить DAG из внешнего интерфейса, была такой:

  1. удалить файл DAG (в моем случае удалить из репозитория конвейера и развернуть на серверах воздушного потока, особенно в планировщике)
  2. НЕ обновляйте веб-интерфейс.
  3. В веб-интерфейсе пользователя в представлении DAG (обычная главная страница) нажмите "Удалить метку" → enter image description here красный значок справа внизу.
  4. Он очищает все остатки этого DAG из базы данных.

Ответ 6

Я написал script, который удаляет все метаданные, относящиеся к определенному дагу для SQLite по умолчанию SQLite. Это основано на ответе Иисуса выше, но адаптировано из Postgres для SQLite. Пользователи должны устанавливать ../airflow.db везде, где script.py хранится относительно файла airflow.db по умолчанию (обычно ~/airflow). Для выполнения используйте python script.py dag_id.

import sqlite3
import sys

conn = sqlite3.connect('../airflow.db')
c = conn.cursor()

dag_input = sys.argv[1]

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    query = "delete from {} where dag_id='{}'".format(t, dag_input)
    c.execute(query)

conn.commit()
conn.close()

Ответ 7

Воздушный поток 1.10.1 был выпущен. В этом выпуске добавлена возможность удаления DAG из веб-интерфейса после удаления соответствующей DAG из файловой системы.

Смотрите этот билет для более подробной информации:

[AIRFLOW-2657] Добавлена возможность удаления DAG из веб-интерфейса.

Airflow Links menu with delete icon

Обратите внимание, что это на самом деле не удаляет группу обеспечения доступности баз данных из файловой системы, сначала вам нужно будет сделать это вручную, в противном случае группа доступности базы данных будет перезагружена.

Ответ 8

В Airflow нет ничего встроенного, который сделает это за вас. Чтобы удалить DAG, удалите его из репозитория и удалите записи базы данных в таблице метастатистики Airflow - dag.

Ответ 9

Вы можете очистить набор экземпляров задачи, как если бы они никогда не запускались с помощью:

airflow clear dag_id -s 2017-1-23 -e 2017-8-31

И затем удалите файл dag из папки dags

Ответ 10

версии> = 1.10.0:

У меня версия 1.10.2 airflow, и я попытался выполнить команду delete_dag airflow, но команда выдает следующую ошибку:

bash-4.2 # airflow delete_dag dag_id

[2019-03-16 15: 37: 20,804] {settings.py:174} INFO - settings.configure_orm(): использование настроек пула. pool_size = 5, pool_recycle = 1800, pid = 28224 /usr/lib64/python2.7/site-packages/psycopg2/init.py: 144: предупреждение пользователя: пакет колеса psycopg2 будет переименован из выпуска 2.8; чтобы продолжить установку из бинарного файла, используйте вместо этого "pip install psycopg2-binary". Подробнее см.: http://initd.org/psycopg/docs/install.html#binary-install-from-pypi. "" ") Это приведет к удалению всех существующих записей, связанных с указанным DAG. Продолжить? (Г/л) у Traceback (последний вызов был последним): Файл "/usr/bin/airflow", строка 32, в   args.func (арг) Файл "/usr/lib/python2.7/site-packages/airflow/utils/cli.py", строка 74, в оболочке   вернуть f (* args, ** kwargs) Файл "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", строка 258, в delete_dag   повысить AirflowException (ошибка) airflow.exceptions.AirflowException: ошибка сервера

Хотя я могу удалить через команду Curl. Пожалуйста, дайте мне знать, если у кого-то есть представление об исполнении этой команды, это известно или я что-то не так делаю.

версии & lt; = 1.9.0:

Команды для удаления dag нет, поэтому сначала необходимо удалить файл dag, а затем удалить все ссылки на dag_id из базы данных метаданных воздушного потока.

ПРЕДУПРЕЖДЕНИЕ

Вы можете сбросить базу данных метаданных воздушного потока, вы удалите все, включая теги, но помните, что вы также удалите историю, пулы, переменные и т.д.

airflow resetdb, а затем airflow initdb

Ответ 11

Основываясь на ответе @OlegYamin, я делаю следующее, чтобы удалить метку, поддерживаемую postgres, где airflow использует public схему.

delete from public.dag_pickle where id = (
    select pickle_id from public.dag where dag_id = 'my_dag_id'
);
delete from public.dag_run where dag_id = 'my_dag_id';
delete from public.dag_stats where dag_id = 'my_dag_id';
delete from public.log where dag_id = 'my_dag_id';
delete from public.sla_miss where dag_id = 'my_dag_id';
delete from public.task_fail where dag_id = 'my_dag_id';
delete from public.task_instance where dag_id = 'my_dag_id';
delete from public.xcom where dag_id = 'my_dag_id';
delete from public.dag where dag_id = 'my_dag_id';

ВНИМАНИЕ: Эффект/правильность первого запроса на удаление мне неизвестна. Это просто предположение, что это необходимо.

Ответ 12

просто удалите его из MySQL, отлично работает для меня. удалите их из таблиц ниже:

  • даг

  • dag_constructor

  • dag_group_ship
  • dag_pickle
  • dag_run
  • dag_stats

(в будущем выпуске может быть больше таблиц), затем перезапустите веб-сервер и работника.

Ответ 13

Удалите dag (вы хотите удалить) из папки dags и запустите airflow resetdb.

Кроме того, вы можете войти в airflow_db и вручную удалить эти записи из таблиц dag (task_fail, xcom, task_instance, sla_miss, log, job, dag_run, dag, dag_stats).

Ответ 14

Для тех, кто все еще находит ответы. В версии Airflow версии 1.8 очень сложно удалить DAG, вы можете обратиться к ответам выше. Но начиная с версии 1.9, вам просто нужно

удалить dag в папке dags и перезапустить веб-сервер