Подтвердить что ты не робот

Кафка Потребитель - Опрос

Я сталкиваюсь с некоторыми серьезными проблемами, пытаясь найти решение для своих нужд, касающееся KafkaConsumer (> = 0,9).

Представьте, что у меня есть функция, которая должна читать только n сообщений из темы кафки.

Например: getMsgs(5) → получает следующие 5 сообщений кафки в теме.

Итак, у меня есть петля, которая выглядит следующим образом. Отредактировано с актуальными правильными параметрами. В этом случае для параметра max .poll.records потребителя было задано значение 1, поэтому реальный цикл повторялся только один раз. Разные потребители (некоторые из них просматривали множество сообщений) разделяли абстрактного отца (вот этого), поэтому он так и кодировался. Часть numMss была специальной для этого потребителя.

for (boolean exit= false;!exit;)
{
   Records = consumer.poll(config.pollTime);
   for (Record r:records) {
       processRecord(r); //do my things
       numMss++;
       if (numMss==maximum) //maximum=5
          exit=true;
   }
}

Учитывая это, проблема в том, что метод poll() может получить более 5 сообщений. Например, если он получит 10 сообщений, мой код навсегда забудет эти 5 сообщений, поскольку Кафка будет думать, что они уже использованы.

Я попытался зафиксировать смещение, но, похоже, не работает:

    consumer.commitSync(Collections.singletonMap(partition,
    new OffsetAndMetadata(record.offset() + 1)));

Даже при конфигурации смещения каждый раз, когда я снова запускаю потребителя, он будет начинаться не с 6-го сообщения (помните, я просто хотел 5 сообщений), а с 11-го (поскольку первый опрос потреблял 10 сообщений).

Есть ли какое-то решение для этого, или, может быть (наверняка) я что-то упустил?

Заранее спасибо!!

4b9b3361

Ответ 1

Вы можете установить max.poll.records на любой номер, который вам нравится, чтобы в большинстве случаев вы получите столько записей в каждом опросе.

В вашем случае использования, которое вы указали в этой проблеме, вам не нужно явно выполнять смещения самостоятельно. вы можете просто установить enable.auto.commit в true и установить auto.offset.reset в earliest, чтобы он включался, когда нет потребителя group.id (другие слова, когда вы начинаете читать из раздела для самого первого время). После того, как у вас есть group.id и некоторые потребительские смещения, хранящиеся в Kafka, и в случае, если ваш потребительский процесс Kafka будет умирать, он будет продолжаться с последнего зафиксированного смещения, поскольку это поведение по умолчанию, потому что, когда потребитель начинает, он сначала ищет, если есть какие-либо и если да, то будет продолжаться с последнего зафиксированного смещения, а auto.offset.reset не будет.

Ответ 2

установите свойство auto.offset.reset как "последнее". Затем попробуйте использовать, вы получите потребляемые записи из зафиксированного смещения.

Или вы используете user.seek(TopicPartition, offset) api перед опросом.

Ответ 3

Если вы отключили автоматическую фиксацию, установив для параметра enable.auto.commit значение false. Вам нужно отключить это, если вы хотите вручную зафиксировать смещение. Без этого следующего вызова poll() будет автоматически зафиксировано последнее смещение сообщений, полученных вами из предыдущего опроса().

Ответ 4

Из Kafka 0.9 изменены имена параметров auto.offset.reset;

Что делать, если в Kafka нет начального смещения или если текущее смещение больше не существует на сервере (например, поскольку эти данные были удалены):

earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw exception to the consumer if no previous offset is found for the consumer group

anything else: throw exception to the consumer.