Подтвердить что ты не робот

Кафка, как читать с темы __consumer_offsets

Я пытаюсь выяснить, что заставляет моих текущих потребителей высокого уровня отработать. Я использую Kafka 0.8.2.1, без "offset.storage", установленного в server.properties Kafka - который, я думаю, означает, что смещения хранятся в Kafka. (Я также подтвердил, что никакие смещения не хранятся в Zookeeper, проверяя этот путь в оболочке Zk: /consumers/consumer_group_name/offsets/topic_name/partition_number)

Я попытался выслушать тему __consumer_offsets чтобы узнать, какой потребитель сохраняет значение смещений, но это не сработало...

Я попробовал следующее:

создал конфигурационный файл для пользователя консоли следующим образом:

=> more kafka_offset_consumer.config 

 exclude.internal.topics=false

и попробовал две версии консольных потребительских скриптов:

#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181

#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config

Ни работало - оно просто сидит там, но ничего не печатает, даже если потребители активно потребляют/экономят смещения.

Я пропустил некоторые другие конфигурации/свойства?

Спасибо!

Марина

4b9b3361

Ответ 1

Я сталкивался с этим вопросом, пытаясь также использовать тему __consumer_offsets. Мне удалось выяснить это для разных версий Kafka и я решил поделиться тем, что нашел

Для Кафки 0.8.2.x

Примечание: это использует соединение Zookeeper

#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning

Для Кафки 0.9.xx и 0.10.xx

#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning

Для 0.11.xx - 2.x

#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning

Ответ 2

Хорошо, я выяснил, в чем проблема. Моя Кафка фактически использовала Zookeeper в качестве хранилища смещения, а не Kafka... Причина, по которой я не обнаружил, что это произошло, потому что я неправильно проверял содержимое ZK:

Я делал

ls  /consumers/consumer_group_name/offsets/topic_name/partition_number

и ничего не видит. Вместо этого мне пришлось "получить" контент, который показал правильные смещения для моих потребителей, например, ниже:

get /consumers/consumer_group_name/offsets/topic_name/partition_number 
185530404
cZxid = 0x70789ad05
ctime = Mon Nov 23 17:49:46 GMT 2015
mZxid = 0x7216cdc5c
mtime = Thu Dec 03 20:18:57 GMT 2015
pZxid = 0x70789ad05
cversion = 0
dataVersion = 3537384
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0

Ответ 3

Если вы добавите --from-beginning, это, скорее всего, даст вам некоторые результаты, по крайней мере, когда я попытался. И если вы не указали этот аргумент, но прочитали больше сообщений (и совершили смещение затухания), пока вы слушаете этого пользователя, это также должно отображать там сообщения.

Ответ 4

Начиная с Kafka 0.11, исходный код (Scala) можно найти здесь

Для тех, кому нужен перевод на Java, из любого процесса Потребителя, скажем, вы получаете ConsumerRecord<byte[], byte[]> consumerRecord, и вы можете использовать

  1. Получите ключ (проверьте, не первый ли ключ) и используйте GroupMetadataManager.readMessageKey(consumerRecord.key). Это может возвращать разные типы, поэтому проверьте if (... instanceof OffsetKey), затем произведите его, и вы можете получить от него различные значения.

  2. Чтобы получить значение записи Kafka для смещений, вы можете использовать String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value))

Минимальный пример Java, переведенный из кода Scala...

byte[] key = consumerRecord.key;
if (key != null) {
    Object o = GroupMetadataManager.readMessageKey(key);
    if (o != null && o instanceOf OffsetKey) {
        OffsetKey offsetKey = (OffsetKey) o;
        Object groupTopicPartition = offsetKey.key;
        byte[] value = consumerRecord.value;
        String formattedValue = String.valueOf(GroupMetadataManager.readOffsetMessageValue(value);
       // TODO: Print, store, or compute results with the new key and value 
    }
}

Обратите внимание, что также возможно использовать API-интерфейсы AdminClient для описания групп, а не для использования этих необработанных сообщений


Исходный код Scala extract

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
  Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
    // Only print if the message is an offset record.
    // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
    case offsetKey: OffsetKey =>
      val groupTopicPartition = offsetKey.key
      val value = consumerRecord.value
      val formattedValue =
        if (value == null) "NULL"
        else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
      output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
      output.write("::".getBytes(StandardCharsets.UTF_8))
      output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
      output.write("\n".getBytes(StandardCharsets.UTF_8))
    case _ => // no-op
  }

Ответ 5

Для Kafka-2.X используйте следующую команду

kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"