Я новый студент, изучающий Kafka, и у меня возникли некоторые фундаментальные проблемы с пониманием нескольких потребителей, что статьи, документация и т.д. пока не слишком помогли.
Одна вещь, которую я пытался сделать, это написать собственный производитель и потребитель Kafka высокого уровня и запустить их одновременно, опубликовав 100 простых сообщений для темы и получив мой потребитель. Мне удалось сделать это успешно, но когда я пытаюсь представить второго потребителя, чтобы потреблять из той же темы, что и сообщения, которые были только что опубликованы, он не получает сообщений.
Насколько я понимаю, для каждой темы у вас могут быть потребители из отдельных групп потребителей, и каждая из этих групп потребителей получит полную копию сообщений, выпущенных для какой-либо темы. Это верно? Если нет, то каков был бы правильный способ настроить несколько потребителей? Это потребительский класс, который я написал до сих пор:
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
Кроме того, я заметил, что изначально я тестировал вышеупомянутое потребление для темы "тест" только с одним разделом. Когда я добавил другого потребителя в существующую потребительскую группу, скажем, "testGroup", это вызвало перебалансировку Kafka, которая замедлила латентность моего потребления на значительную величину в размере секунд. Я думал, что это была проблема с перебалансировкой, поскольку у меня был только один раздел, но когда я создал новую тему "несколько разделов" с 6 разделами, возникли аналогичные проблемы, когда добавление большего количества потребителей в одну и ту же группу потребителей вызвало проблемы с задержкой. Я огляделся, и люди говорят мне, что я должен использовать многопоточного потребителя - может ли кто-нибудь пролить свет на это?