Подтвердить что ты не робот

GAE: тестовое задание с тестовым стендом

Я использую testbed для unit test моего приложения для приложений Google, а мое приложение использует задачу.

Когда я отправляю задачу на задание во время unit test, кажется, что задача находится в очереди, но задача не выполняется.

Как мне заставить задачу выполнить во время unit test?

4b9b3361

Ответ 1

Сервер приложений Dev однопоточен, поэтому он не может запускать задачи в фоновом режиме, в то время как поток переднего плана запускает тесты.

Я изменил TaskQueueTestCase в taskqueue.py в gaetestbed, чтобы добавить следующую функцию:

def execute_tasks(self, application):
    """
    Executes all currently queued tasks, and also removes them from the 
    queue.
    The tasks are execute against the provided web application.
    """

    # Set up the application for webtest to use (resetting _app in case a
    # different one has been used before). 
    self._app = None
    self.APPLICATION = application

    # Get all of the tasks, and then clear them.
    tasks = self.get_tasks()
    self.clear_task_queue()

    # Run each of the tasks, checking that they succeeded.
    for task in tasks:
        response = self.post(task['url'], task['params'])
        self.assertOK(response)

Для этого мне также пришлось изменить базовый класс TaskQueueTestCase от BaseTestCase до WebTestCase.

Мои тесты затем делают что-то вроде этого:

# Do something which enqueues a task.

# Check that a task was enqueued, then execute it.
self.assertTrue(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)

# Now test that the task did what was expected.

Таким образом, эта задача выполняется непосредственно с переднего плана unit test. Это не совсем то же самое, что и в производстве (т.е. Задача будет выполняться "через некоторое время" по отдельному запросу), но для меня это работает достаточно хорошо.

Ответ 2

Используя Saxon отличный ответ, я смог сделать то же самое, используя testbed вместо gaetestbed. Вот что я сделал.

Добавил это к моему setUp():

    self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')

Затем в моем тесте я использовал следующее:

    # Execute the task in the taskqueue
    tasks = self.taskqueue_stub.GetTasks("default")
    self.assertEqual(len(tasks), 1)
    task = tasks[0]
    params = base64.b64decode(task["body"])
    response = self.app.post(task["url"], params)

Где-то вдоль линии параметры POST получают base64, поэтому пришлось отменить это, чтобы заставить его работать.

Мне нравится это лучше, чем саксонский ответ, поскольку я могу использовать официальный тестовый пакет, и я могу сделать все это в своем собственном тестовом коде.

EDIT: позже я хотел сделать то же самое с задачами, представленными с использованием отложенной библиотеки, и потребовалось немного поворота, чтобы понять это, поэтому я делюсь здесь, чтобы облегчить боль другим людям.

Если ваше задание содержит только задания, отправленные с отсрочкой, тогда будут выполняться все задачи и задачи, поставленные в очередь этими задачами:

def submit_deferred(taskq):
    tasks = taskq.GetTasks("default")
    taskq.FlushQueue("default")
    while tasks:
        for task in tasks:
            (func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
            func(*args)
        tasks = taskq.GetTasks("default")
        taskq.FlushQueue("default")

Ответ 3

Другим (более чистым) вариантом для этого является использование заглушки очереди задач в тестовом стенде. Для этого сначала нужно инициализировать заглушку очереди задач, добавив следующее к вашему методу setUp():

self.testbed = init_testbed()
self.testbed.init_taskqueue_stub()

Доступ к планировщику задач можно получить с помощью следующего кода:

taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

Интерфейс для работы с заглушкой очереди выглядит следующим образом:

GetQueues() #returns a list of dictionaries with information about the available queues

#returns a list of dictionaries with information about the tasks in a given queue
GetTasks(queue_name)

DeleteTask(queue_name, task_name) #removes the task task_name from the given queue

FlushQueue(queue_name) #removes all the tasks from the queue

#returns tasks filtered by name & url pointed to by the task from the given queues
get_filtered_tasks(url, name, queue_names)

StartBackgroundExecution() #Executes the queued tasks

Shutdown() #Requests the task scheduler to shutdown.

Кроме того, поскольку это использует собственные средства App Engine SDK - он отлично работает с отложенной библиотекой.

Ответ 4

Возможно, вы захотите попробовать следующий код. Полное объяснение здесь: http://www.geewax.org/task-queue-support-in-app-engines-ext-testbed/

import unittest
from google.appengine.api import taskqueue
from google.appengine.ext import testbed


class TaskQueueTestCase(unittest.TestCase):

  def setUp(self):
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_taskqueue_stub()
    self.task_queue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

  def tearDown(self):
    self.testbed.deactivate()

  def testTaskAdded(self):
    taskqueue.add(url='/path/to/task')

    tasks = self.taskqueue_stub.get_filtered_tasks(url='/path/to/task')
    self.assertEqual(1, len(tasks))
    self.assertEqual('/path/to/task', tasks[0].url)

unittest.main()