Подтвердить что ты не робот

Возможно ли блокирование Redis pubsub?

Я хочу использовать redis 'pubsub для передачи некоторых сообщений, но не хочу блокироваться с помощью listen, например, кода ниже:

import redis
rc = redis.Redis()

ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])

rc.publish('foo', 'hello world')

for item in ps.listen():
    if item['type'] == 'message':
        print item['channel']
        print item['data']

Последний блок for заблокирует. Я просто хочу проверить, есть ли у данного канала данные, как я могу это сделать? Есть ли метод check?

4b9b3361

Ответ 1

Я не думаю, что это было бы возможно. Канал не имеет "текущих данных", вы подписываетесь на канал и начинаете получать сообщения, которые нажимаются другими клиентами на канале, следовательно, это блокирующий API. Кроме того, если вы посмотрите на документацию Redis для pub/sub, это сделает ее более понятной.

Ответ 2

Если вы думаете о неблокирующей асинхронной обработке, вы, вероятно, используете (или должны использовать) асинхронный фреймворк/сервер.

  • если вы используете Торнадо, есть Торнадо-Редис. Он использует родные вызовы генератора Торнадо. В демоверсии Websocket приведен пример использования его в сочетании с pub/sub.

  • если вы используете Twisted, есть txRedis. Там у вас также есть пример pub/sub.

  • также кажется, что вы можете использовать Redis-py в сочетании с Gevent без проблем, используя gevent.monkey.patch_all() Monkey (gevent.monkey.patch_all()).

ОБНОВЛЕНИЕ: Прошло 5 лет с момента первоначального ответа, в то же время Python получил встроенную поддержку асинхронного ввода-вывода. Там сейчас AIORedis, асинхронный клиент IO Redis.

Ответ 3

В новой версии Redis-Py есть поддержка асинхронного pubsub, проверьте https://github.com/andymccurdy/redis-py для более подробной информации. Вот пример из самой документации:

while True:
    message = p.get_message()
    if message:
        # do something with the message
    time.sleep(0.001)  # be nice to the system :)

Ответ 4

Принятый ответ устарел, поскольку redis-py рекомендует использовать неблокирующий get_message(). Но это также позволяет легко использовать потоки.

https://pypi.python.org/pypi/redis

Существует три разных способа чтения сообщений.

За кулисами get_message() использует модуль выбора систем для быстрого опроса сокета соединений. Если доступны данные, доступные для чтения, get_message() будет читать их, отформатировать сообщение и вернуть его или передать его обработчику сообщений. Если нет данных для чтения, get_message() немедленно вернет None. Это делает тривиальным интегрирование в существующий цикл событий внутри вашего приложения.

 while True:
     message = p.get_message()
     if message:
         # do something with the message
     time.sleep(0.001)  # be nice to the system :)

Старые версии redis-py только читают сообщения с pubsub.listen(). listen() - это генератор, который блокирует до тех пор, пока сообщение не будет доступно. Если вашему приложению не нужно ничего делать, кроме как принимать и принимать сообщения, полученные от redis, listen() - это простой способ запустить работу.

 for message in p.listen():
     # do something with the message

Третий вариант запускает цикл событий в отдельном потоке. pubsub.run_in_thread() создает новый поток и запускает цикл события. Объект потока возвращается вызывающей стороне run_in_thread(). Вызывающий может использовать метод thread.stop() для завершения цикла и потока событий. За кулисами это просто обертка вокруг get_message(), которая работает в отдельном потоке, по сути создавая крошечный неблокирующий цикл событий для вас. run_in_thread() принимает необязательный аргумент sleep_time. Если указано, цикл события вызовет time.sleep() со значением в каждой итерации цикла.

Примечание. Поскольку они выполнялись в отдельном потоке, они не могут обрабатывать сообщения, которые автоматически обрабатываются зарегистрированными обработчиками сообщений. Поэтому redis-py не позволяет вам запускать run_in_thread(), если вы подписаны на шаблоны или каналы, на которые не подключены обработчики сообщений.

p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it time to shut it down...
thread.stop()

Итак, чтобы ответить на ваш вопрос, просто проверьте get_message, когда вы хотите узнать, прибыло ли сообщение.

Ответ 5

Это рабочий пример для потокового прослушивания.

import sys
import cmd
import redis
import threading


def monitor():
    r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0)

    channel = sys.argv[1]
    p = r.pubsub()
    p.subscribe(channel)

    print 'monitoring channel', channel
    for m in p.listen():
        print m['data']


class my_cmd(cmd.Cmd):
    """Simple command processor example."""

    def do_start(self, line):
        my_thread.start()

    def do_EOF(self, line):
        return True


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print "missing argument! please provide the channel name."
    else:
        my_thread = threading.Thread(target=monitor)
        my_thread.setDaemon(True)

        my_cmd().cmdloop()

Ответ 6

Вот неблокирующее решение без потоков:

fd = ps.connection._sock.fileno();
rlist,, = select.select([fd], [], [], 0) # or replace 0 with None to block
if rlist:
    for rfd in rlist:
        if fd == rfd:
            message = ps.get_message()

ps.get_message() достаточно само по себе, но я использую этот метод, чтобы я мог ждать несколько fds вместо просто redis-соединения.

Ответ 7

Чтобы достичь неблокирующего кода, вы должны сделать другой код парадигмы. Это не сложно, используя новый поток, чтобы прослушать все изменения и оставить основной поток, чтобы делать другие вещи.

Кроме того, вам понадобится механизм для обмена данными между основным потоком и потоком подписчиков redis.

Ответ 8

Самый эффективный подход будет основан скорее на основе зелёных, чем на потоковых. Являясь основанной на зеленой структуре concurrency, gevent уже полностью установлен в мире Python. Поэтому интеграция gevent с redis-py была бы замечательной. Это именно то, что обсуждается в этом выпуске на github:

https://github.com/andymccurdy/redis-py/issues/310

Ответ 9

Вы можете использовать gevent, gevent monkey patching для создания неблокирующего приложения redis pubsub.

Ответ 10

Redis 'pub/sub отправляет сообщения клиентам, которые подписываются (прослушиваются) на канале. Если вы не слушаете, вы пропустите сообщение (следовательно, блокирующий вызов). Если вы хотите, чтобы он был неблокирующим, я рекомендую использовать очередь (redis тоже хорош в этом). Если вам нужно использовать pub/sub, вы можете использовать в качестве предлагаемого gevent для асинхронного, блокирующего прослушивателя, push-сообщения в очередь и использовать отдельного пользователя для обработки сообщений из этой очереди неблокирующим способом.