Подтвердить что ты не робот

Очередь удаленных вызовов на Python Twisted перспективного брокера?

Сила Twisted (для python) - это асинхронная структура (я думаю). Я написал сервер обработки изображений, который обрабатывает запросы через Perspective Broker. Он отлично работает, пока я кормлю его менее чем несколькими сотнями изображений за раз. Тем не менее, иногда он получает сотни изображений практически в одно и то же время. Поскольку он пытается обрабатывать их все одновременно, сервер выходит из строя.

В качестве решения я хотел бы поставить очередь на remote_calls на сервере, чтобы он обрабатывал только 100 изображений за раз. Похоже, это может быть нечто, что Twisted уже делает, но я не могу найти его. Любые идеи о том, как начать реализацию этого? Точка в правильном направлении? Благодарю!

4b9b3361

Ответ 1

Один готовый вариант, который может помочь с этим, - twisted.internet.defer.DeferredSemaphore. Это асинхронная версия обычного (счетного) семафора, который вы, возможно, уже знаете, если вы сделали многопрограммированное программирование.

Семафор A (счет) очень похож на мьютекс (блокировка). Но если мьютекс можно получить только один раз до соответствующего выпуска, семафор (подсчет) можно настроить таким образом, чтобы произвольное (но определенное) количество приобретений было успешным до того, как требуются соответствующие выпуски.

Вот пример использования DeferredSemaphore для запуска десяти асинхронных операций, но для запуска не более трех из них сразу:

from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def main():
    sem = DeferredSemaphore(3)

    jobs = []
    for i in range(10):
        jobs.append(sem.run(async, i))

    d = gatherResults(jobs)
    d.addCallback(lambda ignored: reactor.stop())
    reactor.run()


if __name__ == '__main__':
    main()

DeferredSemaphore также имеет явные методы acquire и release, но метод run настолько удобен, что почти всегда вы хотите. Он вызывает метод acquire, который возвращает Deferred. К этому первому Deferred он добавляет обратный вызов, который вызывает функцию, которую вы передали (вместе с любыми позиционными или ключевыми аргументами). Если эта функция возвращает a Deferred, то к этой секунде Deferred добавляется обратный вызов, который вызывает метод release.

Также обрабатывается синхронный случай, немедленно позвонив release. Ошибки также обрабатываются, позволяя им распространяться, но делая необходимым release, чтобы оставить DeferredSemaphore в согласованном состоянии. Результат функции, переданной в run (или результат возврата Deferred), становится результатом Deferred, возвращаемого run.

Другой возможный подход может основываться на DeferredQueue и cooperate. DeferredQueue в основном похож на обычную очередь, но его метод get возвращает Deferred. Если в момент вызова не было элементов в очереди, Deferred не будет срабатывать до тех пор, пока элемент не будет добавлен.

Вот пример:

from random import randrange

from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def assign(jobs):
    # Create new jobs to be processed
    jobs.put(randrange(10))
    reactor.callLater(randrange(10), assign, jobs)


def worker(jobs):
    while True:
        yield jobs.get().addCallback(async)


def main():
    jobs = DeferredQueue()

    for i in range(10):
        jobs.put(i)

    assign(jobs)

    for i in range(3):
        cooperate(worker(jobs))

    reactor.run()


if __name__ == '__main__':
    main()

Обратите внимание, что рабочая функция async такая же, как и в первом примере. Однако на этот раз также существует функция worker, которая явно вытягивает задания из DeferredQueue и обрабатывает их с помощью async (добавив async в качестве обратного вызова Deferred, возвращаемого get)), Генератор worker управляется cooperate, который повторяет его один раз после каждого Deferred, который дает огни. Затем основной цикл запускает три из этих рабочих генераторов, так что в любой момент времени будут выполняться три задания.

Этот подход включает в себя немного больше кода, чем подход DeferredSemaphore, но имеет некоторые преимущества, которые могут быть интересными. Во-первых, cooperate возвращает экземпляр CooperativeTask, который имеет полезные методы, такие как pause, resume и еще пару других. Кроме того, все задания, назначенные одному и тому же сотруднику, будут сотрудничать друг с другом в планировании, чтобы не перегружать цикл событий (и это то, что дает API его имя). На стороне DeferredQueue также можно установить ограничения на количество элементов, ожидающих обработки, поэтому вы можете избежать полной перегрузки сервера (например, если ваши графические процессоры застревают и перестают выполнять задачи). Если код, вызывающий put, обрабатывает исключение переполнения очереди, вы можете использовать это как давление, чтобы попытаться прекратить прием новых заданий (возможно, шунтирование их на другой сервер или предупреждение администратора). Выполнение подобных действий с помощью DeferredSemaphore немного сложнее, поскольку нет возможности ограничить количество заданий, ожидающих получения семафора.

Ответ 2

Вам также может понравиться txRDQ (Resizable Dispatch Queue), который я написал. Google это, он в коллекции tx на LaunchPad. Извините, у меня нет больше времени, чтобы ответить - собираюсь выйти на сцену.

Терри