Подтвердить что ты не робот

Как лучше всего выполнять многопроцессорство в запросах с сервером Porn Tornado?

Я использую неблокирующий сервер Python I/O Tornado. У меня есть класс запросов GET, которые могут занять значительное количество времени (подумайте в диапазоне 5-10 секунд). Проблема в том, что Tornado блокирует эти запросы, чтобы последующие быстрые запросы поддерживались до тех пор, пока не завершится медленный запрос.

Я посмотрел: https://github.com/facebook/tornado/wiki/Threading-and-concurrency и пришел к выводу, что мне нужна комбинация # 3 (другие процессы) и # 4 (другие потоки). # 4 сам по себе имел проблемы, и я не смог получить надежный контроль обратно на ioloop, когда был другой поток, выполняющий "heavy_lifting". (Я предполагаю, что это было связано с GIL и тем фактом, что задача heavy_lifting имеет высокую нагрузку на процессор и продолжает вытаскивать управление от основного ioloop, но это предположение).

Итак, я проработал прототипом, чтобы решить эту задачу, выполнив задачи "тяжелого подъема" в этих медленных запросах GET в отдельном процессе, а затем поместив обратный вызов обратно в Tornado ioloop, когда процесс завершен, чтобы завершить запрос. Это освобождает ioloop для обработки других запросов.

Я создал простой пример, демонстрирующий возможное решение, но мне любопытно получить обратную связь от сообщества на нем.

Мой вопрос в два раза: как можно упростить этот нынешний подход? Какие подводные камни потенциально существуют с ним?

Подход

  • Используйте декоратор asynchronous, встроенный в Tornado, который позволяет просить остаться открытым и продолжить работу с ioloop.

  • Создайте отдельный процесс для задач "тяжелого подъема" с помощью модуля python multiprocessing. Сначала я попытался использовать модуль threading, но не смог получить надежное отключение управления до ioloop. Также представляется, что mutliprocessing также будет использовать многоядерные процессоры.

  • Начните поток "наблюдателя" в основном процессе ioloop с помощью модуля threading, который должен следить за multiprocessing.Queue за результатами задачи "тяжелого подъема" , когда он завершается. Это было необходимо, потому что мне нужен был способ узнать, что задача heavy_lifting была завершена, хотя она все еще может уведомить ioloop о завершении этого запроса.

  • Убедитесь, что поток "watcher" автоматически отключает управление основным циклом ioloop с помощью вызовов time.sleep(0), чтобы другие запросы продолжали легко обрабатываться.

  • Когда результат возникает в очереди, добавьте обратный вызов из потока "наблюдателя", используя tornado.ioloop.IOLoop.instance().add_callback(), который задокументирован как единственный безопасный способ вызова экземпляров ioloop из других потоков.

  • Обязательно вызовите finish() в обратном вызове, чтобы завершить запрос и передать ответ.

Ниже приведен пример кода, показывающего этот подход. multi_tornado.py - это сервер, реализующий вышеуказанный контур, а call_multi.py - это образец script, который вызывает сервер двумя способами для тестирования сервера. Оба теста вызывают сервер с 3 медленными запросами GET, за которыми следуют 20 быстрых запросов GET. Результаты показаны как для работы с включенным, так и без него.

В случае запуска с "no threading" блок 3 медленных запросов (каждый занимает чуть больше секунды для завершения). Несколько из 20 быстрых запросов сжимаются между некоторыми медленными запросами в ioloop (не совсем точно, как это происходит), но могут быть артефактом, когда я запускаю как сервер, так и клиентский тест script на том же компьютере), Дело в том, что все быстрые запросы поддерживаются в различной степени.

В случае запуска с поточной поддержкой 20 быстрых запросов сначала завершаются сразу, а три медленных запроса завершаются примерно в то же время после того, как они выполняются параллельно. Это желаемое поведение. Три медленных запроса занимают 2,5 секунды, чтобы завершить их параллельно - в то время как в несетевом случае три медленных запроса занимают около 3,5 секунд. Таким образом, около 35% ускоряется в целом (я предполагаю из-за многоядерного обмена). Но что более важно - быстрые запросы были немедленно обработаны в медленных медленных.

У меня нет большого опыта в многопоточном программировании, поэтому, хотя это, похоже, работает здесь, мне любопытно узнать:

Есть ли более простой способ сделать это? Какой монстр может скрываться в этом подходе?

(Примечание: будущий компромисс может состоять в том, чтобы просто запускать больше экземпляров Tornado с обратным прокси-сервером, например nginx, выполняющим балансировку нагрузки. Независимо от того, что я буду запускать несколько экземпляров с балансировщиком нагрузки, - но я беспокоюсь о том, чтобы просто метать оборудование по этой проблеме, поскольку кажется, что аппаратное обеспечение так напрямую связано с проблемой с точки зрения блокировки.)

Пример кода

multi_tornado.py (пример сервера):

import time
import threading
import multiprocessing
import math

from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop


# run in some other process - put result in q
def heavy_lifting(q):
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)

    t = time.time()
    q.put(t - t0)  # report time to compute in queue


class FastHandler(RequestHandler):
    def get(self):
        res = 'fast result ' + self.get_argument('id')
        print res
        self.write(res)
        self.flush()


class MultiThreadedHandler(RequestHandler):
    # Note:  This handler can be called with threaded = True or False
    def initialize(self, threaded=True):
        self._threaded = threaded
        self._q = multiprocessing.Queue()

    def start_process(self, worker, callback):
        # method to start process and watcher thread
        self._callback = callback

        if self._threaded:
            # launch process
            multiprocessing.Process(target=worker, args=(self._q,)).start()

            # start watching for process to finish
            threading.Thread(target=self._watcher).start()

        else:
            # threaded = False just call directly and block
            worker(self._q)
            self._watcher()

    def _watcher(self):
        # watches the queue for process result
        while self._q.empty():
            time.sleep(0)  # relinquish control if not ready

        # put callback back into the ioloop so we can finish request
        response = self._q.get(False)
        IOLoop.instance().add_callback(lambda: self._callback(response))


class SlowHandler(MultiThreadedHandler):
    @asynchronous
    def get(self):
        # start a thread to watch for
        self.start_process(heavy_lifting, self._on_response)

    def _on_response(self, delta):
        _id = self.get_argument('id')
        res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
        print res
        self.write(res)
        self.flush()
        self.finish()   # be sure to finish request


application = Application([
    (r"/fast", FastHandler),
    (r"/slow", SlowHandler, dict(threaded=False)),
    (r"/slow_threaded", SlowHandler, dict(threaded=True)),
])


if __name__ == "__main__":
    application.listen(8888)
    IOLoop.instance().start()

call_multi.py (клиентский тестер):

import sys
from tornado.ioloop import IOLoop
from tornado import httpclient


def run(slow):
    def show_response(res):
        print res.body

    # make 3 "slow" requests on server
    requests = []
    for k in xrange(3):
        uri = 'http://localhost:8888/{}?id={}'
        requests.append(uri.format(slow, str(k + 1)))

    # followed by 20 "fast" requests
    for k in xrange(20):
        uri = 'http://localhost:8888/fast?id={}'
        requests.append(uri.format(k + 1))

    # show results as they return
    http_client = httpclient.AsyncHTTPClient()

    print 'Scheduling Get Requests:'
    print '------------------------'
    for req in requests:
        print req
        http_client.fetch(req, show_response)

    # execute requests on server
    print '\nStart sending requests....'
    IOLoop.instance().start()

if __name__ == '__main__':
    scenario = sys.argv[1]

    if scenario == 'slow' or scenario == 'slow_threaded':
        run(scenario)

Результаты испытаний

Запустив python call_multi.py slow (поведение блокировки):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20

Запустив python call_multi.py slow_threaded (желаемое поведение):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
4b9b3361

Ответ 1

Если вы хотите использовать concurrent.futures.ProcessPoolExecutor вместо multiprocessing, это на самом деле очень просто. Tornado ioloop уже поддерживает concurrent.futures.Future, поэтому они будут прекрасно играть вместе из коробки. concurrent.futures включен в Python 3.2+, а был отправлен обратно в Python 2.x.

Вот пример:

import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen

def f(a, b, c, blah=None):
    print "got %s %s %s and %s" % (a, b, c, blah)
    time.sleep(5)
    return "hey there"

@gen.coroutine
def test_it():
    pool = ProcessPoolExecutor(max_workers=1)
    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
    print("running it asynchronously")
    ret = yield fut
    print("it returned %s" % ret)
    pool.shutdown()

IOLoop.instance().run_sync(test_it)

Вывод:

running it asynchronously
got 1 2 3 and ok
it returned hey there

ProcessPoolExecutor имеет более ограниченный API, чем multiprocessing.Pool, но если вам не нужны более продвинутые функции multiprocessing.Pool, его стоит использовать, потому что интеграция намного проще.

Ответ 2

multiprocessing.Pool может быть интегрирован в цикл ввода/вывода tornado, но это немного беспорядочно. Более чистая интеграция может быть выполнена с помощью concurrent.futures (см. мой другой ответ), но если вы застряли на Python 2.x и не можете установить concurrent.futures backport, вот как вы можете сделать это строго с помощью multiprocessing:

В методах multiprocessing.Pool.apply_async и multiprocessing.Pool.map_async есть необязательный параметр callback, что означает, что оба они могут быть подключены к tornado.gen.Task. Таким образом, в большинстве случаев асинхронно выполняемый код в подпроцессе прост:

import multiprocessing
import contextlib

from tornado import gen
from tornado.gen import Return
from tornado.ioloop import IOLoop
from functools import partial

def worker():
    print "async work here"

@gen.coroutine
def async_run(func, *args, **kwargs):
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    raise Return(result)

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    func = partial(async_run, worker)
    IOLoop().run_sync(func)

Как я уже упоминал, это хорошо работает в большинстве случаев. Но если worker() выдает исключение, callback никогда не вызывается, что означает, что gen.Task никогда не заканчивается, и вы вешаете вечно. Теперь, если вы знаете, что ваша работа никогда не будет генерировать исключение (потому что вы завернули все это в try/except, например), вы можете с радостью использовать этот подход. Тем не менее, если вы хотите, чтобы исключения исключались из вашего рабочего, единственное решение, которое я нашел, заключалось в подклассе некоторых компонентов многопроцессорности и заставить их вызвать callback, даже если подпроцесс рабочего вызвал исключение:

from multiprocessing.pool import ApplyResult, Pool, RUN
import multiprocessing
class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj 
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
 ...

 if __name__ == "__main__":
     pool = TornadoPool(multiprocessing.cpu_count())
     ...

При этих изменениях объект исключения будет возвращен gen.Task, а не gen.Task, висящим на неопределенный срок. Я также обновил мой метод async_run, чтобы повторно собрать исключение при его возврате и внес некоторые другие изменения, чтобы обеспечить лучшие трассировки для исключений, созданных в рабочих подпроцессах. Здесь полный код:

import multiprocessing
from multiprocessing.pool import Pool, ApplyResult, RUN
from functools import wraps

import tornado.web
from tornado.ioloop import IOLoop
from tornado.gen import Return
from tornado import gen

class WrapException(Exception):
    def __init__(self):
        exc_type, exc_value, exc_tb = sys.exc_info()
        self.exception = exc_value
        self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    def __str__(self):
        return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted)

class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj 
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]   

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception 
    (or an Exception sub-class), this function will raise the 
    Exception, rather than return it.

    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    if isinstance(result, Exception):
        raise result
    raise Return(result)

def handle_exceptions(func):
    """ Raise a WrapException so we get a more meaningful traceback"""
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            raise WrapException()
    return inner

# Test worker functions
@handle_exceptions
def test2(x):
    raise Exception("eeee")

@handle_exceptions
def test(x):
    print x
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield async_run(test, "inside get")
            self.write("%s\n" % result)
            result = yield async_run(test2, "hi2")
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool(4)
    app.listen(8888)
    IOLoop.instance().start()

Вот как это работает для клиента:

[email protected]:~$ curl localhost:8888/test
done
Caught an exception: 

Original traceback:
Traceback (most recent call last):
  File "./mutli.py", line 123, in inner
    return func(*args, **kwargs)
  File "./mutli.py", line 131, in test2
    raise Exception("eeee")
Exception: eeee

И если я отправлю два одновременных запроса на завивки, мы увидим, что они обрабатываются асинхронно на стороне сервера:

[email protected]:~$ ./mutli.py 
inside get
inside get
caught exception inside get
caught exception inside get

Edit:

Обратите внимание, что этот код становится проще с Python 3, потому что он вводит аргумент ключевого слова error_callback для всех асинхронных методов multiprocessing.Pool. Это значительно упрощает интеграцию с Tornado:

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        super().apply_async(func, args, kwds, callback=callback,
                            error_callback=callback)

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception
    (or an Exception sub-class), this function will raise the
    Exception, rather than return it.

    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    raise Return(result)

Все, что нам нужно сделать в нашем переопределенном apply_async, вызывает родителя с аргументом ключевого слова error_callback в дополнение к callback kwarg. Нет необходимости переопределять ApplyResult.

Мы можем стать еще более привлекательными, используя MetaClass в нашем TornadoPool, чтобы его методы *_async вызывались напрямую, как если бы они были сопрограммами:

import time
from functools import wraps
from multiprocessing.pool import Pool

import tornado.web
from tornado import gen
from tornado.gen import Return
from tornado import stack_context
from tornado.ioloop import IOLoop
from tornado.concurrent import Future

def _argument_adapter(callback):
    def wrapper(*args, **kwargs):
        if kwargs or len(args) > 1:
            callback(Arguments(args, kwargs))
        elif args:
            callback(args[0])
        else:
            callback(None)
    return wrapper

def PoolTask(func, *args, **kwargs):
    """ Task function for use with multiprocessing.Pool methods.

    This is very similar to tornado.gen.Task, except it sets the
    error_callback kwarg in addition to the callback kwarg. This
    way exceptions raised in pool worker methods get raised in the
    parent when the Task is yielded from.

    """
    future = Future()
    def handle_exception(typ, value, tb):
        if future.done():
            return False
        future.set_exc_info((typ, value, tb))
        return True
    def set_result(result):
        if future.done():
            return
        if isinstance(result, Exception):
            future.set_exception(result)
        else:
            future.set_result(result)
    with stack_context.ExceptionStackContext(handle_exception):
        cb = _argument_adapter(set_result)
        func(*args, callback=cb, error_callback=cb)
    return future

def coro_runner(func):
    """ Wraps the given func in a PoolTask and returns it. """
    @wraps(func)
    def wrapper(*args, **kwargs):
        return PoolTask(func, *args, **kwargs)
    return wrapper

class MetaPool(type):
    """ Wrap all *_async methods in Pool with coro_runner. """
    def __new__(cls, clsname, bases, dct):
        pdct = bases[0].__dict__
        for attr in pdct:
            if attr.endswith("async") and not attr.startswith('_'):
                setattr(bases[0], attr, coro_runner(pdct[attr]))
        return super().__new__(cls, clsname, bases, dct)

class TornadoPool(Pool, metaclass=MetaPool):
    pass

# Test worker functions
def test2(x):
    print("hi2")
    raise Exception("eeee")

def test(x):
    print(x)
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield pool.apply_async(test, ("inside get",))
            self.write("%s\n" % result)
            result = yield pool.apply_async(test2, ("hi2",))
            self.write("%s\n" % result)
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
            raise
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool()
    app.listen(8888)
    IOLoop.instance().start()

Ответ 3

Если ваши запросы получить так долго, то торнадо является неправильной структурой.

Я предлагаю вам использовать nginx для маршрутизации быстрого доступа к торнадо, а более медленные - к другому серверу.

У PeterBe есть интересная статья, в которой он запускает несколько серверов Tornado и устанавливает один из них как "медленный" для обработки длительных запросов: worrying-about-io-blocking Я бы попробовал этот метод.