Подтвердить что ты не робот

Как я могу использовать Tornado и Redis асинхронно?

Я пытаюсь найти, как я могу использовать Redis и Tornado асинхронно. Я нашел tornado-redis, но мне нужно больше, чем просто добавить yield в код.

У меня есть следующий код:

import redis
import tornado.web

class WaiterHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
    def get(self):
        client = redis.StrictRedis(port=6279)
        pubsub = client.pubsub()
        pubsub.subscribe('test_channel')

        for item in pubsub.listen():
            if item['type'] == 'message':
                print item['channel']
                print item['data']

        self.write(item['data'])
        self.finish()


class GetHandler(tornado.web.RequestHandler):

    def get(self):
        self.write("Hello world")


application = tornado.web.Application([
    (r"/", GetHandler),
    (r"/wait", WaiterHandler),
])

if __name__ == '__main__':
    application.listen(8888)
    print 'running'
    tornado.ioloop.IOLoop.instance().start()

Мне нужно получить доступ к URL-адресу / и получить "Hello World", в то время как запрос находится в /wait. Как я могу это сделать?

4b9b3361

Ответ 1

Вы не должны использовать Redis pub/sub в основном потоке Tornado, так как он блокирует цикл ввода-вывода. Вы можете обрабатывать длинный опрос от веб-клиентов в основном потоке, но вы должны создать отдельный поток для прослушивания Redis. Затем вы можете использовать ioloop.add_callback() и/или threading.Queue для связи с основным потоком при получении сообщений.

Ответ 3

Для Python >= 3.3 я бы посоветовал вам использовать aioredis. Я не тестировал код ниже, но он должен быть примерно таким:

import redis
import tornado.web
from tornado.web import RequestHandler

import aioredis
import asyncio
from aioredis.pubsub import Receiver


class WaiterHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
    def get(self):
        client = await aioredis.create_redis((host, 6279), encoding="utf-8", loop=IOLoop.instance().asyncio_loop)

        ch = redis.channels['test_channel']
        result = None
        while await ch.wait_message():
            item = await ch.get()
            if item['type'] == 'message':
                print item['channel']
                print item['data']
                result = item['data']

        self.write(result)
        self.finish()


class GetHandler(tornado.web.RequestHandler):

    def get(self):
        self.write("Hello world")


application = tornado.web.Application([
    (r"/", GetHandler),
    (r"/wait", WaiterHandler),
])

if __name__ == '__main__':
    print 'running'
    tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
    server = tornado.httpserver.HTTPServer(application)
    server.bind(8888)
    # zero means creating as many processes as there are cores.
    server.start(0)
    tornado.ioloop.IOLoop.instance().start()

Ответ 4

Хорошо, так вот мой пример того, как я буду делать это с получением запросов.

Я добавил два основных компонента:

Первый - это простой поточный прослушиватель pubsub, который добавляет новые сообщения в локальный объект списка. Я также добавил в список аксессоров списка, чтобы вы могли читать из потока слушателей, как если бы вы читали из обычного списка. Что касается вашего WebRequest, вы просто читаете данные из локального объекта списка. Это немедленно возвращается и не блокирует текущий запрос от завершения или будущих запросов от принятия и обработки.

class OpenChannel(threading.Thread):
    def __init__(self, channel, host = None, port = None):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
        self.pubsub = self.redis.pubsub()
        self.pubsub.subscribe(channel)

        self.output = []

    # lets implement basic getter methods on self.output, so you can access it like a regular list
    def __getitem__(self, item):
        with self.lock:
            return self.output[item]

    def __getslice__(self, start, stop = None, step = None):
        with self.lock:
            return self.output[start:stop:step]

    def __str__(self):
        with self.lock:
            return self.output.__str__()

    # thread loop
    def run(self):
        for message in self.pubsub.listen():
            with self.lock:
                self.output.append(message['data'])

    def stop(self):
        self._Thread__stop()

Второй класс ApplicationMixin. Это дополнительный объект, на который вы наследуете свой класс веб-запросов, чтобы добавить функциональность и атрибуты. В этом случае он проверяет, существует ли уже прослушиватель каналов для запрошенного канала, создается один, если ни один не найден, и возвращает дескриптор прослушивателя в WebRequest.

# add a method to the application that will return existing channels
# or create non-existing ones and then return them
class ApplicationMixin(object):
    def GetChannel(self, channel, host = None, port = None):
        if channel not in self.application.channels:
            self.application.channels[channel] = OpenChannel(channel, host, port)
            self.application.channels[channel].start()
        return self.application.channels[channel]

Класс WebRequest теперь рассматривает слушателя как статический список (имея в виду, что вам нужно дать строку self.write)

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
    @tornado.web.asynchronous
    def get(self, channel):
        # get the channel
        channel = self.GetChannel(channel)
        # write out its entire contents as a list
        self.write('{}'.format(channel[:]))
        self.finish() # not necessary?

Наконец, после создания приложения я добавил пустой словарь в качестве атрибута

# add a dictionary containing channels to your application
application.channels = {}

Как и некоторая очистка работающих потоков, после выхода из приложения

# clean up the subscribed channels
for channel in application.channels:
    application.channels[channel].stop()
    application.channels[channel].join()

Полный код:

import threading
import redis
import tornado.web



class OpenChannel(threading.Thread):
    def __init__(self, channel, host = None, port = None):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
        self.pubsub = self.redis.pubsub()
        self.pubsub.subscribe(channel)

        self.output = []

    # lets implement basic getter methods on self.output, so you can access it like a regular list
    def __getitem__(self, item):
        with self.lock:
            return self.output[item]

    def __getslice__(self, start, stop = None, step = None):
        with self.lock:
            return self.output[start:stop:step]

    def __str__(self):
        with self.lock:
            return self.output.__str__()

    # thread loop
    def run(self):
        for message in self.pubsub.listen():
            with self.lock:
                self.output.append(message['data'])

    def stop(self):
        self._Thread__stop()


# add a method to the application that will return existing channels
# or create non-existing ones and then return them
class ApplicationMixin(object):
    def GetChannel(self, channel, host = None, port = None):
        if channel not in self.application.channels:
            self.application.channels[channel] = OpenChannel(channel, host, port)
            self.application.channels[channel].start()
        return self.application.channels[channel]

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
    @tornado.web.asynchronous
    def get(self, channel):
        # get the channel
        channel = self.GetChannel(channel)
        # write out its entire contents as a list
        self.write('{}'.format(channel[:]))
        self.finish() # not necessary?


class GetHandler(tornado.web.RequestHandler):

    def get(self):
        self.write("Hello world")


application = tornado.web.Application([
    (r"/", GetHandler),
    (r"/channel/(?P<channel>\S+)", ReadChannel),
])


# add a dictionary containing channels to your application
application.channels = {}


if __name__ == '__main__':
    application.listen(8888)
    print 'running'
    try:
        tornado.ioloop.IOLoop.instance().start()
    except KeyboardInterrupt:
        pass

    # clean up the subscribed channels
    for channel in application.channels:
        application.channels[channel].stop()
        application.channels[channel].join()