Подтвердить что ты не робот

Когда клиент Apache Kafka выдает исключение "Batch Expired"?

Используя клиент Apache Kafka Java (0.9), я пытаюсь отправить длинную серию записей брокеру с помощью класса Kafka Producer.

Асинхронный метод отправки возвращается немедленно на некоторое время, затем начинает блокировку для каждого вызова в течение короткого периода времени. Примерно через тридцать секунд клиент начинает бросать исключения (TimeoutException), с сообщением "Batch expired".

Какие обстоятельства вызывают это исключение?

4b9b3361

Ответ 1

Это исключение указывает, что вы записываете записи в очередь быстрее, чем их можно отправить.

Когда вы вызываете метод send, ProducerRecord будет сохранен во внутреннем буфер для отправки брокеру. Метод возвращается сразу после буферизации ProducerRecord, независимо от того, была ли она отправлена.

Записи группируются в партии для отправки брокеру, чтобы снизить нагрузку на одно сообщение и увеличить пропускную способность.

После того, как в запись добавлена ​​партия, существует ограничение по времени для отправки этой партии, чтобы гарантировать, что она была отправлена ​​в течение определенной продолжительности. Это контролируется параметром конфигурации Producer, request.timeout.ms, который по умолчанию составляет тридцать секунд.

Если партия была поставлена ​​в очередь дольше, чем ограничение таймаута, исключение будет выбрано. Записи в этой партии будут удалены из очереди отправки.

Увеличение предела тайм-аута с использованием параметра конфигурации позволит клиенту дольше заканчивать партии дольше.

Ответ 2

Я получил это исключение в совершенно другом контексте.

Я установил мини-кластер zookeeper vm, брокера vm и производителя/потребителя vm. Я открыл все необходимые порты на сервере (9092) и на zookeeper (2181), а затем попытался опубликовать сообщение от потребителя/издателя vm брокеру. Я получил исключение, упомянутое OP, но поскольку я опубликовал только одно сообщение (или, по крайней мере, я пытался), решение не могло бы увеличить тайм-аут или размер партии. Поэтому я искал и нашел этот список рассылки, описывающий аналогичную проблему, которую я испытывал при попытке потребления сообщений внутри пользователя/производителя vm (ClosedChannelException): http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config Последнее сообщение в этом списке рассылки действительно описывает, как решить проблему.

Короче говоря, если вы столкнулись с исключением ChannelClosedException и Batch Expired, вам, вероятно, придется изменить эту строку следующим образом в файле server.config и перезапустить брокера:

advertised.host.name=<broker public IP address>

Если он не установлен, он возвращается к свойству host.name (который, вероятно, не установлен), а затем возвращается к каноническому имени хоста класса InetAddress Java, что, в конце концов, не является конечно, и, таким образом, путать удаленные узлы.

Ответ 3

Параметр, определяющий время до отправки брокеру, linger.ms. Его значение по умолчанию равно 0 (без задержки).

Ответ 4

Я использую клиентскую версию Kafka Java версии 0.11.0.0. Я также начал видеть один и тот же шаблон при непредставлении больших сообщений. Он передавал несколько сообщений, и не хватало некоторых других. (Хотя оба переданных и неудачных сообщения имели одинаковый размер). В моем случае размер каждого сообщения составлял около 60 КБ, что намного выше, чем Kafka по умолчанию batch.size 16 КБ, а также для моего linger.ms установлено значение по умолчанию 0. Эта ошибка возникает, когда клиент производителя синхронизируется, прежде чем он сможет получить успешный ответ с сервера. Как правило, в моем коде этот вызов был отключен: kafkaProd.send(pr).get(). Чтобы исправить это, мне пришлось увеличить клиент производителя по умолчанию request.timeout.ms до 60000

Ответ 5

при создании пользовательского набора ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG значения true.