Подтвердить что ты не робот

Потребление не подтверждается сообщениями от RabbitMq

Я создал простого издателя и пользователя, который подписывается в очереди с помощью basic.consume.

Мой потребитель подтверждает сообщения, когда работа выполняется без исключения. Всякий раз, когда я сталкиваюсь с исключением, я не понимаю сообщение и возвращаюсь раньше. Только сообщения с подтверждением исчезнут из очереди, так что они работают правильно.
Теперь я хочу, чтобы потребитель снова собирал неудавшиеся сообщения, но единственный способ пересмотреть эти сообщения - это перезапустить пользователя.

Как мне подойти к этому варианту использования?

Код установки

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');

Код пользователя

$queue->consume(array($this, 'callback'));

public function callback(AMQPEnvelope $msg)
{
    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return;
    }
    return $queue->ack($msg->getDeliveryTag());
}

Код производителя

$exchange->publish('message');
4b9b3361

Ответ 1

Если сообщение не было подтверждено и приложение не работает, оно будет автоматически обновлено, а свойство redelivered на конверте будет установлено на true (если вы не используете их с флагом no-ack = true).

UPD:

У вас должно быть сообщение nack с флагом пересылки в вашем блоке catch

    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }

Остерегайтесь бесконечно ненагруженных сообщений, в то время как количество повторной доставки не выполняется в RabbitMQ и в протоколе AMQP вообще.

Если вы не хотите связываться с такими сообщениями и просто хотите добавить некоторую задержку, вы можете захотеть добавить метод sleep() или usleep() перед вызовом метода nack, но это совсем не хорошая идея.

Существует множество методов решения проблемы повторного набора циклов:

1. Полагайтесь на Мертвые обмены письмами

  • плюсы: надежные, стандартные, четкие
  • cons: требуется дополнительная логика

2. Используйте для каждого сообщения или для TTL очереди в очереди

  • плюсы: просты в реализации, также стандартные, понятные
  • минус: с длинными очередями вы можете потерять некоторое сообщение

Примеры (обратите внимание, что для очереди ttl мы передаем только номер и для сообщения ttl - все, что будет числовой строкой):

2.1 За сообщение ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'expiration' => '1000'
    )
);

2.2. В очереди ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));

3. Удерживайте число повторных наборов или количество оставшихся повторных наборов (иначе ограничение на скачок или ttl в стеке IP) в теле сообщения или заголовках

  • профи: дает вам дополнительный контроль над временем жизни сообщений на уровне приложений.
  • минусы: значительные накладные расходы, пока вам нужно изменить сообщение и опубликовать его снова, специфично для приложения, не понятно

код:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'headers' => array(
            'ttl' => 100
        )
    )
);

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
        $headers = $msg->getHeaders();
        echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
        echo $msg->getDeliveryTag(), ' ';
        echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
        echo $msg->getBody(), PHP_EOL;

        try {
            //Do some business logic
            throw new Exception('business logic failed');
        } catch (Exception $ex) {
            //Log exception
            if (isset($headers['ttl'])) {
                // with ttl logic

                if ($headers['ttl'] > 0) {
                    $headers['ttl']--;

                    $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                }

                return $queue->ack($msg->getDeliveryTag());
            } else {
                // without ttl logic
                return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
            }

        }

        return $queue->ack($msg->getDeliveryTag());
    }
);

Возможно, существуют и другие способы улучшения управления потоками сообщений.

Заключение: нет серебряного пулевого решения. Вы должны решить, какое решение соответствует вашим потребностям наилучшим образом или узнать что-то другое, но не забудьте поделиться им здесь;)

Ответ 2

Если вы не хотите перезапускать пользователя, то команда basic.recover AMQP может быть тем, что вы хотите. Согласно AMQP protocol:

basic.recover(bit requeue)

Redeliver unacknowledged messages.

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover.