Подтвердить что ты не робот

Очереди задач тестирования модулей в AppEngine

В течение очень долгого времени я использую очереди задач в AppEngine для планирования задач, как раз так, как я должен.

Но то, что мне всегда было интересно, - это как писать тесты для этого? До сих пор я просто делал тесты, чтобы убедиться, что в API нет очереди, которая ставит в очередь задачу, а затем написала более подходящие тесты для API, выполняющего задачу.

Однако в последнее время я начал чувствовать себя немного неудовлетворенным этим, и я ищу способ проверить, что правильная задача была добавлена ​​в правильную очередь. Надеюсь, это можно сделать лучше, чем просто путем развертывания кода и надежды на лучшее.

Я использую django-nonrel, если это имеет какое-либо отношение к ответу.

Напомним: как записать unit test для подтверждения задач, поставленных в очередь?

4b9b3361

Ответ 1

Использование GAE Test Bed позволит вам опустить очередь задач.

Если вы наследуете от FunctionalTestCase или TaskQueueTestCase, вы получите методы, такие как get_tasks и assertTasksInQueue.

Вы также можете запускать задачи. Как это сделать, зависит от того, используете ли вы задачи или откладываете их.

Для отложенных слов у меня есть такой код:

from google.appengine.ext import deferred
import base64

# gets the most recent task -- since the task queue is reset between tests,
# this is usually what you want
def get_task(self):
    for task in self.get_task_queue_stub().GetTasks('default'):
        return task

# decode and execute the deferred method
def run_deferred(task):
    deferred.run(base64.b64decode(task['body']))

Запуск задач аналогичен, но после выполнения задачи вы используете WebTest (какая тестовая кровать GAE построена поверх), чтобы отправить в качестве запроса POST URL-адрес задачи с его параметрами.

Ответ 2

Если вы используете google.appengine.ext.testbed вместо тестового стенда GAE (тестовый стенд GAE теперь устарел и перемещается в ext.testbed), вы можете сделать следующее:

import base64
import unittest2

from google.appengine.ext import deferred
from google.appengine.ext import testbed


class TestTasks(unittest2.TestCase):
  def setUp(self):
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_taskqueue_stub()
    self.taskqueue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

  def tearDown(self):
    self.testbed.deactivate()

  def test_send_contact_request(self):
    # Make the request to your app that "defers" something:
    response = ...
    self.assertEqual(response.status_int, 200)

    # Get the task out of the queue
    tasks = self.taskqueue_stub.get_filtered_tasks()
    self.assertEqual(1, len(tasks))

    # Run the task
    task = tasks[0]
    deferred.run(task.payload)

    # Assert that other things happened (ie, if the deferred was sending mail...)
    self.assertEqual(...)

Ответ 3

Существует небольшая тестовая структура, называемая gaetestbed, которая может удовлетворить ваши потребности. Подробнее см. https://github.com/jgeewax/gaetestbed.

Эта среда тестирования работает в сочетании с плагином для носа, носа-gae и пакетом WebTest. Данное сочетание пакетов python - лучший способ проверить приложения GAE, насколько я могу судить.

Ответ 4

SDK 1.4.3 Тестирование API обеспечивает простоту конфигурация библиотек-заглушек для тестов локальной интеграции.

Доступны служебные заглушки для Task Queue.