Подтвердить что ты не робот

Использование сельдерея в процессах и gentent в задачах одновременно

Я хотел бы использовать Celery в качестве очереди для своих задач, чтобы мое веб-приложение могло занять задание, вернуть ответ, и задача будет обработана в то же время /someday/... Я создаю своего рода API, поэтому я не знаю, какие задачи будут заблаговременно - в будущем могут быть задачи, связанные с HTTP-запросами, другим IO, но также и потребностями процессора. Что касается этого, я бы хотел запустить рабочих Celery в процессах, поскольку они являются универсальными типами parallelism в Python.

Однако, я хотел бы использовать gevent в своих задачах, поэтому у меня могла быть одна задача, порождающая множество HTTP-запросов и т.д. Проблема в том, когда я это делаю:

from gevent import monkey
monkey.patch_all()

Сельдерей перестает работать. Это начинается, но никакие задачи не могут быть эффективно выставлены в очередь - они, похоже, обращаются к брокеру, но работник сельдерей не собирает их и не обрабатывает. Только начинается и ждет. Если я удалю эти строки и выполню задачу без каких-либо gevent и распараллеливаний, все будет работать.

Я думаю, это может быть потому, что gevent исправления также нарезаются. Поэтому я попробовал

from gevent import monkey
monkey.patch_all(thread=False)

... но тогда сельдерей даже не запускается, он падает без объяснения причин (уровень отладки включен в журнал).

Можно ли использовать Celery для выполнения задач и gevent для выполнения некоторых действий внутри одной задачи? Как? Что я делаю неправильно?

4b9b3361

Ответ 1

Я считаю, что рекомендуемый способ начать задачу состоит в следующем.

python manage.py celery worker -P gevent --loglevel=INFO

Gevent необходимо заправить как можно раньше.

Ответ 2

Вы можете запустить сельдерей с несколькими потоками, содержащими несколько таких зеленых:

$ celery multi start 4 -P gevent -l info -c:1-4 1000

Ответ 3

Насколько мне удалось узнать, этот невозможен. Если кто-то найдет лучший ответ, я приму его вместо этого.

Единственный вариант - использовать gevent также в качестве основы для работников сельдерей. Для этого нужно сделать следующее:

CELERYD_POOL = 'gevent'

Подробнее об этих параметрах можно найти здесь. Более подробная информация о пуле gevent приведена на этой странице. Помните о том, что пул gevent по-прежнему отмечен как экспериментальный. Я не нашел тестов для сравнения процессов и асинхронного пула при различных задачах (задачи, ориентированные на IO, задачи, ориентированные на ЦП), но, в конце концов, я понял, что даже мои задачи, связанные с процессором, будут на самом деле более IO, чем процессор, потому что я использую базу данных для сохранения результатов, а соединение с базой данных будет узким местом, а не вычислительной частью. У меня не будет никаких научных задач, которые бы действительно поразили процессор.

Ответ 4

Из моего странного опыта Celery Beat не может нормально работать с работниками с gevent pool (запланированные задачи блокируются и ждут навсегда), если вы не активируете gevent monkey patching для Beat process.

Однако celery beat не поддерживает параметр --pool=gevent или -P gevent. Правильный способ введения gevent monkey patching заключается в использовании двоичного файла ctstomized celery, например:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from gevent import monkey
monkey.patch_all()

import re
import sys

from celery.__main__ import main

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.exit(main())

Сохраните его как celery-gevent и запустите службу Beat следующим образом:

celery-gevent beat --app=proj.celery:app --loader=djcelery.loaders.DjangoLoader -f /var/log/celery/beat.log -l INFO --workdir=/my/proj --pidfile=/var/run/celery/beat.pid

В proj.celery вы также должны установить соединение Django, чтобы избежать DatabaseError:

from __future__ import absolute_import

import os
# Set the Django settings module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

import django
# Load Django model definitions, etc
django.setup()

from django.db import connection
# Allow thread sharing to ensure that Django database connection
# works properly with gevent.
connection.allow_thread_sharing = True

from django.conf import settings
from celery import Celery

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

(приведенный выше пример работает для Python 2.7.10, Celery 3.1.18, Django 1.8.2 и gevent 1.0.2)