Я использую неблокирующий сервер Python I/O Tornado. У меня есть класс запросов GET
, которые могут занять значительное количество времени (подумайте в диапазоне 5-10 секунд). Проблема в том, что Tornado блокирует эти запросы, чтобы последующие быстрые запросы поддерживались до тех пор, пока не завершится медленный запрос.
Я посмотрел: https://github.com/facebook/tornado/wiki/Threading-and-concurrency и пришел к выводу, что мне нужна комбинация # 3 (другие процессы) и # 4 (другие потоки). # 4 сам по себе имел проблемы, и я не смог получить надежный контроль обратно на ioloop, когда был другой поток, выполняющий "heavy_lifting". (Я предполагаю, что это было связано с GIL и тем фактом, что задача heavy_lifting имеет высокую нагрузку на процессор и продолжает вытаскивать управление от основного ioloop, но это предположение).
Итак, я проработал прототипом, чтобы решить эту задачу, выполнив задачи "тяжелого подъема" в этих медленных запросах GET
в отдельном процессе, а затем поместив обратный вызов обратно в Tornado ioloop, когда процесс завершен, чтобы завершить запрос. Это освобождает ioloop для обработки других запросов.
Я создал простой пример, демонстрирующий возможное решение, но мне любопытно получить обратную связь от сообщества на нем.
Мой вопрос в два раза: как можно упростить этот нынешний подход? Какие подводные камни потенциально существуют с ним?
Подход
-
Используйте декоратор
asynchronous
, встроенный в Tornado, который позволяет просить остаться открытым и продолжить работу с ioloop. -
Создайте отдельный процесс для задач "тяжелого подъема" с помощью модуля python
multiprocessing
. Сначала я попытался использовать модульthreading
, но не смог получить надежное отключение управления до ioloop. Также представляется, чтоmutliprocessing
также будет использовать многоядерные процессоры. -
Начните поток "наблюдателя" в основном процессе ioloop с помощью модуля
threading
, который должен следить заmultiprocessing.Queue
за результатами задачи "тяжелого подъема" , когда он завершается. Это было необходимо, потому что мне нужен был способ узнать, что задача heavy_lifting была завершена, хотя она все еще может уведомить ioloop о завершении этого запроса. -
Убедитесь, что поток "watcher" автоматически отключает управление основным циклом ioloop с помощью вызовов
time.sleep(0)
, чтобы другие запросы продолжали легко обрабатываться. -
Когда результат возникает в очереди, добавьте обратный вызов из потока "наблюдателя", используя
tornado.ioloop.IOLoop.instance().add_callback()
, который задокументирован как единственный безопасный способ вызова экземпляров ioloop из других потоков. -
Обязательно вызовите
finish()
в обратном вызове, чтобы завершить запрос и передать ответ.
Ниже приведен пример кода, показывающего этот подход. multi_tornado.py
- это сервер, реализующий вышеуказанный контур, а call_multi.py
- это образец script, который вызывает сервер двумя способами для тестирования сервера. Оба теста вызывают сервер с 3 медленными запросами GET
, за которыми следуют 20 быстрых запросов GET
. Результаты показаны как для работы с включенным, так и без него.
В случае запуска с "no threading" блок 3 медленных запросов (каждый занимает чуть больше секунды для завершения). Несколько из 20 быстрых запросов сжимаются между некоторыми медленными запросами в ioloop (не совсем точно, как это происходит), но могут быть артефактом, когда я запускаю как сервер, так и клиентский тест script на том же компьютере), Дело в том, что все быстрые запросы поддерживаются в различной степени.
В случае запуска с поточной поддержкой 20 быстрых запросов сначала завершаются сразу, а три медленных запроса завершаются примерно в то же время после того, как они выполняются параллельно. Это желаемое поведение. Три медленных запроса занимают 2,5 секунды, чтобы завершить их параллельно - в то время как в несетевом случае три медленных запроса занимают около 3,5 секунд. Таким образом, около 35% ускоряется в целом (я предполагаю из-за многоядерного обмена). Но что более важно - быстрые запросы были немедленно обработаны в медленных медленных.
У меня нет большого опыта в многопоточном программировании, поэтому, хотя это, похоже, работает здесь, мне любопытно узнать:
Есть ли более простой способ сделать это? Какой монстр может скрываться в этом подходе?
(Примечание: будущий компромисс может состоять в том, чтобы просто запускать больше экземпляров Tornado с обратным прокси-сервером, например nginx, выполняющим балансировку нагрузки. Независимо от того, что я буду запускать несколько экземпляров с балансировщиком нагрузки, - но я беспокоюсь о том, чтобы просто метать оборудование по этой проблеме, поскольку кажется, что аппаратное обеспечение так напрямую связано с проблемой с точки зрения блокировки.)
Пример кода
multi_tornado.py
(пример сервера):
import time
import threading
import multiprocessing
import math
from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop
# run in some other process - put result in q
def heavy_lifting(q):
t0 = time.time()
for k in range(2000):
math.factorial(k)
t = time.time()
q.put(t - t0) # report time to compute in queue
class FastHandler(RequestHandler):
def get(self):
res = 'fast result ' + self.get_argument('id')
print res
self.write(res)
self.flush()
class MultiThreadedHandler(RequestHandler):
# Note: This handler can be called with threaded = True or False
def initialize(self, threaded=True):
self._threaded = threaded
self._q = multiprocessing.Queue()
def start_process(self, worker, callback):
# method to start process and watcher thread
self._callback = callback
if self._threaded:
# launch process
multiprocessing.Process(target=worker, args=(self._q,)).start()
# start watching for process to finish
threading.Thread(target=self._watcher).start()
else:
# threaded = False just call directly and block
worker(self._q)
self._watcher()
def _watcher(self):
# watches the queue for process result
while self._q.empty():
time.sleep(0) # relinquish control if not ready
# put callback back into the ioloop so we can finish request
response = self._q.get(False)
IOLoop.instance().add_callback(lambda: self._callback(response))
class SlowHandler(MultiThreadedHandler):
@asynchronous
def get(self):
# start a thread to watch for
self.start_process(heavy_lifting, self._on_response)
def _on_response(self, delta):
_id = self.get_argument('id')
res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
print res
self.write(res)
self.flush()
self.finish() # be sure to finish request
application = Application([
(r"/fast", FastHandler),
(r"/slow", SlowHandler, dict(threaded=False)),
(r"/slow_threaded", SlowHandler, dict(threaded=True)),
])
if __name__ == "__main__":
application.listen(8888)
IOLoop.instance().start()
call_multi.py
(клиентский тестер):
import sys
from tornado.ioloop import IOLoop
from tornado import httpclient
def run(slow):
def show_response(res):
print res.body
# make 3 "slow" requests on server
requests = []
for k in xrange(3):
uri = 'http://localhost:8888/{}?id={}'
requests.append(uri.format(slow, str(k + 1)))
# followed by 20 "fast" requests
for k in xrange(20):
uri = 'http://localhost:8888/fast?id={}'
requests.append(uri.format(k + 1))
# show results as they return
http_client = httpclient.AsyncHTTPClient()
print 'Scheduling Get Requests:'
print '------------------------'
for req in requests:
print req
http_client.fetch(req, show_response)
# execute requests on server
print '\nStart sending requests....'
IOLoop.instance().start()
if __name__ == '__main__':
scenario = sys.argv[1]
if scenario == 'slow' or scenario == 'slow_threaded':
run(scenario)
Результаты испытаний
Запустив python call_multi.py slow
(поведение блокировки):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
Запустив python call_multi.py slow_threaded
(желаемое поведение):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s