Подтвердить что ты не робот

Zeromq: как предотвратить бесконечное ожидание?

Я только начал работать с ZMQ. Я разрабатываю приложение, чей рабочий процесс:

  • один из многих клиентов (у которых есть случайные PULL-адреса) PUSH запрос на сервер в 5555
  • сервер всегда ждет клиентских PUSHes. Когда приходит, рабочий процесс порождается для этого конкретного запроса. Да, рабочие процессы могут существовать одновременно.
  • Когда этот процесс завершает задачу, он возвращает клиенту результат.

Я предполагаю, что для этого подходит архитектура PUSH/PULL. Пожалуйста, исправьте меня.


Но как мне обрабатывать эти сценарии?

  • client_receiver.recv() будет ждать бесконечное время, когда сервер не сможет ответить.
  • клиент может отправить запрос, но он сработает сразу же после этого, поэтому рабочий процесс останется на сервере server_sender.send() навсегда.

Итак, как мне настроить что-то вроде таймаута в модели PUSH/PULL?


EDIT: спасибо user938949 предложениям, я получил рабочий ответ, и я поделился им для потомков.

4b9b3361

Ответ 1

Если вы используете zeromq >= 3.0, вы можете установить опцию сокета RCVTIMEO:

client_receiver.RCVTIMEO = 1000 # in milliseconds

Но в целом вы можете использовать индикаторы:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

И poller.poll() принимает тайм-аут:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

evts будет пустым списком, если ничего не получить.

Вы можете опросить с помощью zmq.POLLOUT, чтобы проверить, удастся ли отправить сообщение.

Или, чтобы обработать случай однорангового узла, который мог быть сбой, a:

worker.send(msg, zmq.NOBLOCK)

может быть достаточно, что всегда будет немедленно возвращено - повышение ZMQError (zmq.EAGAIN), если отправка не может быть завершена.

Ответ 2

Это был быстрый хак, который я сделал после того, как я назвал user938949 ответ и http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/. Если вы сделаете лучше, отправьте свой ответ, Я порекомендую ваш ответ.

Для тех, кто хочет долгосрочные решения по надежности, обратитесь http://zguide.zeromq.org/page:all#toc64

Версия 3.0 zeromq (бета-банкомат) поддерживает тайм-аут в ZMQ_RCVTIMEO и ZMQ_SNDTIMEO. http://api.zeromq.org/3-0:zmq-setsockopt

Сервер

Zmq.NOBLOCK гарантирует, что когда клиент не существует, функция send() не блокируется.

import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"

Client

Объект poller может прослушиваться во многих приемных сокетах (см. выше описанную выше "Многопроцессорность Python с ZeroMQ". Я связал ее только с work_receiver. В бесконечном цикле опросы клиентов с интервалом из 1000 мкс. Объект носки возвращается пустым, если в это время не было получено сообщение.

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"

Ответ 3

Если вы используете ZMQ_NOBLOCK, вы будете блокировать отправку, но если вы попытаетесь закрыть сокет и контекст, этот шаг заблокирует запуск программы.

Причина заключается в том, что сокет ждет какого-либо однорангового узла, чтобы обеспечить доставку исходящих сообщений. Чтобы немедленно закрыть сокет и удалить исходящие сообщения из буфера, используйте ZMQ_LINGER и установите его на 0..

Ответ 4

Если вы ожидаете только один сокет, а не создаете Poller, вы можете сделать это:

if work_receiver.poll(1000, zmq.POLLIN):
    print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
    print "error: message timeout"

Вы можете использовать это, если ваш тайм-аут меняется в зависимости от ситуации, вместо установки work_receiver.RCVTIMEO.