Подтвердить что ты не робот

Как получить данные из старой точки смещения в Кафке?

Я использую zookeeper для получения данных от kafka. И здесь я всегда получаю данные из последней точки смещения. Есть ли способ указать время смещения для получения старых данных?

Существует один вариант autooffset.reset. Он принимает наименьший или самый большой. Может кто-нибудь объяснить, что является самым маленьким и самым большим. Может ли autooffset.reset помочь в получении данных из старой точки смещения вместо последней точки смещения?

4b9b3361

Ответ 1

Потребители всегда принадлежат группе, и для каждого раздела Zookeeper отслеживает прогресс этой группы потребителей в разделе.

Чтобы извлечь изначально, вы можете удалить все данные, связанные с прогрессом, как указал Хуссейн

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");

Вы также можете указать смещение нужного раздела, как указано в файле core/src/main/ scala/kafka/tools/UpdateOffsetsInZK.scala

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)

Однако смещение не индексируется во времени, но вы знаете, что для каждого раздела есть последовательность.

Если ваше сообщение содержит метку времени (и остерегайтесь, чтобы эта метка времени не имела никакого отношения к тому моменту, когда Кафка получил ваше сообщение), вы можете попытаться сделать индексатор, который пытается получить одну запись в шагах, увеличивая смещение на N, и сохраните кортеж (тема X, часть 2, офсет 100, временная метка) где-то.

Если вы хотите получать записи с определенного момента времени, вы можете применить двоичный поиск к своему грубому индексу, пока не найдете нужную запись и не получите оттуда.

Ответ 2

Из документации Kafka они говорят "kafka.api.OffsetRequest.EarliestTime() находит начало данных в журналах и начинает потоковое оттуда, kafka.api.OffsetRequest.LatestTime() будет передавать только новые сообщения. Не считайте, что смещение 0 является начальным смещением, поскольку сообщения стареют из журнала с течением времени."

Используйте SimpleConsumerExample здесь: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Аналогичный вопрос: Пользователь высокого уровня Kafka выберет все сообщения из темы с использованием Java API (эквивалентно - от начала)

Это может помочь

Ответ 3

Обратитесь к документу о kafka config: http://kafka.apache.org/08/configuration.html для вашего запроса о наименьших и наибольших значениях параметра смещения.

Кстати, изучая kafka, мне было интересно, как воспроизводить все сообщения для потребителя. Я имею в виду, если группа потребителей опросила все сообщения и захочет их повторно получить.

То, как это может быть достигнуто, - удалить данные из zookeeper. Используйте класс kafka.utils.ZkUtils, чтобы удалить node на zookeeper. Ниже приведено его использование:

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");

Ответ 4

Протокол Kafka Protocol Doc - отличный источник для игры с запросом/ответом/смещениями/сообщениями: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol вы используете пример Simple Consumer, где следующий код демонстрирует состояние:

FetchRequest req = new FetchRequestBuilder()

        .clientId(clientName)

        .addFetch(a_topic, a_partition, readOffset, 100000) 

        .build();

FetchResponse fetchResponse = simpleConsumer.fetch(req);

установить readOffset для начала начального смещения. но вам нужно проверить максимальное смещение, как и выше, будет обеспечивать ограниченное количество смещений в соответствии с FetchSize в последнем параметре метода addFetch.

Ответ 5

На данный момент

Часто задаваемые вопросы Kafka дают ответы на эту проблему.

Как правильно получить смещения сообщений для определенной метки времени с помощью функции OffsetRequest?

Kafka позволяет запрашивать смещения сообщений по времени и делает это при детализации сегмента. Параметр timestamp - это временная метка unix и запрос смещения по метке времени возвращает последнее возможное смещение сообщения, которое добавляется не позднее указанной отметки времени. Есть два специальных значения метки времени - самые последние и самые ранние. Для любого другого значения временной отметки unix Kafka получит начальное смещение сегмента журнала, которое создается не позднее указанной отметки времени. Из-за этого, и поскольку запрос смещения обслуживается только в степени детализации сегмента, запрос выборки смещения возвращает менее точные результаты для больших размеров сегмента.

Для получения более точных результатов вы можете настроить размер сегмента журнала на основе времени (log.roll.ms) вместо размера (log.segment.bytes). Однако следует проявлять осторожность, так как это может увеличить количество обработчиков файлов из-за частых отрезков сегмента журнала.


Планы на будущее

Kafka добавит отметку времени в формат сообщения. См.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

Ответ 6

Используя KafkaConsumer, вы можете использовать Seek, SeekToBeginning и SeekToEnd для перемещения в потоке.

https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(java.util.Collection)

Кроме того, если раздел не предоставляется, он будет искать первое смещение для всех назначенных в настоящее время разделов.

Ответ 7

Вы пробовали это?

bin/kafka-console-consumer.sh --bootstrap-server localhost: 9092 --topic test --from-begin

Он напечатает все сообщения для данной темы, "test" в этом примере.

Подробнее об этой ссылке https://kafka.apache.org/quickstart