Подтвердить что ты не робот

Потребительские извлечения метаданных Kafka по темам не удались

Я пытаюсь написать клиент Java для сторонних серверов Kafka и ZooKeeper. Я могу перечислить и описать темы, но когда я пытаюсь прочитать их, появляется ClosedChannelException. Я воспроизвожу их здесь с клиентом командной строки.

$ bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic
[2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         
[2015-06-02 16:23:04,515] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:0,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         

Выполняются альтернативные команды:

$ bin/kafka-topics.sh --describe --zookeeper 255.255.255.255:2181 --topic eventbustopic
Topic:eventbustopic   PartitionCount:2        ReplicationFactor:1     Configs:
    Topic: eventbustopic  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
    Topic: eventbustopic  Partition: 1    Leader: 0       Replicas: 0     Isr: 0

$ bin/kafka-topics.sh --list --zookeeper 255.255.255.255:2181 --topic eventbustopic
eventbustopic

(Ипы были отредактированы и заменены на 255.255.255.255)

Когда я отправляю это исключение, я вижу проблемы со стороны производителя - действительно, источник для ClientUtils.fetchTopicMetadata предполагает, что это в основном используется производителями.

Одна из проблем, которые возникают у меня, это то, что это может быть продукт сетевого макета: пакеты искалечены Haproxy и отправляются через VPN.

Что именно здесь работает?

4b9b3361

Ответ 1

Брокер сообщает клиенту, какое имя хоста следует использовать для создания/потребления сообщений. По умолчанию Kafka использует имя хоста системы, в которой он работает. Если это имя хоста не может быть разрешено на стороне клиента, вы получите это исключение.

Вы можете попробовать установить advertised.host.name в конфигурации Kafka на имя/адрес хоста, которые должны использовать клиенты.

Ответ 2

Вот мой способ решить эту проблему:

  • запустите bin/kafka-server-stop.sh, чтобы остановить запуск сервера kafka.
  • измените файл свойств config/server.properties, добавив строку: listeners=PLAINTEXT://{ip.of.your.kafka.server}:9092
  • перезапустить сервер kafka.

Так как без настройки lisener kafka будет использовать java.net.InetAddress.getCanonicalHostName() для получения адреса, который слушает сервер сокета.

Ответ 3

У вас проблема с Zookeeper. 255.255.255.255:2181 не является допустимым адресом Zookeeper; это широковещательный адрес в вашей сети или маске подсети. Чтобы все было в порядке, найдите IP-адрес или имя хоста на компьютере, на котором запущен Zookeeper.

Ответ 4

Ran в эту ошибку на AWS. Проблема в том, что я был чрезмерно ограничительным с группой безопасности и установил порты 2181 и 9092 на "мой IP". Это означало, что экземпляр kafka не смог найти ZK в одном окне.

Решение - откройте его - немного.