Подтвердить что ты не робот

RabbitMQ С# драйвер перестает получать сообщения

Есть ли у вас указатели, как определить, когда возникла проблема с подпиской, чтобы я мог повторно подключиться?

Моя служба использует RabbitMQ.Client.MessagePatterns.Subscription для подписки. Через некоторое время мой клиент молча перестает получать сообщения. Я подозреваю, что проблемы с сетью связаны с тем, что мое VPN-соединение не является самым надежным.

Я прочитал документы за некоторое время, ища ключ, чтобы узнать, когда эта подписка может быть нарушена из-за сетевой проблемы без большой удачи. Я попытался проверить, что соединение и канал все еще открыты, но он всегда, кажется, сообщает, что он все еще открыт.

Сообщения, которые он обрабатывает, работают достаточно хорошо и подтверждаются обратно в очередь, поэтому я не думаю, что это проблема с "ack".

Я уверен, что я просто пропустил что-то простое, но я еще не нашел его.

public void Run(string brokerUri, Action<byte[]> handler)
{
    log.Debug("Connecting to broker: {0}".Fill(brokerUri));
    ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri };

    using (IConnection connection = factory.CreateConnection())
    {
        using (IModel channel = connection.CreateModel())
        {
            channel.QueueDeclare(queueName, true, false, false, null);

            using (Subscription subscription = new Subscription(channel, queueName, false))
            {
                while (!Cancelled)
                {
                    BasicDeliverEventArgs args;

                    if (!channel.IsOpen)
                    {
                        log.Error("The channel is no longer open, but we are still trying to process messages.");
                        throw new InvalidOperationException("Channel is closed.");
                    }
                    else if (!connection.IsOpen)
                    {
                        log.Error("The connection is no longer open, but we are still trying to process message.");
                        throw new InvalidOperationException("Connection is closed.");
                    }

                    bool gotMessage = subscription.Next(250, out args);

                    if (gotMessage)
                    {
                        log.Debug("Received message");
                        try
                        {
                            handler(args.Body);
                        }
                        catch (Exception e)
                        {
                            log.Debug("Exception caught while processing message. Will be bubbled up.", e);
                            throw;
                        }

                        log.Debug("Acknowledging message completion");
                        subscription.Ack(args);
                    }
                }
            }
        }
    }
}

UPDATE:

Я смоделировал сетевой сбой, запустив сервер на виртуальной машине, и я получаю исключение (RabbitMQ.Client.Exceptions.OperationInterruptedException: операция AMQP была прервана), когда я прерываю соединение достаточно долго, поэтому, возможно, t проблема сети. Теперь я не знаю, что бы это было, но это не удалось через пару часов работы.

4b9b3361

Ответ 1

РЕДАКТИРОВАТЬ: Поскольку я нахожусь на подоконнике, я должен указать, что клиент .NET RabbitMQ теперь имеет эту функциональность: https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery

В идеале вы должны иметь возможность использовать это и избегать ручного внедрения логики повторного подключения.


Недавно мне пришлось реализовать почти то же самое. Из того, что я могу сказать, большая часть доступной информации о RabbitMQ предполагает, что ваша сеть очень надежна или вы запускаете брокера RabbitMQ на том же компьютере, что и любой клиент, отправляющий или получающий сообщения, что позволяет Кролику решать любые проблемы с подключением.

На самом деле не так сложно настроить клиента Rabbit на надежную защиту от удаленных соединений, но есть несколько особенностей, с которыми вам нужно иметь дело.

Первое, что вам нужно сделать, включить heartbeat:

ConnectionFactory factory = new ConnectionFactory() 
{
  Uri = brokerUri,
  RequestedHeartbeat = 30,
}; 

Установка "RequestedHeartbeat" на 30 заставит клиента проверять каждые 30 секунд, если соединение все еще живое. Без этого, абонент сообщения будет сидеть там счастливо, ожидая появления другого сообщения, не подозревая, что его соединение ухудшилось.

Включение биения также позволяет серверу проверить, все ли подключено соединение, что может быть очень важно. Если соединение ухудшилось после того, как сообщение было подхвачено подписчиком, но до его подтверждения сервер просто предполагает, что клиент занимает много времени, и сообщение "застревает" на мертвом соединении, пока оно не будет закрыто. При включенном сердцебиении сервер узнает, когда соединение ухудшится и закроет его, вернув сообщение в очередь, чтобы другой абонент мог его обработать. Без сердечного приступа я должен был вручную и закрыть соединение в интерфейсе управления кроликом, чтобы застрявшее сообщение могло быть передано подписчику.

Во-вторых, вам нужно будет обрабатывать OperationInterruptedException. Как вы заметили, это, как правило, исключение, которое клиент кролика выбрасывает, когда он замечает, что соединение было прервано. Если IModel.QueueDeclare() вызывается, когда соединение было прервано, это исключение вы получите. Обработайте это исключение, удалив свою подписку, канал и соединение и создав новые.

Наконец, вам придется обрабатывать то, что делает ваш потребитель, пытаясь использовать сообщения из закрытого соединения. К сожалению, каждый другой способ потребления сообщений из очереди в клиенте Rabbit, похоже, реагирует по-разному. QueueingBasicConsumer выдает EndOfStreamException, если вы вызываете QueueingBasicConsumer.Queue.Dequeue в закрытом соединении. EventingBasicConsumer ничего не делает, поскольку он просто ждет сообщения. Из того, что я могу сказать, попробовав его, класс Subscription, который вы используете, возвращает true из вызова Subscription.Next, но значение args равно null. Еще раз, справитесь с этим, избавьтесь от своего соединения, канала и подписки и заново создайте их.

Значение connection.IsOpen будет обновлено до False, когда соединение завершится неудачей с биением, поэтому вы можете проверить это, если хотите. Однако, поскольку сердцебиение работает в отдельном потоке, вам все равно придется обрабатывать случай, когда соединение открыто, когда вы его проверяете, но закрывается до вызова subscription.Next().

Одна последняя вещь, на которую нужно обратить внимание - это IConnection.Dispose(). Этот вызов будет вызывать EndOfStreamException, если вы вызываете dispose после того, как соединение было закрыто. Это кажется ошибкой для меня, и мне не нравится, что я не вызываю dispose на объект IDisposable, поэтому я вызываю его и проглатываю исключение.

Объединяя все это в быстрый и грязный пример:

public bool Cancelled { get; set; }

IConnection _connection = null;
IModel _channel = null;
Subscription _subscription = null;

public void Run(string brokerUri, string queueName, Action<byte[]> handler)
{
    ConnectionFactory factory = new ConnectionFactory() 
    {
        Uri = brokerUri,
        RequestedHeartbeat = 30,
    };

    while (!Cancelled)
    {               
        try
        {
            if(_subscription == null)
            {
                try
                {
                    _connection = factory.CreateConnection();
                }
                catch(BrokerUnreachableException)
                {
                    //You probably want to log the error and cancel after N tries, 
                    //otherwise start the loop over to try to connect again after a second or so.
                    continue;
                }

                _channel = _connection.CreateModel();
                _channel.QueueDeclare(queueName, true, false, false, null);
                _subscription = new Subscription(_channel, queueName, false);
            }

            BasicDeliverEventArgs args;
            bool gotMessage = _subscription.Next(250, out args);
            if (gotMessage)
            {
                if(args == null)
                {
                    //This means the connection is closed.
                    DisposeAllConnectionObjects();
                    continue;
                }

                handler(args.Body);
                _subscription.Ack(args);
            }
        }
        catch(OperationInterruptedException ex)
        {
            DisposeAllConnectionObjects();
        }
    }
    DisposeAllConnectionObjects();
}

private void DisposeAllConnectionObjects()
{
    if(_subscription != null)
    {
        //IDisposable is implemented explicitly for some reason.
        ((IDisposable)_subscription).Dispose();
        _subscription = null;
    }

    if(_channel != null)
    {
        _channel.Dispose();
        _channel = null;
    }

    if(_connection != null)
    {
        try
        {
            _connection.Dispose();
        }
        catch(EndOfStreamException) 
        {
        }
        _connection = null;
    }
}