Я создал простого издателя и пользователя, который подписывается в очереди с помощью basic.consume
.
Мой потребитель подтверждает сообщения, когда работа выполняется без исключения. Всякий раз, когда я сталкиваюсь с исключением, я не понимаю сообщение и возвращаюсь раньше. Только сообщения с подтверждением исчезнут из очереди, так что они работают правильно.
Теперь я хочу, чтобы потребитель снова собирал неудавшиеся сообщения, но единственный способ пересмотреть эти сообщения - это перезапустить пользователя.
Как мне подойти к этому варианту использования?
Код установки
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');
Код пользователя
$queue->consume(array($this, 'callback'));
public function callback(AMQPEnvelope $msg)
{
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return;
}
return $queue->ack($msg->getDeliveryTag());
}
Код производителя
$exchange->publish('message');