Документация по сельдеву упоминает тестирование Celery в Django, но не объясняет, как проверить задачу Celery, если вы не используете Django. Как вы это делаете?
Как вы unit test задание сельдерея?
Ответ 1
Синхронно проверять задачи можно с помощью любой unittest lib. Я нормально делаю 2 разных сеанса тестирования при работе с задачами сельдерея. Первый (как я предлагаю ниже) полностью синхронный, и должен быть тот, который гарантирует, что алгоритм делает то, что он должен делать. Второй сеанс использует всю систему (включая брокера) и гарантирует, что у меня нет проблем с сериализацией или любой другой проблемы с распространением, сообщением.
Итак:
from celery import Celery
celery = Celery()
@celery.task
def add(x, y):
return x + y
И ваш тест:
from nose.tools import eq_
def test_add_task():
rst = add.apply(args=(4, 4)).get()
eq_(rst, 8)
Надеюсь, что это поможет!
Ответ 2
Я использую это:
with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
...
Документы: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager
CELERY_ALWAYS_EAGER позволяет выполнять вашу задачу синхронно, и вам не нужен сервер сельдерея.
Ответ 3
Зависит от того, что именно вы хотите тестировать.
- Проверьте код задачи напрямую. Не вызывайте "task.delay(...)" просто вызов "задачи (...)" из ваших модульных тестов.
- Используйте CELERY_ALWAYS_EAGER. Это приведет к немедленному вызову ваших задач в точке, где вы скажете "task.delay(...)", чтобы вы могли протестировать весь путь (но не какое-либо асинхронное поведение).
Ответ 4
UnitTest
import unittest
from myproject.myapp import celeryapp
class TestMyCeleryWorker(unittest.TestCase):
def setUp(self):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
приборы py.test
# conftest.py
from myproject.myapp import celeryapp
@pytest.fixture(scope='module')
def celery_app(request):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
return celeryapp
# test_tasks.py
def test_some_task(celery_app):
...
Добавление: сделать send_task уважающим нетерпением
from celery import current_app
def send_task(name, args=(), kwargs={}, **opts):
# https://github.com/celery/celery/issues/581
task = current_app.tasks[name]
return task.apply(args, kwargs, **opts)
current_app.send_task = send_task
Ответ 5
Для тех, кто на Сельдерей 4, это:
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
Поскольку имена настроек были изменены и необходимо обновить, если вы решите обновить, см.
http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names
Ответ 6
Как и для Celery 3.0, один из способов установить CELERY_ALWAYS_EAGER
в Django:
from django.test import TestCase, override_settings
from .foo import foo_celery_task
class MyTest(TestCase):
@override_settings(CELERY_ALWAYS_EAGER=True)
def test_foo(self):
self.assertTrue(foo_celery_task.delay())
Ответ 7
В моем случае (и я предполагаю, что многие другие), все, что я хотел, это проверить внутреннюю логику задачи, используя pytest.
TL; DR; закончил все насмехаться (ВАРИАНТ 2)
Пример использования:
proj/tasks.py
@shared_task(bind=True)
def add_task(self, a, b):
return a+b;
tests/test_tasks.py
from proj import add_task
def test_add():
assert add_task(1, 2) == 3, '1 + 2 should equal 3'
но, поскольку shared_task
делает много внутренней логики сельдерея, это не действительно модульные тесты.
Итак, для меня было два варианта:
ВАРИАНТ 1: Отдельная внутренняя логика
proj/tasks_logic.py
def internal_add(a, b):
return a + b;
proj/tasks.py
from .tasks_logic import internal_add
@shared_task(bind=True)
def add_task(self, a, b):
return internal_add(a, b);
Это выглядит очень странно и, кроме того, что делает его менее читаемым, требуется вручную извлекать и передавать атрибуты, которые являются частью запроса, например task_id
в случае необходимости, что делает логику менее чистой.
ВАРИАНТ 2: издевательства
издевательство над внутренним деревом сельдерея
tests/__init__.py
# noinspection PyUnresolvedReferences
from celery import shared_task
from mock import patch
def mock_signature(**kwargs):
return {}
def mocked_shared_task(*decorator_args, **decorator_kwargs):
def mocked_shared_decorator(func):
func.signature = func.si = func.s = mock_signature
return func
return mocked_shared_decorator
patch('celery.shared_task', mocked_shared_task).start()
который затем позволяет мне высмеивать объект запроса (опять же, если вам нужны вещи из запроса, например идентификатор или счетчик попыток.
tests/test_tasks.py
from proj import add_task
class MockedRequest:
def __init__(self, id=None):
self.id = id or 1
class MockedTask:
def __init__(self, id=None):
self.request = MockedRequest(id=id)
def test_add():
mocked_task = MockedTask(id=3)
assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'
Это решение гораздо более ручное, но оно дает мне контроль, необходимый для фактического модульного тестирования, без повторения себя и без потери масштаба сельдерея.
Ответ 8
Поскольку Celery v4.0, инструменты py.test предназначены для запуска работника сельдерея только для теста и завершаются, когда это делается:
def test_myfunc_is_executed(celery_session_worker):
# celery_session_worker: <Worker: [email protected] (running)>
assert myfunc.delay().wait(3)
Среди других инструментов, описанных в http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test, вы можете изменить параметры по умолчанию для сельдерея, переопределив прибор celery_config
следующим образом:
@pytest.fixture(scope='session')
def celery_config():
return {
'accept_content': ['json', 'pickle'],
'result_serializer': 'pickle',
}
По умолчанию тестовый работник использует брокер в оперативной памяти и результирующий бэкэнд. Не нужно использовать локальный Redis или RabbitMQ, если не тестировать определенные функции.