Подтвердить что ты не робот

Как отправить большие сообщения с помощью Kafka (более 15 МБ)?

Я отправляю String-сообщения в Kafka V. 0.8 с API Java Producer. Если размер сообщения составляет около 15 МБ, я получаю MessageSizeTooLargeException. Я попытался установить message.max.bytes до 40 МБ, но я все еще получаю исключение. Малые сообщения работали без проблем.

(Исключение появляется у производителя, у меня нет потребителя в этом приложении.)

Что я могу сделать, чтобы избавиться от этого исключения?

Мой пример конфигурации производителя

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Error-Log:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
4b9b3361

Ответ 1

Вам необходимо настроить три (или четыре) свойства:

  • Сторона потребителя: fetch.message.max.bytes - это определяет наибольший размер сообщения, которое может быть выбрано потребителем.
  • Сторона брокера: replica.fetch.max.bytes - это позволит репликам в брокерах отправлять сообщения в кластере и убедиться, что сообщения реплицируются правильно. Если это слишком мало, тогда сообщение никогда не будет реплицировано, и поэтому потребитель никогда не увидит сообщение, потому что сообщение никогда не будет выполнено (полностью реплицировано).
  • Сторона брокера: message.max.bytes - это самый большой размер сообщения, которое может получить брокер от производителя.
  • Сторона брокера (по теме): max.message.bytes - это самый большой размер сообщения, которое брокер позволит добавить в эту тему. Этот размер проверяется на предварительное сжатие. (По умолчанию брокер message.max.bytes.)

Я нашел трудный путь по номеру 2 - вы не получаете ЛЮБЫХ исключений, сообщений или предупреждений от Kafka, поэтому не забудьте подумать об этом при отправке больших сообщений.

Ответ 2

Незначительные изменения, необходимые для Kafka 0.10 и нового пользователя по сравнению с laughing_man answer:

  • Брокер: никаких изменений, вам все равно необходимо увеличить свойства message.max.bytes и replica.fetch.max.bytes. message.max.bytes должен быть равен или меньше (*), чем replica.fetch.max.bytes.
  • Производитель: Увеличьте max.request.size, чтобы отправить сообщение большего размера.
  • Потребитель: увеличьте max.partition.fetch.bytes, чтобы получать сообщения большего размера.

(*) Прочитайте комментарии, чтобы узнать больше о message.max.bytes <= replica.fetch.max.bytes

Ответ 3

Вам необходимо переопределить следующие свойства:

Брокерские конфигурации ($ KAFKA_HOME/config/server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Пользовательские конфигурации ($ KAFKA_HOME/config/consumer.properties)
Этот шаг не помог мне. Я добавляю его в потребительское приложение, и он отлично работает

  • fetch.message.max.bytes

Перезагрузите сервер.

посмотрите эту документацию для получения дополнительной информации: http://kafka.apache.org/08/configuration.html

Ответ 4

Идея состоит в том, чтобы иметь одинаковый размер сообщения, отправляемого от Kafka Producer в Kafka Broker, а затем получаемого Kafka Consumer, т.е.

Кафка производитель → Кафка Брокер → Кафка Потребитель

Предположим, что если требуется отправить 15 МБ сообщения, то все три источника должны быть синхронизированы.

Kafka Producer отправляет 15 МБ -> Kafka Broker разрешает/сохраняет 15 МБ -> Kafka Consumer получает 15 МБ

Поэтому настройка должна быть:

а) на брокера:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

б) на потребителя:

fetch.message.max.bytes=15728640

Ответ 5

Следует помнить, что атрибут message.max.bytes должен быть в синхронизации с пользовательским свойством fetch.message.max.bytes. размер выборки должен быть как минимум равным максимальному размеру сообщения, иначе может возникнуть ситуация, когда производители могут отправлять сообщения, большие, чем потребитель может потреблять/извлекать. Возможно, стоит взглянуть на это.
Какую версию Kafka вы используете? Также предоставите более подробную информацию о том, что вы получаете. есть ли что-то вроде... payload size of xxxx larger than 1000000 в журнале?

Ответ 6

Ответ @laughing_man довольно точный. Но все же я хотел дать рекомендацию, которую я узнал от эксперта Kafka Стефана Маарека из Quora.

Кафка не предназначена для обработки больших сообщений.

Ваш API должен использовать облачное хранилище (например, AWS S3) и просто отправить Kafka или любому брокеру сообщений ссылку на S3. Вы должны найти место для хранения ваших данных, может быть, это сетевой диск, может быть, что угодно, но это не должно быть брокером сообщений.

Теперь, если вы не хотите идти с вышеупомянутым решением

Максимальный размер сообщения составляет 1 МБ (настройка в ваших брокерах называется message.max.bytes) Apache Kafka. Если вам это действительно нужно, вы можете увеличить этот размер и убедиться, что увеличите сетевые буферы для ваших производителей и потребителей.

И если вы действительно заботитесь о разбиении вашего сообщения, убедитесь, что каждое разделение сообщения имеет один и тот же ключ, чтобы оно передавалось в один и тот же раздел, а содержимое вашего сообщения должно сообщать "идентификатор части", чтобы ваш потребитель мог полностью восстановить сообщение.,

Вы также можете изучить сжатие, если ваше сообщение основано на тексте (gzip, snappy, lz4 сжатие), что может уменьшить размер данных, но не волшебным образом.

Опять же, вы должны использовать внешнюю систему для хранения этих данных и просто отправить внешнюю ссылку на Кафку. Это очень распространенная архитектура, и она должна быть принята и широко принята.

Имейте в виду, что Kafka работает лучше всего, только если сообщения огромны по объему, но не по размеру.

Источник: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka