Подтвердить что ты не робот

Эффективная стратегия избежания дублирования сообщений в apache kafka consumer

Я изучаю apache kafka уже месяц. Тем не менее, я сейчас застрял. Мой вариант использования: у меня есть два или несколько потребительских процессов, работающих на разных машинах. Я провел несколько тестов, в которых я опубликовал 10 000 сообщений на сервере kafka. Затем, обрабатывая эти сообщения, я убил один из потребительских процессов и перезапустил его. Потребители писали обработанные сообщения в файле. Таким образом, после того, как потребление закончилось, файл показывал более 10 тыс. Сообщений. Поэтому некоторые сообщения были дублированы.

В потребительском процессе я отключил автоматическую фиксацию. Потребители вручную фиксируют смещения в пакетном режиме. Так, например, если в файл записано 100 сообщений, потребитель совершает смещения. Когда один потребительский процесс запущен и он аварийно завершает работу и восстанавливает дублирование, таким образом избегается. Но когда работает более одного потребителя, и один из них падает и восстанавливается, он записывает дубликаты сообщений в файл.

Есть ли эффективная стратегия, чтобы избежать дублирования сообщений?

4b9b3361

Ответ 1

Короткий ответ: нет.

То, что вы ищете, - это некоторая обработка. Хотя это может показаться практически осуществимым, на него никогда нельзя полагаться, потому что всегда есть оговорки.

Даже для того, чтобы попытаться предотвратить дубликаты, вам нужно будет использовать простого потребителя. Как работает этот подход, для каждого потребителя, когда сообщение потребляется из какого-либо раздела, записывайте раздел и смещение потребляемого сообщения на диск. Когда потребитель перезапустится после сбоя, прочитайте последнее потребленное смещение для каждого раздела с диска.

Но даже с этим шаблоном потребитель не может гарантировать, что после сбоя не будет обработано сообщение. Что, если потребитель потребляет сообщение, а затем выходит из строя до смещения на диск? Если вы пишете на диск, прежде чем обрабатывать сообщение, что делать, если вы пишете смещение, а затем проваливаете, прежде чем обрабатывать сообщение? Эта же проблема будет существовать, даже если вы должны были совершать смещения в ZooKeeper после каждого сообщения.

Однако есть некоторые случаи, когда точно-некоторая обработка более достижима, но только для определенных случаев использования. Это просто требует, чтобы ваше смещение сохранялось в том же месте, что и выход приложения приложения. Например, если вы пишете потребителя, который подсчитывает сообщения, сохраняя последнее подсчитанное смещение с каждым количеством, вы можете гарантировать, что смещение сохраняется одновременно с состоянием потребителя. Разумеется, для того, чтобы гарантировать ровно однократную обработку, это потребует, чтобы вы потребляли ровно одно сообщение и обновляли состояние ровно один раз для каждого сообщения, и это совершенно непрактично для большинства потребительских приложений Kafka. По своей природе Кафка потребляет сообщения в партиях по соображениям производительности.

Обычно ваше время будет более затратным, и ваше приложение будет намного надежнее, если вы просто создадите его как идемпотент.

Ответ 2

Это то, что Kafka часто задает вопрос о том, что нужно точно:

Как мне получить сообщение "Кафка" ровно один раз?

Точно как только семантика состоит из двух частей: избегайте дублирования во время создания данных и избегайте дублирования во время потребления данных.

Существует два подхода к получению точно семантики при создании данных:

  • Используйте одиночный писатель для каждого раздела, и каждый раз, когда вы получаете сетевую ошибку, проверяйте последнее сообщение в этом разделе, чтобы увидеть, была ли ваша последняя запись успешной.
  • Включить в сообщение первичный ключ (UUID или что-то еще) и дедуплицировать на пользователя.

Если вы сделаете одну из этих вещей, журнал, в котором находится Kafka, будет без дубликатов. Однако чтение без дубликатов зависит от некоторого сотрудничества со стороны потребителя. Если потребитель периодически проверяет свою позицию, то, если он терпит неудачу и перезапускается, он перезапустится с контрольной точки. Таким образом, если вывод данных и контрольная точка не будут записаны атомарно, здесь также можно будет получить дубликаты. Эта проблема особенно важна для вашей системы хранения. Например, если вы используете базу данных, вы можете совершить их вместе в транзакции. Загрузчик HDFS Camus, который написал LinkedIn, делает что-то подобное для нагрузок Hadoop. Другой альтернативой, которая не требует транзакции, является сохранение смещения с загруженными и дедуплицированными данными с использованием комбинации тем/раздела/смещения.

Я думаю, что есть два улучшения, которые сделают это намного проще:

  • Иденомпотенция производителя может быть выполнена автоматически и намного дешевле, опционально интегрируя поддержку этого на сервере.
  • Существующий потребитель высокого уровня не подвергает много более мелкозернистого управления смещениями (например, до reset вашей позиции). Мы скоро будем работать над этим.

Ответ 3

Я согласен с дедупликацией RaGe на стороне потребителя. И мы используем Redis для дедупликации сообщения Kafka.

Предположим, что класс Message имеет член, называемый uniqId, который заполняется стороной производителя и гарантированно является уникальным. Мы используем случайную строку длиной 12 строк. (regexp '^[A-Za-z0-9]{12}$')

Потребительская сторона использует Redis SETNX для дедупликации и EXPIRE для автоматического удаления истекших ключей. Пример кода:

Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val);
if (rsps <= 0) {
    log.warn("kafka dup: {}", msg.toJson()); // and other logic
} else {
    jedis.expire(key, 7200); // 2 hours is ok for production environment;
}

В приведенном выше коде были обнаружены повторяющиеся сообщения несколько раз, когда у Kafka (версия 0.8.x) были ситуации. В нашем журнале аудита баланса ввода/вывода не было потеряно ни одного сообщения или дублирования.

Ответ 4

Что бы мы ни делали на стороне производителя, тем не менее, мы считаем, что лучший способ доставки точно один раз от kafka - это справиться с этим на стороне потребителя:

  1. Создать сообщение с UUID в качестве сообщения Kafka Ключ в тему T1
  2. потребительская сторона читает сообщение из T1, записывает его на hbase с помощью uuid как rowkey
  3. читай обратно из hbase с той же rowkey и пиши в другую тему T2
  4. ваши конечные потребители фактически потребляют из темы T2