Подтвердить что ты не робот

Объединение соединений с базами данных сельдерей

Я использую Celery отдельно (не в Django). Я планирую иметь один рабочий тип задачи, работающий на нескольких физических машинах. Задача делает следующее

  • Принять XML-документ.
  • Преобразовать его.
  • Записывать и записывать базы данных multiple.

Я использую PostgreSQL, но это применимо в равной степени к другим типам хранилища, использующим соединения. Раньше я использовал пул соединений с базами данных, чтобы избежать создания нового соединения с базой данных по каждому запросу или не слишком долго поддерживать соединение. Однако, поскольку каждый работник сельдерея работает в отдельном процессе, я не уверен, как они действительно смогут делиться пулом. Я что-то упускаю? Я знаю, что сельдерей позволяет вам сохранить результат, полученный от работника сельдерейца, но это не то, что я пытаюсь сделать здесь. Каждая задача может выполнять несколько различных обновлений или вставок в зависимости от обрабатываемых данных.

Каким образом можно получить доступ к базе данных у работника сельдерея?

Можно ли делиться пулом между несколькими рабочими/задачами или есть ли другой способ сделать это?

4b9b3361

Ответ 1

Мне нравится идея tigeronk2 о одном соединении на одного работника. По его словам, Celery поддерживает свой собственный пул работников, поэтому на самом деле нет необходимости в отдельном пуле подключения к базе данных. Документы о сигналах сельдерея объясняют, как выполнить пользовательскую инициализацию, когда рабочий создан, поэтому я добавил следующий код к своим задачам .py и, похоже, работает как и следовало ожидать. Я даже смог закрыть соединения, когда рабочие отключены:

db_conn = None

@worker_process_init.connect
def init_worker(**kwargs):
    global db_conn
    print('Initializing database connection for worker.')
    db_conn = db.connect(DB_CONNECT_STRING)


@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    global db_conn
    if db_conn:
        print('Closing database connectionn for worker.')
        db_conn.close()

Ответ 2

Вы можете переопределить поведение по умолчанию для работы с потоковыми рабочими вместо рабочего процесса в вашей конфигурации celery:

CELERYD_POOL = "celery.concurrency.threads.TaskPool"

Затем вы можете сохранить экземпляр совместно используемого пула в своем экземпляре задачи и ссылаться на него из каждого вызова с заданной цепочкой.

Ответ 3

Имеет одно соединение БД на рабочий процесс. Поскольку сам сельдерей поддерживает пул рабочих процессов, ваши связи db всегда будут равны числу работников сельдерея. Откидная сторона, вроде, она свяжет объединение соединений db с управлением процессами сельдерея. Но это должно быть хорошо, учитывая, что GIL допускает только один поток за раз в процессе.

Ответ 4

Возможно, celery.concurrency.gevent может предоставить общий доступ к пулу и не усугублять GIL. Однако поддержка по-прежнему "экспериментальна".

И psycopg2.pool.SimpleConnectionPool для совместного использования среди зеленых (сопрограммы), которые будут выполняться в одном процессе/потоке.

Маленький бит другого обсуждения stack по теме.

Ответ 5

Возможно, вы можете использовать pgbouncer. Для сельдерея ничто не должно меняться, и объединение соединений происходит за пределами процессов. У меня есть тот же вопрос.

( "возможно", потому что я не уверен, могут ли быть какие-либо побочные эффекты)

Ответ 6

Внести вклад в мои результаты путем внедрения и мониторинга.

Приветственная обратная связь.

Ссылка: использовать объединение http://www.prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html

Каждый рабочий процесс (режим предпрограммы, заданный -c k) устанавливает одно новое соединение с БД без объединения или повторного использования. Поэтому, если вы используете объединение, пул рассматривается только на каждом уровне рабочего процесса. Таким образом, размер пулa > 1 не полезен, но повторное использование соединения по-прежнему отлично подходит для сохранения соединения из открытого и закрытого.

При использовании одного соединения для каждого рабочего процесса на каждом рабочем процессе устанавливается 1 соединение DB (режим предпродажи celery -App-рабочий -c k) на этапе инициализации. Он сохраняет соединение от открытого и закрытого повторно.

Независимо от того, сколько рабочих потоков (eventlet), каждый рабочий поток (celery -App-worker -P eventlet) устанавливает только одно соединение с БД без объединения или повторного использования. Таким образом, для eventlet все рабочие потоки (eventlets) на одном процессе сельдерея (celery - приложение-приложение...) имеют 1 db-соединение в каждый момент.

Согласно документам сельдерея

но вам нужно, чтобы ваши задачи не выполняли блокирующие вызовы, так как это остановит все остальные операции у рабочего до блокировки возврат вызовов.

Вероятно, это связано с тем, что соединение с MYSQL DB блокирует вызовы.