Подтвердить что ты не робот

Есть ли способ удалить все данные из темы или удалить тему перед каждым запуском?

Есть ли способ удалить все данные из темы или удалить тему перед каждым запуском?

Можно ли изменить файл KafkaConfig.scala, чтобы изменить свойство logRetentionHours? Есть ли способ, по которому сообщения удаляются, как только потребитель читает его?

Я использую производителей для получения данных откуда-то и отправки данных в конкретную тему, где потребитель потребляет, могу ли я удалить все данные из этой темы при каждом запуске? Я хочу только новые данные каждый раз в теме. Есть ли способ повторной инициализации темы?

4b9b3361

Ответ 1

Не думаю, что это еще поддерживается. Взгляните на эту проблему JIRA "Добавить поддержку удаления темы".

Чтобы удалить вручную:

  1. Выключить кластер
  2. Очистите каталог журнала log.dir (указанный атрибутом log.dir в файле конфигурации kafka), а также данные zookeeper
  3. Перезагрузите кластер

Для любой данной темы, что вы можете сделать, это

  1. Стоп кафка
  2. Очистите журнал kafka, специфичный для раздела, kafka хранит свой файл журнала в формате "logDir/topic-partition", поэтому для темы с именем "MyTopic" журнал для идентификатора раздела 0 будет храниться в /tmp/kafka-logs/MyTopic-0 где /tmp/kafka-logs указывается атрибутом log.dir
  3. Перезагрузить кафку

Это NOT хороший и рекомендуемый подход, но он должен работать. В конфигурационном файле брокера log.retention.hours.per.topic атрибут log.retention.hours.per.topic используется для определения The number of hours to keep a log file before deleting it for some specific topic

Кроме того, есть ли способ удалить сообщения, как только их прочитает потребитель?

Из документации Кафки:

Кластер Kafka сохраняет все опубликованные сообщения - независимо от того, были они использованы или нет - в течение настраиваемого периода времени. Например, если срок хранения журнала равен двум дням, то в течение двух дней после публикации сообщения оно становится доступным для использования, после чего оно будет сброшено для освобождения места. Производительность Kafka практически постоянна в отношении размера данных, поэтому сохранение большого количества данных не является проблемой.

Фактически единственные метаданные, сохраняемые для каждого потребителя, - это позиция потребителя в журнале, называемая "смещением". Это смещение контролируется потребителем: обычно потребитель смещает свое смещение линейно, когда читает сообщения, но на самом деле позиция контролируется потребителем, и он может потреблять сообщения в любом порядке, который ему нравится. Например, потребитель может вернуться к старому смещению для повторной обработки.

Для нахождения начального смещения для чтения в примере Kafka 0.8 Simple Consumer говорят

Kafka включает в себя две константы, чтобы помочь, kafka.api.OffsetRequest.EarliestTime() находит начало данных в журналах и начинает оттуда потоковую передачу, kafka.api.OffsetRequest.LatestTime() будет только передавать новые сообщения.

Вы также можете найти пример кода для управления смещением на стороне потребителя.

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}

Ответ 2

Как я уже говорил здесь Purge Kafka Queue:

Протестировано в Kafka 0.8.2, для примера с быстрым запуском: сначала добавьте одну строку в файл server.properties в папке config:

delete.topic.enable=true

тогда вы можете запустить эту команду:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

Ответ 3

Протестировано с помощью kafka 0.10

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

Примечание. Если вы удаляете папку /s в папке kafka-log, но не из папки zookeeper-data, то вы увидите, что темы все еще существуют.

Ответ 4

Ниже приведены сценарии для очистки и удаления раздела Kafka, предполагающего использование localhost в качестве сервера zookeeper, а Kafka_Home установлен в каталог установки:

Сценарий ниже очистит тему, установив время ее хранения на 1 секунду, а затем удалив конфигурацию:

#!/bin/bash
echo "Enter name of topic to empty:"
read topicName
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms

Чтобы полностью удалить темы, вы должны остановить любого подходящего брокера (ов) kafka и удалить его каталог из каталога журнала kafka (по умолчанию: /tmp/kafka-logs), а затем запустить этот скрипт, чтобы удалить тему из zookeeper. Чтобы убедиться, что он был удален из zookeeper, вывод ls/brokers/themes больше не должен включать тему:

#!/bin/bash
echo "Enter name of topic to delete from zookeeper:"
read topicName
/$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF

Ответ 5

В качестве грязного обходного пути вы можете настроить параметры хранения во время выполнения, например. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1 (retention.bytes = 0 также может работать)

Через некоторое время кафка должна освободить место. Не уверен, что это имеет какие-то последствия по сравнению с повторным созданием темы.

пс. Лучше принесите настройки удержания назад, как только kafka выполнит очистку.

Вы также можете использовать retention.ms для сохранения исторических данных

Ответ 6

Мы довольно много пытались описать другие ответы с умеренным уровнем успеха. Что действительно сработало для нас (Apache Kafka 0.8.1) - это команда класса

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost: 2181

Ответ 7

Для brew пользователей

Если вы используете brew как я и потратили много времени на поиски печально известной папки kafka-logs, больше не бойтесь. (и, пожалуйста, дайте мне знать, если это сработает для вас и нескольких разных версий Homebrew, Kafka и т.д.:))

Вероятно, вы найдете его под:

Расположение:

/usr/local/var/lib/kafka-logs


Как найти этот путь

(это также полезно для каждого приложения, которое вы устанавливаете через brew)

1) brew services list

Кафка запустила матбх /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist

2) Откройте и прочитайте, что plist вы обнаружили выше

3) Найдите строку, определяющую расположение server.properties, откройте ее, в моем случае:

  • /usr/local/etc/kafka/server.properties

4) Найдите строку log.dirs:

log.dirs =/USR/локальные/вар/Library/Кафка-журналы

5) Перейдите в это место и удалите журналы для желаемых тем

6) Перезапустите Kafka с помощью brew services restart kafka

Ответ 8

Все данные о разделах и их разделах хранятся в tmp/kafka-logs/. Кроме того, они сохраняются в формате topic-partionNumber, поэтому, если вы хотите удалить тему newTopic, вы можете:

  • stop kafka
  • удалить файлы rm -rf /tmp/kafka-logs/newTopic-*

Ответ 9

  • Stop ZooKeeper и Kafka
  • В server.properties измените значение log.retention.hours. Вы можете прокомментировать log.retention.hours и добавить log.retention.ms=1000. Это будет держать запись на Кафке Тема только на одну секунду.
  • Запустите zookeeper и kafka.
  • Проверьте консоль пользователя. Когда я впервые открыл консоль, запись была там. Но когда я снова открыл консоль, запись была удалена.
  • Позже вы можете установить значение log.retention.hours на нужную цифру.

Ответ 10

При ручном удалении темы из кластера kafka вы можете просто проверить это https://github.com/darrenfu/bigdata/issues/6 Важный шаг, пропустивший много в большинстве решений, заключается в удалении /config/topics/<topic_name> в ZK.

Ответ 11

Я использую этот скрипт:

#!/bin/bash
topics='kafka-topics --list --zookeeper zookeeper:2181'
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
    done
done
sleep 60
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
    done
done

Ответ 12

Начиная с версии kafka 2.3.0 существует альтернативный способ мягкого удаления Kafka (старый подход устарел).

Обновите значение retention.ms до 1 секунды (1000 мс), а затем снова установите его через минуту, установив значение по умолчанию, т.е. 7 дней (168 часов, 604 800 000 в мс)

Мягкое удаление: - (rentention.ms = 1000) (с использованием kafka-configs.sh)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.

Настройка по умолчанию: - 7 дней (168 часов, retention.ms = 604800000)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=604800000

Ответ 13

Я использую приведенную ниже утилиту для очистки после запуска интеграционного теста.

Он использует последние API AdminZkClient. Более старый API устарел.

import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time

class ZookeeperUtils @Inject() (config: AppConfig) {

  val testTopic = "users_1"

  val zkHost = config.KafkaConfig.zkHost
  val sessionTimeoutMs = 10 * 1000
  val connectionTimeoutMs = 60 * 1000
  val isSecure = false
  val maxInFlightRequests = 10
  val time: Time = Time.SYSTEM

  def cleanupTopic(config: AppConfig) = {

    val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
    val zkUtils = new AdminZkClient(zkClient)

    val pp = new Properties()
    pp.setProperty("delete.retention.ms", "10")
    pp.setProperty("file.delete.delay.ms", "1000")
    zkUtils.changeTopicConfig(testTopic , pp)
    //    zkUtils.deleteTopic(testTopic)

    println("Waiting for topic to be purged. Then reset to retain records for the run")
    Thread.sleep(60000L)

    val resetProps = new Properties()
    resetProps.setProperty("delete.retention.ms", "3000000")
    resetProps.setProperty("file.delete.delay.ms", "4000000")
    zkUtils.changeTopicConfig(testTopic , resetProps)

  }


}

Есть опция удаления темы. Но это помечает тему для удаления. Zookeeper позже удаляет тему. Поскольку это может быть непредсказуемо долго, я предпочитаю подход retention.ms